You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/17 14:22:05 UTC

[skywalking] branch metrics updated: Adopt polished query protocol and support top n in old/new query.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch metrics
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/metrics by this push:
     new 52b66f3  Adopt polished query protocol and support top n in old/new query.
52b66f3 is described below

commit 52b66f3a1a5d31d4e665abe79a5d9987804100af
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Apr 17 22:21:15 2020 +0800

    Adopt polished query protocol and support top n in old/new query.
---
 .../oap/server/core/analysis/IDManager.java        |   7 +-
 .../server/core/query/AggregationQueryService.java |  18 ++--
 .../server/core/query/TopNRecordsQueryService.java |  10 +-
 .../oap/server/core/query/input/TopNCondition.java |   5 +
 .../core/storage/query/ITopNRecordsQueryDAO.java   |  15 ++-
 .../oap/query/graphql/resolver/MetricsQuery.java   |  40 ++++---
 .../query/graphql/resolver/TopNRecordsQuery.java   |  54 +++++-----
 .../query/graphql/type/TopNRecordsCondition.java   |   1 +
 .../elasticsearch/query/AggregationQueryEsDAO.java | 120 +++++----------------
 .../elasticsearch/query/TopNRecordsQueryEsDAO.java |  29 +++--
 .../query/AggregationQueryEs7DAO.java              |  64 ++++++-----
 .../plugin/jdbc/h2/dao/H2AggregationQueryDAO.java  |  15 +--
 .../plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java  |  30 +++---
 13 files changed, 185 insertions(+), 223 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
