You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/03/08 13:09:59 UTC

[GitHub] wu-sheng closed pull request #913: Fixed the issue #907

wu-sheng closed pull request #913: Fixed the issue #907
URL: https://github.com/apache/incubator-skywalking/pull/913
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java
index b9dba1565..c3c7213f0 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java
@@ -18,10 +18,8 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMetricUIDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
@@ -35,13 +33,10 @@
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.script.Script;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
-import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
 
 /**
  * @author peng-yongsheng
@@ -52,8 +47,6 @@ public ApplicationMetricEsUIDAO(ElasticSearchClient client) {
         super(client);
     }
 
-    private static final String AVG_TPS = "avg_tps";
-
     @Override
     public List<ApplicationTPS> getTopNApplicationThroughput(Step step, long startTimeBucket, long endTimeBucket,
         int betweenSecond, int topN, MetricSource metricSource) {
@@ -70,36 +63,37 @@ public ApplicationMetricEsUIDAO(ElasticSearchClient client) {
         searchRequestBuilder.setQuery(boolQuery);
         searchRequestBuilder.setSize(0);
 
-        TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ApplicationMetricTable.COLUMN_APPLICATION_ID).field(ApplicationMetricTable.COLUMN_APPLICATION_ID).size(topN);
+        TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ApplicationMetricTable.COLUMN_APPLICATION_ID).field(ApplicationMetricTable.COLUMN_APPLICATION_ID).size(2000);
         aggregationBuilder.subAggregation(AggregationBuilders.sum(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS).field(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS));
-        aggregationBuilder.subAggregation(AggregationBuilders.sum(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
-
-        Map<String, String> bucketsPathsMap = new HashMap<>();
-        bucketsPathsMap.put(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS, ApplicationMetricTable.COLUMN_TRANSACTION_CALLS);
-        bucketsPathsMap.put(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS);
-
-        String idOrCode = "(params." + ApplicationMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")"
-            + " / "
-            + "(" + betweenSecond + ")";
-        Script script = new Script(idOrCode);
-        aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_TPS, bucketsPathsMap, script));
-
         searchRequestBuilder.addAggregation(aggregationBuilder);
+
         SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
 
         List<ApplicationTPS> applicationTPSs = new LinkedList<>();
         Terms applicationIdTerms = searchResponse.getAggregations().get(ApplicationMetricTable.COLUMN_APPLICATION_ID);
         applicationIdTerms.getBuckets().forEach(applicationIdTerm -> {
             int applicationId = applicationIdTerm.getKeyAsNumber().intValue();
+            Sum callSum = applicationIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS);
+            long calls = (long)callSum.getValue();
+            int callsPerSec = (int)(betweenSecond == 0 ? 0 : calls / betweenSecond);
 
             ApplicationTPS applicationTPS = new ApplicationTPS();
-            InternalSimpleValue simpleValue = applicationIdTerm.getAggregations().get(AVG_TPS);
-
             applicationTPS.setApplicationId(applicationId);
-            applicationTPS.setCallsPerSec((int)simpleValue.getValue());
+            applicationTPS.setCallsPerSec(callsPerSec);
             applicationTPSs.add(applicationTPS);
         });
-        return applicationTPSs;
+
+        applicationTPSs.sort((first, second) -> first.getCallsPerSec() > second.getCallsPerSec() ? -1 : 1);
+
+        if (applicationTPSs.size() <= topN) {
+            return applicationTPSs;
+        } else {
+            List<ApplicationTPS> newCollection = new LinkedList<>();
+            for (int i = 0; i < topN; i++) {
+                newCollection.add(applicationTPSs.get(i));
+            }
+            return newCollection;
+        }
     }
 
     @Override
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
index a52eba8f2..701550b96 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
@@ -18,15 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.Const;
 import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
 import org.apache.skywalking.apm.collector.storage.table.MetricSource;
+import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetricTable;
 import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
 import org.apache.skywalking.apm.collector.storage.ui.common.Step;
 import org.apache.skywalking.apm.collector.storage.ui.server.AppServerInfo;
@@ -40,26 +39,22 @@
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.script.Script;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
+import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 
 /**
  * @author peng-yongsheng
  */
 public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO {
 
-    private static final String AVG_TPS = "avg_tps";
-
     public InstanceMetricEsUIDAO(ElasticSearchClient client) {
         super(client);
     }
 
-    @Override public List<AppServerInfo> getServerThroughput(int applicationId, Step step, long startTimeBucket, long endTimeBucket,
-        int secondBetween, int topN, MetricSource metricSource) {
+    @Override public List<AppServerInfo> getServerThroughput(int applicationId, Step step, long startTimeBucket,
+        long endTimeBucket, int secondBetween, int topN, MetricSource metricSource) {
         String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
 
         SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
@@ -76,36 +71,36 @@ public InstanceMetricEsUIDAO(ElasticSearchClient client) {
         searchRequestBuilder.setQuery(boolQuery);
         searchRequestBuilder.setSize(0);
 
-        TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(InstanceMetricTable.COLUMN_INSTANCE_ID).field(InstanceMetricTable.COLUMN_INSTANCE_ID).size(topN);
+        TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(InstanceMetricTable.COLUMN_INSTANCE_ID).field(InstanceMetricTable.COLUMN_INSTANCE_ID).size(2000);
         aggregationBuilder.subAggregation(AggregationBuilders.sum(InstanceMetricTable.COLUMN_TRANSACTION_CALLS).field(InstanceMetricTable.COLUMN_TRANSACTION_CALLS));
-        aggregationBuilder.subAggregation(AggregationBuilders.sum(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
-
-        Map<String, String> bucketsPathsMap = new HashMap<>();
-        bucketsPathsMap.put(InstanceMetricTable.COLUMN_TRANSACTION_CALLS, InstanceMetricTable.COLUMN_TRANSACTION_CALLS);
-        bucketsPathsMap.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS);
-
-        String idOrCode = "(params." + InstanceMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")"
-            + " / "
-            + "( " + secondBetween + " )";
-        Script script = new Script(idOrCode);
-        aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_TPS, bucketsPathsMap, script));
 
         searchRequestBuilder.addAggregation(aggregationBuilder);
         SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
 
         List<AppServerInfo> appServerInfos = new LinkedList<>();
-        Terms serviceIdTerms = searchResponse.getAggregations().get(InstanceMetricTable.COLUMN_INSTANCE_ID);
-        serviceIdTerms.getBuckets().forEach(serviceIdTerm -> {
-            int instanceId = serviceIdTerm.getKeyAsNumber().intValue();
+        Terms instanceIdTerms = searchResponse.getAggregations().get(InstanceMetricTable.COLUMN_INSTANCE_ID);
+        instanceIdTerms.getBuckets().forEach(instanceIdTerm -> {
+            int instanceId = instanceIdTerm.getKeyAsNumber().intValue();
+            Sum callSum = instanceIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS);
+            long calls = (long)callSum.getValue();
+            int callsPerSec = (int)(secondBetween == 0 ? 0 : calls / secondBetween);
 
             AppServerInfo appServerInfo = new AppServerInfo();
-            InternalSimpleValue simpleValue = serviceIdTerm.getAggregations().get(AVG_TPS);
-
             appServerInfo.setId(instanceId);
-            appServerInfo.setCallsPerSec((int)simpleValue.getValue());
+            appServerInfo.setCallsPerSec(callsPerSec);
             appServerInfos.add(appServerInfo);
         });
-        return appServerInfos;
+
+        appServerInfos.sort((first, second) -> first.getCallsPerSec() > second.getCallsPerSec() ? -1 : 1);
+        if (appServerInfos.size() <= topN) {
+            return appServerInfos;
+        } else {
+            List<AppServerInfo> newCollection = new LinkedList<>();
+            for (int i = 0; i < topN; i++) {
+                newCollection.add(appServerInfos.get(i));
+            }
+            return newCollection;
+        }
     }
 
     @Override public List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
index 111355240..677a47cef 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
@@ -19,10 +19,10 @@
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.Const;
 import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
@@ -43,21 +43,19 @@
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.script.Script;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
-import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.search.sort.SortOrder;
 
 /**
  * @author peng-yongsheng
  */
 public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
 
-    private static final String AVG_DURATION = "avg_duration";
-
     public ServiceMetricEsUIDAO(ElasticSearchClient client) {
         super(client);
     }
@@ -177,8 +175,7 @@ public ServiceMetricEsUIDAO(ElasticSearchClient client) {
 
     @Override
     public List<ServiceMetric> getSlowService(int applicationId, Step step, long startTimeBucket, long endTimeBucket,
-        Integer topN,
-        MetricSource metricSource) {
+        Integer topN, MetricSource metricSource) {
         String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
 
         SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
@@ -193,43 +190,30 @@ public ServiceMetricEsUIDAO(ElasticSearchClient client) {
         boolQuery.must().add(QueryBuilders.termQuery(ServiceMetricTable.COLUMN_SOURCE_VALUE, metricSource.getValue()));
 
         searchRequestBuilder.setQuery(boolQuery);
-        searchRequestBuilder.setSize(0);
-
-        TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ServiceMetricTable.COLUMN_SERVICE_ID).field(ServiceMetricTable.COLUMN_SERVICE_ID).size(topN);
-        aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_CALLS).field(ServiceMetricTable.COLUMN_TRANSACTION_CALLS));
-        aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
-        aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM).field(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM));
-        aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM).field(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM));
-
-        Map<String, String> bucketsPathsMap = new HashMap<>();
-        bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_CALLS, ServiceMetricTable.COLUMN_TRANSACTION_CALLS);
-        bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS);
-        bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM);
-        bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM);
-
-        String idOrCode = "(params." + ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM + " - params." + ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM + ")"
-            + " / "
-            + "(params." + ServiceMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")";
-        Script script = new Script(idOrCode);
-        aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_DURATION, bucketsPathsMap, script));
-
-        searchRequestBuilder.addAggregation(aggregationBuilder);
+        searchRequestBuilder.setSize(topN * 60);
+        searchRequestBuilder.addSort(SortBuilders.fieldSort(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION).order(SortOrder.DESC));
         SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
 
