You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/17 14:22:05 UTC
[skywalking] branch metrics updated: Adopt polished query protocol
and support top n in old/new query.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch metrics
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/metrics by this push:
new 52b66f3 Adopt polished query protocol and support top n in old/new query.
52b66f3 is described below
commit 52b66f3a1a5d31d4e665abe79a5d9987804100af
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Apr 17 22:21:15 2020 +0800
Adopt polished query protocol and support top n in old/new query.
---
.../oap/server/core/analysis/IDManager.java | 7 +-
.../server/core/query/AggregationQueryService.java | 18 ++--
.../server/core/query/TopNRecordsQueryService.java | 10 +-
.../oap/server/core/query/input/TopNCondition.java | 5 +
.../core/storage/query/ITopNRecordsQueryDAO.java | 15 ++-
.../oap/query/graphql/resolver/MetricsQuery.java | 40 ++++---
.../query/graphql/resolver/TopNRecordsQuery.java | 54 +++++-----
.../query/graphql/type/TopNRecordsCondition.java | 1 +
.../elasticsearch/query/AggregationQueryEsDAO.java | 120 +++++----------------
.../elasticsearch/query/TopNRecordsQueryEsDAO.java | 29 +++--
.../query/AggregationQueryEs7DAO.java | 64 ++++++-----
.../plugin/jdbc/h2/dao/H2AggregationQueryDAO.java | 15 +--
.../plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java | 30 +++---
13 files changed, 185 insertions(+), 223 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
index 4eb4b47..67e2b91 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
@@ -39,8 +39,11 @@ public class IDManager {
* @return encoded service id
*/
public static String buildId(String name, NodeType type) {
- return encode(name) + Const.SERVICE_ID_CONNECTOR + BooleanUtils.booleanToValue(
- type.equals(NodeType.Normal));
+ return buildId(name, type.equals(NodeType.Normal));
+ }
+
+ public static String buildId(String name, boolean isNormal) {
+ return encode(name) + Const.SERVICE_ID_CONNECTOR + BooleanUtils.booleanToValue(isNormal);
}
/**
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java
index 57b9d79..40a4c82 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java
@@ -22,8 +22,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
@@ -52,18 +52,18 @@ public class AggregationQueryService implements Service {
return aggregationQueryDAO;
}
- public List<SelectedRecord> sortMetrics(TopNCondition metrics, Duration duration) throws IOException {
- final String valueCName = ValueColumnMetadata.INSTANCE.getValueCName(metrics.getName());
+ public List<SelectedRecord> sortMetrics(TopNCondition condition, Duration duration) throws IOException {
+ final String valueCName = ValueColumnMetadata.INSTANCE.getValueCName(condition.getName());
List<KeyValue> additionalConditions = null;
- if (StringUtil.isNotEmpty(metrics.getParentService())) {
+ if (StringUtil.isNotEmpty(condition.getParentService())) {
additionalConditions = new ArrayList<>(1);
- final String serviceId = IDManager.ServiceID.buildId(metrics.getParentService(), NodeType.Normal);
+ final String serviceId = IDManager.ServiceID.buildId(condition.getParentService(), condition.isNormal());
additionalConditions.add(new KeyValue(InstanceTraffic.SERVICE_ID, serviceId));
}
final List<SelectedRecord> selectedRecords = getAggregationQueryDAO().sortMetrics(
- metrics, valueCName, duration, additionalConditions);
+ condition, valueCName, duration, additionalConditions);
selectedRecords.forEach(selectedRecord -> {
- switch (metrics.getScope()) {
+ switch (condition.getScope()) {
case Service:
selectedRecord.setName(IDManager.ServiceID.analysisId(selectedRecord.getId()).getName());
break;
@@ -71,8 +71,10 @@ public class AggregationQueryService implements Service {
selectedRecord.setName(IDManager.ServiceInstanceID.analysisId(selectedRecord.getId()).getName());
break;
case Endpoint:
- selectedRecord.setName(IDManager.ServiceInstanceID.analysisId(selectedRecord.getId()).getName());
+ selectedRecord.setName(IDManager.EndpointID.analysisId(selectedRecord.getId()).getEndpointName());
break;
+ default:
+ selectedRecord.setName(Const.UNKNOWN);
}
});
return selectedRecords;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java
index 3746e41..1e8e4c8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java
@@ -20,8 +20,9 @@ package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.util.List;
-import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.type.TopNRecord;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
@@ -44,8 +45,7 @@ public class TopNRecordsQueryService implements Service {
return topNRecordsQueryDAO;
}
- public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, String serviceId,
- int topN, Order order) throws IOException {
- return getTopNRecordsQueryDAO().getTopNRecords(startSecondTB, endSecondTB, metricName, serviceId, topN, order);
+ public List<SelectedRecord> readSampledRecords(TopNCondition condition, Duration duration) throws IOException {
+ return getTopNRecordsQueryDAO().readSampledRecords(condition, duration);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TopNCondition.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TopNCondition.java
index a55c7bd..6964184 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TopNCondition.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TopNCondition.java
@@ -40,6 +40,11 @@ public class TopNCondition {
*/
private String parentService;
/**
+ * A normal service is a service that has an agent installed or reports its metrics directly. A non-normal service is a
+ * conjectural service, usually detected by the agent.
+ */
+ private boolean isNormal;
+ /**
* Indicate the metrics entity scope. Because this is a top list, don't need to set the Entity like the
* MetricsCondition. Only accept scope = {@link Scope#Service} {@link Scope#ServiceInstance} and {@link
* Scope#Endpoint}, ignore others due to those are pointless.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
index 3be289f..a818d5c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
@@ -20,11 +20,18 @@ package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import java.util.List;
-import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.type.TopNRecord;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.library.module.Service;
+/**
+ * Query the records sampled by {@link Stream} = {@link TopNStreamProcessor}
+ *
+ * @since 8.0.0
+ */
public interface ITopNRecordsQueryDAO extends Service {
- List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, String serviceId, int topN,
- Order order) throws IOException;
+ List<SelectedRecord> readSampledRecords(TopNCondition condition, Duration duration) throws IOException;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
index 59f26a4..6efbb04 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
import org.apache.skywalking.oap.server.core.query.enumeration.MetricsType;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.input.MetricsCondition;
@@ -43,6 +44,7 @@ public class MetricsQuery implements GraphQLQueryResolver {
private final ModuleManager moduleManager;
private MetricQueryService metricQueryService;
private AggregationQueryService queryService;
+ private TopNRecordsQueryService topNRecordsQueryService;
public MetricsQuery(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
@@ -57,10 +59,17 @@ public class MetricsQuery implements GraphQLQueryResolver {
return queryService;
}
+ private TopNRecordsQueryService getTopNRecordsQueryService() {
+ if (topNRecordsQueryService == null) {
+ this.topNRecordsQueryService = moduleManager.find(CoreModule.NAME)
+ .provider()
+ .getService(TopNRecordsQueryService.class);
+ }
+ return topNRecordsQueryService;
+ }
+
/**
* Metrics definition metadata query. Response the metrics type which determines the suitable query methods.
- *
- * @since 8.0.0
*/
public MetricsType typeOfMetrics(String name) throws IOException {
return MetricsType.UNKNOWN;
@@ -68,38 +77,31 @@ public class MetricsQuery implements GraphQLQueryResolver {
/**
* Read metrics single value in the duration of required metrics
- *
- * @since 8.0.0
*/
- public int readMetricsValue(MetricsCondition metrics, Duration duration) throws IOException {
+ public int readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
return 0;
}
/**
* Read time-series values in the duration of required metrics
- *
- * @since 8.0.0
*/
- public List<MetricsValues> readMetricsValues(MetricsCondition metrics, Duration duration) throws IOException {
+ public List<MetricsValues> readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
return Collections.emptyList();
}
/**
* Read entity list of required metrics and parent entity type.
- *
- * @since 8.0.0
*/
- public List<SelectedRecord> sortMetrics(TopNCondition metrics, Duration duration) throws IOException {
- return getQueryService().sortMetrics(metrics, duration);
+ public List<SelectedRecord> sortMetrics(TopNCondition condition, Duration duration) throws IOException {
+ return getQueryService().sortMetrics(condition, duration);
}
/**
* Read value in the given time duration, usually as a linear.
*
* @param labels the labels you need to query.
- * @since 8.0.0
*/
- public List<MetricsValues> readLabeledMetricsValues(MetricsCondition metrics,
+ public List<MetricsValues> readLabeledMetricsValues(MetricsCondition condition,
List<String> labels,
Duration duration) throws IOException {
return Collections.emptyList();
@@ -107,19 +109,15 @@ public class MetricsQuery implements GraphQLQueryResolver {
/**
* Heatmap is bucket based value statistic result.
- *
- * @since 8.0.0
*/
- public HeatMap readHeatMap(MetricsCondition metrics, Duration duration) throws IOException {
+ public HeatMap readHeatMap(MetricsCondition condition, Duration duration) throws IOException {
return new HeatMap();
}
/**
* Read the sampled records.
- *
- * @since 8.0.0
*/
- public List<SelectedRecord> readSampledRecords(TopNCondition metrics, Duration duration) throws IOException {
- return Collections.emptyList();
+ public List<SelectedRecord> readSampledRecords(TopNCondition condition, Duration duration) throws IOException {
+ return getTopNRecordsQueryService().readSampledRecords(condition, duration);
}
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java
index 34fa050..0fbc347 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java
@@ -20,47 +20,47 @@ package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.query.graphql.type.TopNRecordsCondition;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.query.DurationUtils;
-import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
-import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.core.query.type.TopNRecord;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
- * @since 8.0.0 This query is replaced by {@link MetricsQuery}
+ * @since 8.0.0 This query is replaced by {@link MetricsQuery}; all queries are now delegated to it.
*/
@Deprecated
public class TopNRecordsQuery implements GraphQLQueryResolver {
- private final ModuleManager moduleManager;
- private TopNRecordsQueryService topNRecordsQueryService;
+ private MetricsQuery query;
public TopNRecordsQuery(ModuleManager moduleManager) {
- this.moduleManager = moduleManager;
- }
-
- private TopNRecordsQueryService getTopNRecordsQueryService() {
- if (topNRecordsQueryService == null) {
- this.topNRecordsQueryService = moduleManager.find(CoreModule.NAME)
- .provider()
- .getService(TopNRecordsQueryService.class);
- }
- return topNRecordsQueryService;
+ query = new MetricsQuery(moduleManager);
}
public List<TopNRecord> getTopNRecords(TopNRecordsCondition condition) throws IOException {
- long startSecondTB = DurationUtils.INSTANCE.startTimeDurationToSecondTimeBucket(
- condition.getDuration().getStep(), condition.getDuration().getStart());
- long endSecondTB = DurationUtils.INSTANCE.endTimeDurationToSecondTimeBucket(
- condition.getDuration().getStep(), condition.getDuration().getEnd());
-
- String metricName = condition.getMetricName();
- Order order = condition.getOrder();
- int topN = condition.getTopN();
+ TopNCondition topNCondition = new TopNCondition();
+ topNCondition.setName(condition.getMetricName());
+ final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
+ condition.getServiceId());
+ topNCondition.setParentService(serviceIDDefinition.getName());
+ topNCondition.setNormal(serviceIDDefinition.isReal());
+ // Scope is not required in topN record query.
+ // topNCondition.setScope();
+ topNCondition.setOrder(condition.getOrder());
+ topNCondition.setTopN(condition.getTopN());
- return getTopNRecordsQueryService().getTopNRecords(
- startSecondTB, endSecondTB, metricName, condition.getServiceId(), topN, order);
+ final List<SelectedRecord> selectedRecords = query.readSampledRecords(topNCondition, condition.getDuration());
+ List<TopNRecord> list = new ArrayList<>(selectedRecords.size());
+ selectedRecords.forEach(record -> {
+ TopNRecord top = new TopNRecord();
+ top.setStatement(record.getName());
+ top.setTraceId(record.getRefId());
+ top.setLatency(Long.parseLong(record.getValue()));
+ list.add(top);
+ });
+ return list;
}
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
index c091980..ede62a2 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
@@ -23,6 +23,7 @@ import lombok.Setter;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+@Deprecated
@Getter
@Setter
public class TopNRecordsCondition {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java
index 441815f..546a6e2 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java
@@ -21,22 +21,20 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.skywalking.oap.server.core.analysis.DownSampling;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.type.TopNEntity;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -47,105 +45,41 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
}
@Override
- public List<TopNEntity> getServiceTopN(String indexName, String valueCName, int topN, DownSampling downsampling,
- long startTB, long endTB, Order order) throws IOException {
+ public List<SelectedRecord> sortMetrics(final TopNCondition metrics,
+ final String valueColumnName,
+ final Duration duration,
+ final List<KeyValue> additionalConditions) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
- sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
- return aggregation(indexName, valueCName, sourceBuilder, topN, order);
- }
+ sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
+ .lte(duration.getEndTimeBucket())
+ .gte(duration.getStartTimeBucket()));
- @Override
- public List<TopNEntity> getAllServiceInstanceTopN(String indexName,
- String valueCName,
- int topN,
- DownSampling downsampling,
- long startTB,
- long endTB,
- Order order) throws IOException {
- SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
- sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
- return aggregation(indexName, valueCName, sourceBuilder, topN, order);
- }
-
- @Override
- public List<TopNEntity> getServiceInstanceTopN(String serviceId,
- String indexName,
- String valueCName,
- int topN,
- DownSampling downsampling,
- long startTB,
- long endTB,
- Order order) throws IOException {
- SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
-
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- sourceBuilder.query(boolQueryBuilder);
-
- boolQueryBuilder.must().add(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
- boolQueryBuilder.must().add(QueryBuilders.termQuery(InstanceTraffic.SERVICE_ID, serviceId));
-
- return aggregation(indexName, valueCName, sourceBuilder, topN, order);
- }
-
- @Override
- public List<TopNEntity> getAllEndpointTopN(String indexName, String valueCName, int topN, DownSampling downsampling,
- long startTB, long endTB, Order order) throws IOException {
- SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
- sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
- return aggregation(indexName, valueCName, sourceBuilder, topN, order);
- }
-
- @Override
- public List<TopNEntity> getEndpointTopN(String serviceId,
- String indexName,
- String valueCName,
- int topN,
- DownSampling downsampling,
- long startTB,
- long endTB,
- Order order) throws IOException {
- SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
-
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- sourceBuilder.query(boolQueryBuilder);
-
- boolQueryBuilder.must().add(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
- boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointTraffic.SERVICE_ID, serviceId));
-
- return aggregation(indexName, valueCName, sourceBuilder, topN, order);
- }
-
- protected List<TopNEntity> aggregation(String indexName, String valueCName, SearchSourceBuilder sourceBuilder,
- int topN, Order order) throws IOException {
boolean asc = false;
- if (order.equals(Order.ASC)) {
+ if (metrics.getOrder().equals(Order.ASC)) {
asc = true;
}
- TermsAggregationBuilder aggregationBuilder = aggregationBuilder(valueCName, topN, asc);
-
- sourceBuilder.aggregation(aggregationBuilder);
+ sourceBuilder.aggregation(
+ AggregationBuilders.terms(Metrics.ENTITY_ID)
+ .field(Metrics.ENTITY_ID)
+ .order(BucketOrder.aggregation(valueColumnName, asc))
+ .size(metrics.getTopN())
+ .subAggregation(AggregationBuilders.avg(valueColumnName).field(valueColumnName))
+ );
- SearchResponse response = getClient().search(indexName, sourceBuilder);
+ SearchResponse response = getClient().search(metrics.getName(), sourceBuilder);
- List<TopNEntity> topNEntities = new ArrayList<>();
+ List<SelectedRecord> topNList = new ArrayList<>();
Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
for (Terms.Bucket termsBucket : idTerms.getBuckets()) {
- TopNEntity topNEntity = new TopNEntity();
- topNEntity.setId(termsBucket.getKeyAsString());
- Avg value = termsBucket.getAggregations().get(valueCName);
- topNEntity.setValue((long) value.getValue());
- topNEntities.add(topNEntity);
+ SelectedRecord record = new SelectedRecord();
+ record.setId(termsBucket.getKeyAsString());
+ Avg value = termsBucket.getAggregations().get(valueColumnName);
+ record.setValue(String.valueOf((long) value.getValue()));
+ topNList.add(record);
}
- return topNEntities;
+ return topNList;
}
- protected TermsAggregationBuilder aggregationBuilder(final String valueCName, final int topN, final boolean asc) {
- return AggregationBuilders.terms(Metrics.ENTITY_ID)
- .field(Metrics.ENTITY_ID)
- .order(BucketOrder.aggregation(valueCName, asc))
- .size(topN)
- .subAggregation(AggregationBuilders.avg(valueCName).field(valueCName));
- }
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java
index c957dc1..41eb8f6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java
@@ -21,9 +21,12 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.type.TopNRecord;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
@@ -40,24 +43,28 @@ public class TopNRecordsQueryEsDAO extends EsDAO implements ITopNRecordsQueryDAO
}
@Override
- public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, String serviceId,
- int topN, Order order) throws IOException {
+ public List<SelectedRecord> readSampledRecords(final TopNCondition condition,
+ final Duration duration) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- boolQueryBuilder.must().add(QueryBuilders.rangeQuery(TopN.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
+ boolQueryBuilder.must().add(QueryBuilders.rangeQuery(TopN.TIME_BUCKET)
+ .gte(duration.getStartTimeBucket())
+ .lte(duration.getEndTimeBucket()));
+ final String serviceId = IDManager.ServiceID.buildId(condition.getParentService(), condition.isNormal());
boolQueryBuilder.must().add(QueryBuilders.termQuery(TopN.SERVICE_ID, serviceId));
sourceBuilder.query(boolQueryBuilder);
- sourceBuilder.size(topN).sort(TopN.LATENCY, order.equals(Order.DES) ? SortOrder.DESC : SortOrder.ASC);
- SearchResponse response = getClient().search(metricName, sourceBuilder);
+ sourceBuilder.size(condition.getTopN())
+ .sort(TopN.LATENCY, condition.getOrder().equals(Order.DES) ? SortOrder.DESC : SortOrder.ASC);
+ SearchResponse response = getClient().search(condition.getName(), sourceBuilder);
- List<TopNRecord> results = new ArrayList<>();
+ List<SelectedRecord> results = new ArrayList<>(condition.getTopN());
for (SearchHit searchHit : response.getHits().getHits()) {
- TopNRecord record = new TopNRecord();
- record.setStatement((String) searchHit.getSourceAsMap().get(TopN.STATEMENT));
- record.setTraceId((String) searchHit.getSourceAsMap().get(TopN.TRACE_ID));
- record.setLatency(((Number) searchHit.getSourceAsMap().get(TopN.LATENCY)).longValue());
+ SelectedRecord record = new SelectedRecord();
+ record.setName((String) searchHit.getSourceAsMap().get(TopN.STATEMENT));
+ record.setRefId((String) searchHit.getSourceAsMap().get(TopN.TRACE_ID));
+ record.setValue(((Number) searchHit.getSourceAsMap().get(TopN.LATENCY)).toString());
results.add(record);
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java
index bb157f8..82f72da 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java
@@ -18,61 +18,69 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.type.TopNEntity;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
+/**
+ * {@link Avg} moved to a different package in ES 7, so the query code has to be rewritten in this class.
+ */
public class AggregationQueryEs7DAO extends AggregationQueryEsDAO {
public AggregationQueryEs7DAO(ElasticSearchClient client) {
super(client);
}
- protected List<TopNEntity> aggregation(String indexName, String valueCName, SearchSourceBuilder sourceBuilder,
- int topN, Order order) throws IOException {
+ @Override
+ public List<SelectedRecord> sortMetrics(final TopNCondition metrics,
+ final String valueColumnName,
+ final Duration duration,
+ final List<KeyValue> additionalConditions) throws IOException {
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
+ .lte(duration.getEndTimeBucket())
+ .gte(duration.getStartTimeBucket()));
boolean asc = false;
- if (order.equals(Order.ASC)) {
+ if (metrics.getOrder().equals(Order.ASC)) {
asc = true;
}
- TermsAggregationBuilder aggregationBuilder = aggregationBuilder(valueCName, topN, asc);
-
- sourceBuilder.aggregation(aggregationBuilder);
+ sourceBuilder.aggregation(
+ AggregationBuilders.terms(Metrics.ENTITY_ID)
+ .field(Metrics.ENTITY_ID)
+ .order(BucketOrder.aggregation(valueColumnName, asc))
+ .size(metrics.getTopN())
+ .subAggregation(AggregationBuilders.avg(valueColumnName).field(valueColumnName))
+ );
- SearchResponse response = getClient().search(indexName, sourceBuilder);
+ SearchResponse response = getClient().search(metrics.getName(), sourceBuilder);
- List<TopNEntity> topNEntities = new ArrayList<>();
+ List<SelectedRecord> topNList = new ArrayList<>();
Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
for (Terms.Bucket termsBucket : idTerms.getBuckets()) {
- TopNEntity topNEntity = new TopNEntity();
- topNEntity.setId(termsBucket.getKeyAsString());
- Avg value = termsBucket.getAggregations().get(valueCName);
- topNEntity.setValue((long) value.getValue());
- topNEntities.add(topNEntity);
+ SelectedRecord record = new SelectedRecord();
+ record.setId(termsBucket.getKeyAsString());
+ Avg value = termsBucket.getAggregations().get(valueColumnName);
+ record.setValue(String.valueOf((long) value.getValue()));
+ topNList.add(record);
}
- return topNEntities;
- }
-
- protected TermsAggregationBuilder aggregationBuilder(final String valueCName, final int topN, final boolean asc) {
- return AggregationBuilders.terms(Metrics.ENTITY_ID)
- .field(Metrics.ENTITY_ID)
- .order(BucketOrder.aggregation(valueCName, asc))
- .size(topN)
- .subAggregation(AggregationBuilders.avg(valueCName).field(valueCName));
+ return topNList;
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java
index 2f6ec06..6de3579 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java
@@ -26,16 +26,12 @@ import java.util.ArrayList;
import java.util.List;
import lombok.AccessLevel;
import lombok.Getter;
-import org.apache.skywalking.oap.server.core.analysis.DownSampling;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
-import org.apache.skywalking.oap.server.core.query.type.TopNEntity;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
@@ -62,7 +58,9 @@ public class H2AggregationQueryDAO implements IAggregationQueryDAO {
.append(" from ")
.append(metrics.getName())
.append(" where ");
- this.setTimeRangeCondition(sql, conditions, duration.getStartTimeBucket(), duration.getEndTimeBucket());
+ sql.append(Metrics.TIME_BUCKET).append(" >= ? and ").append(Metrics.TIME_BUCKET).append(" <= ?");
+ conditions.add(duration.getStartTimeBucket());
+ conditions.add(duration.getEndTimeBucket());
if (additionalConditions != null) {
additionalConditions.forEach(condition -> {
sql.append(" and ").append(condition.getKey()).append("=?");
@@ -89,11 +87,4 @@ public class H2AggregationQueryDAO implements IAggregationQueryDAO {
}
return topNEntities;
}
-
- protected void setTimeRangeCondition(StringBuilder sql, List<Object> conditions, long startTimestamp,
- long endTimestamp) {
- sql.append(Metrics.TIME_BUCKET).append(" >= ? and ").append(Metrics.TIME_BUCKET).append(" <= ?");
- conditions.add(startTimestamp);
- conditions.add(endTimestamp);
- }
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java
index 2333c14..baa1fda 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java
@@ -24,8 +24,12 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.core.query.type.TopNRecord;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
@@ -38,35 +42,36 @@ public class H2TopNRecordsQueryDAO implements ITopNRecordsQueryDAO {
}
@Override
- public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, String serviceId,
- int topN, Order order) throws IOException {
- StringBuilder sql = new StringBuilder("select * from " + metricName + " where ");
+ public List<SelectedRecord> readSampledRecords(final TopNCondition condition,
+ final Duration duration) throws IOException {
+ StringBuilder sql = new StringBuilder("select * from " + condition.getName() + " where ");
List<Object> parameters = new ArrayList<>(10);
sql.append(" service_id = ? ");
+ final String serviceId = IDManager.ServiceID.buildId(condition.getParentService(), condition.isNormal());
parameters.add(serviceId);
sql.append(" and ").append(TopN.TIME_BUCKET).append(" >= ?");
- parameters.add(startSecondTB);
+ parameters.add(duration.getStartTimeBucket());
sql.append(" and ").append(TopN.TIME_BUCKET).append(" <= ?");
- parameters.add(endSecondTB);
+ parameters.add(duration.getEndTimeBucket());
sql.append(" order by ").append(TopN.LATENCY);
- if (order.equals(Order.DES)) {
+ if (condition.getOrder().equals(Order.DES)) {
sql.append(" desc ");
} else {
sql.append(" asc ");
}
- sql.append(" limit ").append(topN);
+ sql.append(" limit ").append(condition.getTopN());
- List<TopNRecord> results = new ArrayList<>();
+ List<SelectedRecord> results = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), parameters.toArray(new Object[0]))) {
while (resultSet.next()) {
- TopNRecord record = new TopNRecord();
- record.setStatement(resultSet.getString(TopN.STATEMENT));
- record.setTraceId(resultSet.getString(TopN.TRACE_ID));
- record.setLatency(resultSet.getLong(TopN.LATENCY));
+ SelectedRecord record = new SelectedRecord();
+ record.setName(resultSet.getString(TopN.STATEMENT));
+ record.setRefId(resultSet.getString(TopN.TRACE_ID));
+ record.setValue(resultSet.getString(TopN.LATENCY));
results.add(record);
}
}
@@ -76,4 +81,5 @@ public class H2TopNRecordsQueryDAO implements ITopNRecordsQueryDAO {
return results;
}
+
}