index 4eb4b47..67e2b91 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java
@@ -39,8 +39,11 @@ public class IDManager {
          * @return encoded service id
          */
         public static String buildId(String name, NodeType type) {
-            return encode(name) + Const.SERVICE_ID_CONNECTOR + BooleanUtils.booleanToValue(
-                type.equals(NodeType.Normal));
+            return buildId(name, type.equals(NodeType.Normal));
+        }
+
+        public static String buildId(String name, boolean isNormal) {
+            return encode(name) + Const.SERVICE_ID_CONNECTOR + BooleanUtils.booleanToValue(isNormal);
         }
 
         /**
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java
index 57b9d79..40a4c82 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.NodeType;
 import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
@@ -52,18 +52,18 @@ public class AggregationQueryService implements Service {
         return aggregationQueryDAO;
     }
 
-    public List<SelectedRecord> sortMetrics(TopNCondition metrics, Duration duration) throws IOException {
-        final String valueCName = ValueColumnMetadata.INSTANCE.getValueCName(metrics.getName());
+    public List<SelectedRecord> sortMetrics(TopNCondition condition, Duration duration) throws IOException {
+        final String valueCName = ValueColumnMetadata.INSTANCE.getValueCName(condition.getName());
         List<KeyValue> additionalConditions = null;
-        if (StringUtil.isNotEmpty(metrics.getParentService())) {
+        if (StringUtil.isNotEmpty(condition.getParentService())) {
             additionalConditions = new ArrayList<>(1);
-            final String serviceId = IDManager.ServiceID.buildId(metrics.getParentService(), NodeType.Normal);
+            final String serviceId = IDManager.ServiceID.buildId(condition.getParentService(), condition.isNormal());
             additionalConditions.add(new KeyValue(InstanceTraffic.SERVICE_ID, serviceId));
         }
         final List<SelectedRecord> selectedRecords = getAggregationQueryDAO().sortMetrics(
-            metrics, valueCName, duration, additionalConditions);
+            condition, valueCName, duration, additionalConditions);
         selectedRecords.forEach(selectedRecord -> {
-            switch (metrics.getScope()) {
+            switch (condition.getScope()) {
                 case Service:
                     selectedRecord.setName(IDManager.ServiceID.analysisId(selectedRecord.getId()).getName());
                     break;
@@ -71,8 +71,10 @@ public class AggregationQueryService implements Service {
                     selectedRecord.setName(IDManager.ServiceInstanceID.analysisId(selectedRecord.getId()).getName());
                     break;
                 case Endpoint:
-                    selectedRecord.setName(IDManager.ServiceInstanceID.analysisId(selectedRecord.getId()).getName());
+                    selectedRecord.setName(IDManager.EndpointID.analysisId(selectedRecord.getId()).getEndpointName());
                     break;
+                default:
+                    selectedRecord.setName(Const.UNKNOWN);
             }
         });
         return selectedRecords;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java
index 3746e41..1e8e4c8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java
@@ -20,8 +20,9 @@ package org.apache.skywalking.oap.server.core.query;
 
 import java.io.IOException;
 import java.util.List;
-import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.type.TopNRecord;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
@@ -44,8 +45,7 @@ public class TopNRecordsQueryService implements Service {
         return topNRecordsQueryDAO;
     }
 
-    public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, String serviceId,
-        int topN, Order order) throws IOException {
-        return getTopNRecordsQueryDAO().getTopNRecords(startSecondTB, endSecondTB, metricName, serviceId, topN, order);
+    public List<SelectedRecord> readSampledRecords(TopNCondition condition, Duration duration) throws IOException {
+        return getTopNRecordsQueryDAO().readSampledRecords(condition, duration);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TopNCondition.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TopNCondition.java
index a55c7bd..6964184 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TopNCondition.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TopNCondition.java
@@ -40,6 +40,11 @@ public class TopNCondition {
      */
     private String parentService;
     /**
+     * A normal service is one that has an agent installed or reports metrics directly. A non-normal service is a
+     * conjectural service, usually detected by the agent.
+     */
+    private boolean isNormal;
+    /**
      * Indicate the metrics entity scope. Because this is a top list, don't need to set the Entity like the
      * MetricsCondition. Only accept scope = {@link Scope#Service} {@link Scope#ServiceInstance} and {@link
      * Scope#Endpoint}, ignore others due to those are pointless.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
index 3be289f..a818d5c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java
@@ -20,11 +20,18 @@ package org.apache.skywalking.oap.server.core.storage.query;
 
 import java.io.IOException;
 import java.util.List;
-import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.type.TopNRecord;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
 import org.apache.skywalking.oap.server.library.module.Service;
 
+/**
+ * Query the records sampled by {@link Stream} = {@link TopNStreamProcessor}
+ *
+ * @since 8.0.0
+ */
 public interface ITopNRecordsQueryDAO extends Service {
-    List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, String serviceId, int topN,
-        Order order) throws IOException;
+    List<SelectedRecord> readSampledRecords(TopNCondition condition, Duration duration) throws IOException;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
index 59f26a4..6efbb04 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
 import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
 import org.apache.skywalking.oap.server.core.query.enumeration.MetricsType;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.input.MetricsCondition;
@@ -43,6 +44,7 @@ public class MetricsQuery implements GraphQLQueryResolver {
     private final ModuleManager moduleManager;
     private MetricQueryService metricQueryService;
     private AggregationQueryService queryService;
+    private TopNRecordsQueryService topNRecordsQueryService;
 
     public MetricsQuery(ModuleManager moduleManager) {
         this.moduleManager = moduleManager;
@@ -57,10 +59,17 @@ public class MetricsQuery implements GraphQLQueryResolver {
         return queryService;
     }
 
+    private TopNRecordsQueryService getTopNRecordsQueryService() {
+        if (topNRecordsQueryService == null) {
+            this.topNRecordsQueryService = moduleManager.find(CoreModule.NAME)
+                                                        .provider()
+                                                        .getService(TopNRecordsQueryService.class);
+        }
+        return topNRecordsQueryService;
+    }
+
     /**
      * Metrics definition metadata query. Response the metrics type which determines the suitable query methods.
-     *
-     * @since 8.0.0
      */
     public MetricsType typeOfMetrics(String name) throws IOException {
         return MetricsType.UNKNOWN;
@@ -68,38 +77,31 @@ public class MetricsQuery implements GraphQLQueryResolver {
 
     /**
      * Read metrics single value in the duration of required metrics
-     *
-     * @since 8.0.0
      */
-    public int readMetricsValue(MetricsCondition metrics, Duration duration) throws IOException {
+    public int readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
         return 0;
     }
 
     /**
      * Read time-series values in the duration of required metrics
-     *
-     * @since 8.0.0
      */
-    public List<MetricsValues> readMetricsValues(MetricsCondition metrics, Duration duration) throws IOException {
+    public List<MetricsValues> readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
         return Collections.emptyList();
     }
 
     /**
      * Read entity list of required metrics and parent entity type.
-     *
-     * @since 8.0.0
      */
-    public List<SelectedRecord> sortMetrics(TopNCondition metrics, Duration duration) throws IOException {
-        return getQueryService().sortMetrics(metrics, duration);
+    public List<SelectedRecord> sortMetrics(TopNCondition condition, Duration duration) throws IOException {
+        return getQueryService().sortMetrics(condition, duration);
     }
 
     /**
      * Read value in the given time duration, usually as a linear.
      *
      * @param labels the labels you need to query.
-     * @since 8.0.0
      */
-    public List<MetricsValues> readLabeledMetricsValues(MetricsCondition metrics,
+    public List<MetricsValues> readLabeledMetricsValues(MetricsCondition condition,
                                                         List<String> labels,
                                                         Duration duration) throws IOException {
         return Collections.emptyList();
@@ -107,19 +109,15 @@ public class MetricsQuery implements GraphQLQueryResolver {
 
     /**
      * Heatmap is bucket based value statistic result.
-     *
-     * @since 8.0.0
      */
-    public HeatMap readHeatMap(MetricsCondition metrics, Duration duration) throws IOException {
+    public HeatMap readHeatMap(MetricsCondition condition, Duration duration) throws IOException {
         return new HeatMap();
     }
 
     /**
      * Read the sampled records.
-     *
-     * @since 8.0.0
      */
-    public List<SelectedRecord> readSampledRecords(TopNCondition metrics, Duration duration) throws IOException {
-        return Collections.emptyList();
+    public List<SelectedRecord> readSampledRecords(TopNCondition condition, Duration duration) throws IOException {
+        return getTopNRecordsQueryService().readSampledRecords(condition, duration);
     }
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java
index 34fa050..0fbc347 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java
@@ -20,47 +20,47 @@ package org.apache.skywalking.oap.query.graphql.resolver;
 
 import com.coxautodev.graphql.tools.GraphQLQueryResolver;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.skywalking.oap.query.graphql.type.TopNRecordsCondition;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.query.DurationUtils;
-import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
-import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
 import org.apache.skywalking.oap.server.core.query.type.TopNRecord;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
- * @since 8.0.0 This query is replaced by {@link MetricsQuery}
+ * @since 8.0.0 This query is replaced by {@link MetricsQuery}; all queries are delegated to it.
  */
 @Deprecated
 public class TopNRecordsQuery implements GraphQLQueryResolver {
-    private final ModuleManager moduleManager;
-    private TopNRecordsQueryService topNRecordsQueryService;
+    private MetricsQuery query;
 
     public TopNRecordsQuery(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
-    }
-
-    private TopNRecordsQueryService getTopNRecordsQueryService() {
-        if (topNRecordsQueryService == null) {
-            this.topNRecordsQueryService = moduleManager.find(CoreModule.NAME)
-                                                        .provider()
-                                                        .getService(TopNRecordsQueryService.class);
-        }
-        return topNRecordsQueryService;
+        query = new MetricsQuery(moduleManager);
     }
 
     public List<TopNRecord> getTopNRecords(TopNRecordsCondition condition) throws IOException {
-        long startSecondTB = DurationUtils.INSTANCE.startTimeDurationToSecondTimeBucket(
-            condition.getDuration().getStep(), condition.getDuration().getStart());
-        long endSecondTB = DurationUtils.INSTANCE.endTimeDurationToSecondTimeBucket(
-            condition.getDuration().getStep(), condition.getDuration().getEnd());
-
-        String metricName = condition.getMetricName();
-        Order order = condition.getOrder();
-        int topN = condition.getTopN();
+        TopNCondition topNCondition = new TopNCondition();
+        topNCondition.setName(condition.getMetricName());
+        final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
+            condition.getServiceId());
+        topNCondition.setParentService(serviceIDDefinition.getName());
+        topNCondition.setNormal(serviceIDDefinition.isReal());
+        // Scope is not required in topN record query.
+        // topNCondition.setScope();
+        topNCondition.setOrder(condition.getOrder());
+        topNCondition.setTopN(condition.getTopN());
 
-        return getTopNRecordsQueryService().getTopNRecords(
-            startSecondTB, endSecondTB, metricName, condition.getServiceId(), topN, order);
+        final List<SelectedRecord> selectedRecords = query.readSampledRecords(topNCondition, condition.getDuration());
+        List<TopNRecord> list = new ArrayList<>(selectedRecords.size());
+        selectedRecords.forEach(record -> {
+            TopNRecord top = new TopNRecord();
+            top.setStatement(record.getName());
+            top.setTraceId(record.getRefId());
+            top.setLatency(Long.parseLong(record.getValue()));
+            list.add(top);
+        });
+        return list;
     }
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
index c091980..ede62a2 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java
@@ -23,6 +23,7 @@ import lombok.Setter;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 
+@Deprecated
 @Getter
 @Setter
 public class TopNRecordsCondition {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java
index 441815f..546a6e2 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java
@@ -21,22 +21,20 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.skywalking.oap.server.core.analysis.DownSampling;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.type.TopNEntity;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
 import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.avg.Avg;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 
@@ -47,105 +45,41 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
     }
 
     @Override
-    public List<TopNEntity> getServiceTopN(String indexName, String valueCName, int topN, DownSampling downsampling,
-                                           long startTB, long endTB, Order order) throws IOException {
+    public List<SelectedRecord> sortMetrics(final TopNCondition metrics,
+                                            final String valueColumnName,
+                                            final Duration duration,
+                                            final List<KeyValue> additionalConditions) throws IOException {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
-        sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
-        return aggregation(indexName, valueCName, sourceBuilder, topN, order);
-    }
+        sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
+                                         .lte(duration.getEndTimeBucket())
+                                         .gte(duration.getStartTimeBucket()));
 
-    @Override
-    public List<TopNEntity> getAllServiceInstanceTopN(String indexName,
-                                                      String valueCName,
-                                                      int topN,
-                                                      DownSampling downsampling,
-                                                      long startTB,
-                                                      long endTB,
-                                                      Order order) throws IOException {
-        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
-        sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
-        return aggregation(indexName, valueCName, sourceBuilder, topN, order);
-    }
-
-    @Override
-    public List<TopNEntity> getServiceInstanceTopN(String serviceId,
-                                                   String indexName,
-                                                   String valueCName,
-                                                   int topN,
-                                                   DownSampling downsampling,
-                                                   long startTB,
-                                                   long endTB,
-                                                   Order order) throws IOException {
-        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
-
-        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
-        sourceBuilder.query(boolQueryBuilder);
-
-        boolQueryBuilder.must().add(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
-        boolQueryBuilder.must().add(QueryBuilders.termQuery(InstanceTraffic.SERVICE_ID, serviceId));
-
-        return aggregation(indexName, valueCName, sourceBuilder, topN, order);
-    }
-
-    @Override
-    public List<TopNEntity> getAllEndpointTopN(String indexName, String valueCName, int topN, DownSampling downsampling,
-                                               long startTB, long endTB, Order order) throws IOException {
-        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
-        sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
-        return aggregation(indexName, valueCName, sourceBuilder, topN, order);
-    }
-
-    @Override
-    public List<TopNEntity> getEndpointTopN(String serviceId,
-                                            String indexName,
-                                            String valueCName,
-                                            int topN,
-                                            DownSampling downsampling,
-                                            long startTB,
-                                            long endTB,
-                                            Order order) throws IOException {
-        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
-
-        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
-        sourceBuilder.query(boolQueryBuilder);
-
-        boolQueryBuilder.must().add(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
-        boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointTraffic.SERVICE_ID, serviceId));
-
-        return aggregation(indexName, valueCName, sourceBuilder, topN, order);
-    }
-
-    protected List<TopNEntity> aggregation(String indexName, String valueCName, SearchSourceBuilder sourceBuilder,
-                                           int topN, Order order) throws IOException {
         boolean asc = false;
-        if (order.equals(Order.ASC)) {
+        if (metrics.getOrder().equals(Order.ASC)) {
             asc = true;
         }
 
-        TermsAggregationBuilder aggregationBuilder = aggregationBuilder(valueCName, topN, asc);
-
-        sourceBuilder.aggregation(aggregationBuilder);
+        sourceBuilder.aggregation(
+            AggregationBuilders.terms(Metrics.ENTITY_ID)
+                               .field(Metrics.ENTITY_ID)
+                               .order(BucketOrder.aggregation(valueColumnName, asc))
+                               .size(metrics.getTopN())
+                               .subAggregation(AggregationBuilders.avg(valueColumnName).field(valueColumnName))
+        );
 
-        SearchResponse response = getClient().search(indexName, sourceBuilder);
+        SearchResponse response = getClient().search(metrics.getName(), sourceBuilder);
 
-        List<TopNEntity> topNEntities = new ArrayList<>();
+        List<SelectedRecord> topNList = new ArrayList<>();
         Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
         for (Terms.Bucket termsBucket : idTerms.getBuckets()) {
-            TopNEntity topNEntity = new TopNEntity();
-            topNEntity.setId(termsBucket.getKeyAsString());
-            Avg value = termsBucket.getAggregations().get(valueCName);
-            topNEntity.setValue((long) value.getValue());
-            topNEntities.add(topNEntity);
+            SelectedRecord record = new SelectedRecord();
+            record.setId(termsBucket.getKeyAsString());
+            Avg value = termsBucket.getAggregations().get(valueColumnName);
+            record.setValue(String.valueOf((long) value.getValue()));
+            topNList.add(record);
         }
 
-        return topNEntities;
+        return topNList;
     }
 
-    protected TermsAggregationBuilder aggregationBuilder(final String valueCName, final int topN, final boolean asc) {
-        return AggregationBuilders.terms(Metrics.ENTITY_ID)
-                                  .field(Metrics.ENTITY_ID)
-                                  .order(BucketOrder.aggregation(valueCName, asc))
-                                  .size(topN)
-                                  .subAggregation(AggregationBuilders.avg(valueCName).field(valueCName));
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java
index c957dc1..41eb8f6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java
@@ -21,9 +21,12 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.type.TopNRecord;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
 import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
@@ -40,24 +43,28 @@ public class TopNRecordsQueryEsDAO extends EsDAO implements ITopNRecordsQueryDAO
     }
 
     @Override
-    public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, String serviceId,
-        int topN, Order order) throws IOException {
+    public List<SelectedRecord> readSampledRecords(final TopNCondition condition,
+                                                   final Duration duration) throws IOException {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
         BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
-        boolQueryBuilder.must().add(QueryBuilders.rangeQuery(TopN.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
+        boolQueryBuilder.must().add(QueryBuilders.rangeQuery(TopN.TIME_BUCKET)
+                                                 .gte(duration.getStartTimeBucket())
+                                                 .lte(duration.getEndTimeBucket()));
+        final String serviceId = IDManager.ServiceID.buildId(condition.getParentService(), condition.isNormal());
         boolQueryBuilder.must().add(QueryBuilders.termQuery(TopN.SERVICE_ID, serviceId));
 
         sourceBuilder.query(boolQueryBuilder);
-        sourceBuilder.size(topN).sort(TopN.LATENCY, order.equals(Order.DES) ? SortOrder.DESC : SortOrder.ASC);
-        SearchResponse response = getClient().search(metricName, sourceBuilder);
+        sourceBuilder.size(condition.getTopN())
+                     .sort(TopN.LATENCY, condition.getOrder().equals(Order.DES) ? SortOrder.DESC : SortOrder.ASC);
+        SearchResponse response = getClient().search(condition.getName(), sourceBuilder);
 
-        List<TopNRecord> results = new ArrayList<>();
+        List<SelectedRecord> results = new ArrayList<>(condition.getTopN());
 
         for (SearchHit searchHit : response.getHits().getHits()) {
-            TopNRecord record = new TopNRecord();
-            record.setStatement((String) searchHit.getSourceAsMap().get(TopN.STATEMENT));
-            record.setTraceId((String) searchHit.getSourceAsMap().get(TopN.TRACE_ID));
-            record.setLatency(((Number) searchHit.getSourceAsMap().get(TopN.LATENCY)).longValue());
+            SelectedRecord record = new SelectedRecord();
+            record.setName((String) searchHit.getSourceAsMap().get(TopN.STATEMENT));
+            record.setRefId((String) searchHit.getSourceAsMap().get(TopN.TRACE_ID));
+            record.setValue(((Number) searchHit.getSourceAsMap().get(TopN.LATENCY)).toString());
             results.add(record);
         }
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java
index bb157f8..82f72da 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java
@@ -18,61 +18,69 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.type.TopNEntity;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.Avg;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
+/**
+ * {@link Avg} is in a different package in ES 7, so the query has to be re-implemented here.
+ */
 public class AggregationQueryEs7DAO extends AggregationQueryEsDAO {
 
     public AggregationQueryEs7DAO(ElasticSearchClient client) {
         super(client);
     }
 
-    protected List<TopNEntity> aggregation(String indexName, String valueCName, SearchSourceBuilder sourceBuilder,
-        int topN, Order order) throws IOException {
+    @Override
+    public List<SelectedRecord> sortMetrics(final TopNCondition metrics,
+                                            final String valueColumnName,
+                                            final Duration duration,
+                                            final List<KeyValue> additionalConditions) throws IOException {
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
+                                         .lte(duration.getEndTimeBucket())
+                                         .gte(duration.getStartTimeBucket()));
 
         boolean asc = false;
-        if (order.equals(Order.ASC)) {
+        if (metrics.getOrder().equals(Order.ASC)) {
             asc = true;
         }
 
-        TermsAggregationBuilder aggregationBuilder = aggregationBuilder(valueCName, topN, asc);
-
-        sourceBuilder.aggregation(aggregationBuilder);
+        sourceBuilder.aggregation(
+            AggregationBuilders.terms(Metrics.ENTITY_ID)
+                               .field(Metrics.ENTITY_ID)
+                               .order(BucketOrder.aggregation(valueColumnName, asc))
+                               .size(metrics.getTopN())
+                               .subAggregation(AggregationBuilders.avg(valueColumnName).field(valueColumnName))
+        );
 
-        SearchResponse response = getClient().search(indexName, sourceBuilder);
+        SearchResponse response = getClient().search(metrics.getName(), sourceBuilder);
 
-        List<TopNEntity> topNEntities = new ArrayList<>();
+        List<SelectedRecord> topNList = new ArrayList<>();
         Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
         for (Terms.Bucket termsBucket : idTerms.getBuckets()) {
-            TopNEntity topNEntity = new TopNEntity();
-            topNEntity.setId(termsBucket.getKeyAsString());
-            Avg value = termsBucket.getAggregations().get(valueCName);
-            topNEntity.setValue((long) value.getValue());
-            topNEntities.add(topNEntity);
+            SelectedRecord record = new SelectedRecord();
+            record.setId(termsBucket.getKeyAsString());
+            Avg value = termsBucket.getAggregations().get(valueColumnName);
+            record.setValue(String.valueOf((long) value.getValue()));
+            topNList.add(record);
         }
 
-        return topNEntities;
-    }
-
-    protected TermsAggregationBuilder aggregationBuilder(final String valueCName, final int topN, final boolean asc) {
-        return AggregationBuilders.terms(Metrics.ENTITY_ID)
-                                  .field(Metrics.ENTITY_ID)
-                                  .order(BucketOrder.aggregation(valueCName, asc))
-                                  .size(topN)
-                                  .subAggregation(AggregationBuilders.avg(valueCName).field(valueCName));
+        return topNList;
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java
index 2f6ec06..6de3579 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java
@@ -26,16 +26,12 @@ import java.util.ArrayList;
 import java.util.List;
 import lombok.AccessLevel;
 import lombok.Getter;
-import org.apache.skywalking.oap.server.core.analysis.DownSampling;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
 import org.apache.skywalking.oap.server.core.query.type.KeyValue;
 import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
-import org.apache.skywalking.oap.server.core.query.type.TopNEntity;
 import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 
@@ -62,7 +58,9 @@ public class H2AggregationQueryDAO implements IAggregationQueryDAO {
            .append(" from ")
            .append(metrics.getName())
            .append(" where ");
-        this.setTimeRangeCondition(sql, conditions, duration.getStartTimeBucket(), duration.getEndTimeBucket());
+        sql.append(Metrics.TIME_BUCKET).append(" >= ? and ").append(Metrics.TIME_BUCKET).append(" <= ?");
+        conditions.add(duration.getStartTimeBucket());
+        conditions.add(duration.getEndTimeBucket());
         if (additionalConditions != null) {
             additionalConditions.forEach(condition -> {
                 sql.append(" and ").append(condition.getKey()).append("=?");
@@ -89,11 +87,4 @@ public class H2AggregationQueryDAO implements IAggregationQueryDAO {
         }
         return topNEntities;
     }
-
-    protected void setTimeRangeCondition(StringBuilder sql, List<Object> conditions, long startTimestamp,
-                                         long endTimestamp) {
-        sql.append(Metrics.TIME_BUCKET).append(" >= ? and ").append(Metrics.TIME_BUCKET).append(" <= ?");
-        conditions.add(startTimestamp);
-        conditions.add(endTimestamp);
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java
index 2333c14..baa1fda 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java
@@ -24,8 +24,12 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
 import org.apache.skywalking.oap.server.core.query.type.TopNRecord;
 import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
@@ -38,35 +42,36 @@ public class H2TopNRecordsQueryDAO implements ITopNRecordsQueryDAO {
     }
 
     @Override
-    public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, String serviceId,
-        int topN, Order order) throws IOException {
-        StringBuilder sql = new StringBuilder("select * from " + metricName + " where ");
+    public List<SelectedRecord> readSampledRecords(final TopNCondition condition,
+                                                   final Duration duration) throws IOException {
+        StringBuilder sql = new StringBuilder("select * from " + condition.getName() + " where ");
         List<Object> parameters = new ArrayList<>(10);
 
         sql.append(" service_id = ? ");
+        final String serviceId = IDManager.ServiceID.buildId(condition.getParentService(), condition.isNormal());
         parameters.add(serviceId);
 
         sql.append(" and ").append(TopN.TIME_BUCKET).append(" >= ?");
-        parameters.add(startSecondTB);
+        parameters.add(duration.getStartTimeBucket());
         sql.append(" and ").append(TopN.TIME_BUCKET).append(" <= ?");
-        parameters.add(endSecondTB);
+        parameters.add(duration.getEndTimeBucket());
 
         sql.append(" order by ").append(TopN.LATENCY);
-        if (order.equals(Order.DES)) {
+        if (condition.getOrder().equals(Order.DES)) {
             sql.append(" desc ");
         } else {
             sql.append(" asc ");
         }
-        sql.append(" limit ").append(topN);
+        sql.append(" limit ").append(condition.getTopN());
 
-        List<TopNRecord> results = new ArrayList<>();
+        List<SelectedRecord> results = new ArrayList<>();
         try (Connection connection = h2Client.getConnection()) {
             try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), parameters.toArray(new Object[0]))) {
                 while (resultSet.next()) {
-                    TopNRecord record = new TopNRecord();
-                    record.setStatement(resultSet.getString(TopN.STATEMENT));
-                    record.setTraceId(resultSet.getString(TopN.TRACE_ID));
-                    record.setLatency(resultSet.getLong(TopN.LATENCY));
+                    SelectedRecord record = new SelectedRecord();
+                    record.setName(resultSet.getString(TopN.STATEMENT));
+                    record.setRefId(resultSet.getString(TopN.TRACE_ID));
+                    record.setValue(resultSet.getString(TopN.LATENCY));
                     results.add(record);
                 }
             }
@@ -76,4 +81,5 @@ public class H2TopNRecordsQueryDAO implements ITopNRecordsQueryDAO {
 
         return results;
     }
+
 }