-        List<ServiceMetric> serviceMetrics = new LinkedList<>();
-        Terms serviceIdTerms = searchResponse.getAggregations().get(ServiceMetricTable.COLUMN_SERVICE_ID);
-        serviceIdTerms.getBuckets().forEach(serviceIdTerm -> {
-            int serviceId = serviceIdTerm.getKeyAsNumber().intValue();
+        SearchHit[] searchHits = searchResponse.getHits().getHits();
 
-            ServiceMetric serviceMetric = new ServiceMetric();
-            InternalSimpleValue simpleValue = serviceIdTerm.getAggregations().get(AVG_DURATION);
-            Sum calls = serviceIdTerm.getAggregations().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS);
+        Set<Integer> serviceIds = new HashSet<>();
+        List<ServiceMetric> serviceMetrics = new LinkedList<>();
+        for (SearchHit searchHit : searchHits) {
+            int serviceId = ((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue();
+            if (!serviceIds.contains(serviceId)) {
+                ServiceMetric serviceMetric = new ServiceMetric();
+                serviceMetric.setId(serviceId);
+                serviceMetric.setCalls(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
+                serviceMetric.setAvgResponseTime(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue());
+                serviceMetrics.add(serviceMetric);
+
+                serviceIds.add(serviceId);
+            }
+            if (topN == serviceIds.size()) {
+                break;
+            }
+        }
 
-            serviceMetric.setCalls((long)calls.getValue());
-            serviceMetric.setId(serviceId);
-            serviceMetric.setAvgResponseTime((int)simpleValue.getValue());
-            serviceMetrics.add(serviceMetric);
-        });
         return serviceMetrics;
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services