You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/09/12 09:35:58 UTC
[incubator-skywalking] branch master updated: Finished topology
query. (#1663)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 23b8d6e Finished topology query. (#1663)
23b8d6e is described below
commit 23b8d6ef41ca470807287219ee99f91ba8905be9
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Wed Sep 12 17:35:54 2018 +0800
Finished topology query. (#1663)
* Finished topology query.
* Fixed the bug of CI failure and startup failure.
---
.../skywalking/oap/server/core/CoreModule.java | 6 +
.../oap/server/core/CoreModuleProvider.java | 3 +
.../server/core/analysis/indicator/Indicator.java | 2 +-
.../manual/service/ServiceComponentIndicator.java | 7 +-
.../manual/service/ServiceMappingIndicator.java | 7 +-
.../oap/server/core/query/TopologyBuilder.java | 203 +++++++++++++++++++++
.../server/core/query/TopologyQueryService.java | 127 +++++++++++++
.../oap/server/core/query/entity}/Call.java | 11 +-
.../oap/server/core/query/entity/IntValues.java} | 10 +-
.../oap/server/core/query/entity}/KVInt.java | 6 +-
.../oap/server/core/query/entity}/Node.java | 8 +-
.../oap/server/core/query/entity}/Step.java | 2 +-
.../oap/server/core/query/entity/Topology.java} | 10 +-
.../oap/server/core/query/sql/Function.java} | 11 +-
.../oap/server/core/query/sql/GroupBy.java} | 15 +-
.../oap/server/core/query/sql/IntKeyValues.java} | 16 +-
.../oap/server/core/query/sql/Where.java} | 12 +-
.../oap/server/core/storage/TimePyramid.java} | 24 ++-
.../core/storage/TimePyramidTableNameBuilder.java | 49 +++++
.../query/IMetricQueryDAO.java} | 26 ++-
.../core/storage/query/IUniqueQueryDAO.java} | 16 +-
.../core/storage/query/OneIdGroupValue.java} | 15 +-
.../oap/server/core/storage/query/TwoIdGroup.java} | 15 +-
.../core/storage/query/TwoIdGroupValue.java} | 16 +-
.../oap/query/graphql/GraphQLQueryProvider.java | 17 +-
.../oap/query/graphql/resolver/MetricQuery.java | 14 +-
.../oap/query/graphql/resolver/TopologyQuery.java | 34 +++-
.../oap/query/graphql/type/Duration.java | 1 +
.../oap/query/graphql/util/DurationUtils.java} | 14 +-
oap-server/server-starter/pom.xml | 4 +-
.../storage-elasticsearch-plugin/pom.xml | 6 +
.../elasticsearch/base/ColumnTypeEsMapping.java | 3 +
.../storage/plugin/elasticsearch/base/EsDAO.java | 24 +++
.../elasticsearch/query/MetricQueryEsDAO.java | 122 +++++++++++++
.../elasticsearch/query/UniqueQueryEsDAO.java | 68 +++++++
.../src/test/resources/log4j2.xml | 31 ++++
36 files changed, 830 insertions(+), 125 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 2d12a56..d0b93eb 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core;
import java.util.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
@@ -50,10 +51,15 @@ public class CoreModule extends ModuleDefine {
addInsideService(classes);
addRegisterService(classes);
addCacheService(classes);
+ addQueryService(classes);
return classes.toArray(new Class[] {});
}
+ private void addQueryService(List<Class> classes) {
+ classes.add(TopologyQueryService.class);
+ }
+
private void addServerInterface(List<Class> classes) {
classes.add(GRPCHandlerRegister.class);
classes.add(JettyHandlerRegister.class);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index dc32872..d22648e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
@@ -108,6 +109,8 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(NetworkAddressInventoryCache.class, new NetworkAddressInventoryCache(getManager()));
this.registerServiceImplementation(INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));
+ this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
+
annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
annotationScan.registerListener(new IndicatorTypeListener(getManager()));
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 933ac59..c56e3b5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -28,7 +28,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
*/
public abstract class Indicator extends StreamData implements StorageData {
- protected static final String TIME_BUCKET = "time_bucket";
+ public static final String TIME_BUCKET = "time_bucket";
@Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java
index 406943c..ef5e558 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java
@@ -33,11 +33,12 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*;
*/
@IndicatorType
@StreamData
-@StorageEntity(name = "service_component", builder = ServiceComponentIndicator.Builder.class)
+@StorageEntity(name = ServiceComponentIndicator.INDEX_NAME, builder = ServiceComponentIndicator.Builder.class)
public class ServiceComponentIndicator extends Indicator {
- private static final String SERVICE_ID = "service_id";
- private static final String COMPONENT_ID = "component_id";
+ public static final String INDEX_NAME = "service_component";
+ public static final String SERVICE_ID = "service_id";
+ public static final String COMPONENT_ID = "component_id";
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = COMPONENT_ID) private int componentId;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java
index 808fcae..891057e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java
@@ -33,11 +33,12 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*;
*/
@IndicatorType
@StreamData
-@StorageEntity(name = "service_mapping", builder = ServiceMappingIndicator.Builder.class)
+@StorageEntity(name = ServiceMappingIndicator.INDEX_NAME, builder = ServiceMappingIndicator.Builder.class)
public class ServiceMappingIndicator extends Indicator {
- private static final String SERVICE_ID = "service_id";
- private static final String MAPPING_SERVICE_ID = "mapping_service_id";
+ public static final String INDEX_NAME = "service_mapping";
+ public static final String SERVICE_ID = "service_id";
+ public static final String MAPPING_SERVICE_ID = "mapping_service_id";
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = MAPPING_SERVICE_ID) private int mappingServiceId;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java
new file mode 100644
index 0000000..abdae7c
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.query;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.register.ServiceInventory;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+class TopologyBuilder {
+
+ private static final Logger logger = LoggerFactory.getLogger(TopologyBuilder.class);
+
+ private final ServiceInventoryCache serviceInventoryCache;
+ // private final DateBetweenService dateBetweenService;
+ private final IComponentLibraryCatalogService componentLibraryCatalogService;
+
+ TopologyBuilder(ModuleManager moduleManager) {
+ this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class);
+// this.dateBetweenService = new DateBetweenService(moduleManager);
+ this.componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class);
+ }
+
+ Topology build(List<ServiceComponent> serviceComponents, List<ServiceMapping> serviceMappings,
+ List<Call> serviceRelationClientCalls, List<Call> serviceRelationServerCalls) {
+ Map<Integer, String> nodeCompMap = buildNodeCompMap(serviceComponents);
+ Map<Integer, String> conjecturalNodeCompMap = buildConjecturalNodeCompMap(serviceComponents);
+ Map<Integer, Integer> mappings = changeMapping2Map(serviceMappings);
+ filterZeroSourceOrTargetReference(serviceRelationClientCalls);
+ filterZeroSourceOrTargetReference(serviceRelationServerCalls);
+ serviceRelationServerCalls = serverCallsFilter(serviceRelationServerCalls);
+
+ List<Node> nodes = new LinkedList<>();
+ Map<Integer, Integer> applicationMinuteBetweenMap = new HashMap<>();
+
+ List<Call> calls = new LinkedList<>();
+ Set<Integer> nodeIds = new HashSet<>();
+ serviceRelationClientCalls.forEach(clientCall -> {
+ ServiceInventory source = serviceInventoryCache.get(clientCall.getSource());
+ ServiceInventory target = serviceInventoryCache.get(clientCall.getTarget());
+
+ if (BooleanUtils.valueToBoolean(target.getIsAddress()) && !mappings.containsKey(target.getSequence())) {
+ if (!nodeIds.contains(target.getSequence())) {
+ Node conjecturalNode = new Node();
+ conjecturalNode.setId(target.getSequence());
+ conjecturalNode.setName(target.getName());
+ conjecturalNode.setType(conjecturalNodeCompMap.getOrDefault(target.getSequence(), Const.UNKNOWN));
+ conjecturalNode.setReal(false);
+ nodes.add(conjecturalNode);
+ nodeIds.add(target.getSequence());
+ }
+ }
+
+ Set<Integer> serviceNodeIds = buildNodeIds(nodes);
+ if (!serviceNodeIds.contains(source.getSequence())) {
+ Node serviceNode = new Node();
+ serviceNode.setId(source.getSequence());
+ serviceNode.setName(source.getName());
+ serviceNode.setType(nodeCompMap.getOrDefault(source.getSequence(), Const.UNKNOWN));
+ nodes.add(serviceNode);
+ }
+
+ Call call = new Call();
+ call.setSource(source.getSequence());
+
+ int actualTargetId = mappings.getOrDefault(target.getSequence(), target.getSequence());
+ call.setTarget(actualTargetId);
+ call.setCallType(nodeCompMap.get(clientCall.getTarget()));
+// try {
+// call.setCpm(clientCall.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, source.getSequence(), startSecondTimeBucket, endSecondTimeBucket));
+// } catch (ParseException e) {
+// logger.error(e.getMessage(), e);
+// }
+ calls.add(call);
+ });
+
+ serviceRelationServerCalls.forEach(referenceMetric -> {
+ ServiceInventory source = serviceInventoryCache.get(referenceMetric.getSource());
+ ServiceInventory target = serviceInventoryCache.get(referenceMetric.getTarget());
+
+ if (source.getSequence() == Const.NONE_SERVICE_ID) {
+ if (!nodeIds.contains(source.getSequence())) {
+ Node visualUserNode = new Node();
+ visualUserNode.setId(source.getSequence());
+ visualUserNode.setName(Const.USER_CODE);
+ visualUserNode.setType(Const.USER_CODE.toUpperCase());
+ nodes.add(visualUserNode);
+ nodeIds.add(source.getSequence());
+ }
+ }
+
+ if (BooleanUtils.valueToBoolean(source.getIsAddress())) {
+ if (!nodeIds.contains(source.getSequence())) {
+ Node conjecturalNode = new Node();
+ conjecturalNode.setId(source.getSequence());
+ conjecturalNode.setName(source.getName());
+ conjecturalNode.setType(conjecturalNodeCompMap.getOrDefault(target.getSequence(), Const.UNKNOWN));
+ nodeIds.add(source.getSequence());
+ nodes.add(conjecturalNode);
+ }
+ }
+
+ Call call = new Call();
+ call.setSource(source.getSequence());
+ call.setTarget(target.getSequence());
+
+ if (source.getSequence() == Const.NONE_SERVICE_ID) {
+ call.setCallType(Const.EMPTY_STRING);
+ } else {
+ call.setCallType(nodeCompMap.get(referenceMetric.getTarget()));
+ }
+// try {
+// call.setCpm(referenceMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, target.getSequence(), startSecondTimeBucket, endSecondTimeBucket));
+// } catch (ParseException e) {
+// logger.error(e.getMessage(), e);
+// }
+ calls.add(call);
+ });
+
+ Topology topology = new Topology();
+ topology.getCalls().addAll(calls);
+ topology.getNodes().addAll(nodes);
+ return topology;
+ }
+
+ private Set<Integer> buildNodeIds(List<Node> nodes) {
+ Set<Integer> nodeIds = new HashSet<>();
+ nodes.forEach(node -> nodeIds.add(node.getId()));
+ return nodeIds;
+ }
+
+ private List<Call> serverCallsFilter(List<Call> serviceRelationServerCalls) {
+ List<Call> filteredCalls = new LinkedList<>();
+
+ serviceRelationServerCalls.forEach(serverCall -> {
+ ServiceInventory source = serviceInventoryCache.get(serverCall.getSource());
+ if (BooleanUtils.valueToBoolean(source.getIsAddress()) || source.getSequence() == Const.NONE_SERVICE_ID) {
+ filteredCalls.add(serverCall);
+ }
+ });
+
+ return filteredCalls;
+ }
+
+ private Map<Integer, Integer> changeMapping2Map(List<ServiceMapping> serviceMappings) {
+ Map<Integer, Integer> mappings = new HashMap<>();
+ serviceMappings.forEach(serviceMapping -> mappings.put(serviceMapping.getMappingServiceId(), serviceMapping.getServiceId()));
+ return mappings;
+ }
+
+ private Map<Integer, String> buildConjecturalNodeCompMap(List<ServiceComponent> serviceComponents) {
+ Map<Integer, String> components = new HashMap<>();
+ serviceComponents.forEach(serviceComponent -> {
+ int componentServerId = this.componentLibraryCatalogService.getServerIdBasedOnComponent(serviceComponent.getComponentId());
+ String componentName = this.componentLibraryCatalogService.getServerName(componentServerId);
+ components.put(serviceComponent.getServiceId(), componentName);
+ });
+ return components;
+ }
+
+ private Map<Integer, String> buildNodeCompMap(List<ServiceComponent> serviceComponents) {
+ Map<Integer, String> components = new HashMap<>();
+ serviceComponents.forEach(serviceComponent -> {
+ String componentName = this.componentLibraryCatalogService.getComponentName(serviceComponent.getComponentId());
+ components.put(serviceComponent.getServiceId(), componentName);
+ });
+ return components;
+ }
+
+ private void filterZeroSourceOrTargetReference(List<Call> serviceRelationClientCalls) {
+ for (int i = serviceRelationClientCalls.size() - 1; i >= 0; i--) {
+ Call call = serviceRelationClientCalls.get(i);
+ if (call.getSource() == 0 || call.getTarget() == 0) {
+ serviceRelationClientCalls.remove(i);
+ }
+ }
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
new file mode 100644
index 0000000..57993dd
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.query.sql.*;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class TopologyQueryService implements Service {
+
+ private static final Logger logger = LoggerFactory.getLogger(TopologyQueryService.class);
+
+ private final ModuleManager moduleManager;
+ private IMetricQueryDAO metricQueryDAO;
+ private IUniqueQueryDAO uniqueQueryDAO;
+
+ public TopologyQueryService(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
+ }
+
+ private IMetricQueryDAO getMetricQueryDAO() {
+ if (metricQueryDAO == null) {
+ metricQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetricQueryDAO.class);
+ }
+ return metricQueryDAO;
+ }
+
+ private IUniqueQueryDAO getUniqueQueryDAO() {
+ if (uniqueQueryDAO == null) {
+ uniqueQueryDAO = moduleManager.find(StorageModule.NAME).getService(IUniqueQueryDAO.class);
+ }
+ return uniqueQueryDAO;
+ }
+
+ public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
+ logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
+ List<ServiceComponent> serviceComponents = loadServiceComponent(step, startTB, endTB);
+ List<ServiceMapping> serviceMappings = loadServiceMapping(step, startTB, endTB);
+
+ List<Call> serviceRelationClientCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_client_calls_sum");
+ List<Call> serviceRelationServerCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_server_calls_sum");
+
+ TopologyBuilder builder = new TopologyBuilder(moduleManager);
+ return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
+ }
+
+ public Topology getServiceTopology(final Step step, final long startTimeBucket, final long endTimeBucket,
+ final String serviceId) {
+ return new Topology();
+ }
+
+ private List<ServiceComponent> loadServiceComponent(final Step step, final long startTB,
+ final long endTB) throws IOException {
+ List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceComponentIndicator.INDEX_NAME, step, startTB, endTB,
+ new Where(), ServiceComponentIndicator.SERVICE_ID, ServiceComponentIndicator.COMPONENT_ID);
+
+ List<ServiceComponent> serviceComponents = new ArrayList<>();
+ twoIdGroups.forEach(twoIdGroup -> {
+ ServiceComponent serviceComponent = new ServiceComponent();
+ serviceComponent.setServiceId(twoIdGroup.getId1());
+ serviceComponent.setComponentId(twoIdGroup.getId2());
+ serviceComponents.add(serviceComponent);
+ });
+
+ return serviceComponents;
+ }
+
+ private List<ServiceMapping> loadServiceMapping(final Step step, final long startTB,
+ final long endTB) throws IOException {
+ List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceMappingIndicator.INDEX_NAME, step, startTB, endTB,
+ new Where(), ServiceMappingIndicator.SERVICE_ID, ServiceMappingIndicator.MAPPING_SERVICE_ID);
+
+ List<ServiceMapping> serviceMappings = new ArrayList<>();
+ twoIdGroups.forEach(twoIdGroup -> {
+ ServiceMapping serviceMapping = new ServiceMapping();
+ serviceMapping.setServiceId(twoIdGroup.getId1());
+ serviceMapping.setMappingServiceId(twoIdGroup.getId2());
+ serviceMappings.add(serviceMapping);
+ });
+
+ return serviceMappings;
+ }
+
+ private List<Call> loadServiceRelationCalls(final Step step, final long startTB, final long endTB,
+ String indName) throws IOException {
+ List<TwoIdGroupValue> twoIdGroupValues = getMetricQueryDAO().aggregation(indName, step, startTB, endTB, new Where(), "source_service_id", "dest_service_id", "value", Function.Sum);
+
+ List<Call> clientCalls = new ArrayList<>();
+
+ twoIdGroupValues.forEach(twoIdGroupValue -> {
+ Call call = new Call();
+ call.setSource(twoIdGroupValue.getId1());
+ call.setTarget(twoIdGroupValue.getId2());
+ call.setCalls(twoIdGroupValue.getValue().longValue());
+ clientCalls.add(call);
+ });
+
+ return clientCalls;
+ }
+}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Call.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
similarity index 84%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Call.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
index 7a97df8..7a2f807 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Call.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
@@ -16,11 +16,16 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
+import lombok.*;
+
+@Getter
+@Setter
public class Call {
- private String source;
- private String target;
+ private int source;
+ private int target;
private String callType;
+ private long calls;
private long cpm;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Topology.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/IntValues.java
similarity index 81%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Topology.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/IntValues.java
index 8879ddc..70609c5 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Topology.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/IntValues.java
@@ -16,11 +16,11 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
-import java.util.List;
+import java.util.*;
+import lombok.Getter;
-public class Topology {
- private List<Node> nodes;
- private List<Call> calls;
+public class IntValues {
+ @Getter private List<KVInt> values = new LinkedList<>();
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/KVInt.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KVInt.java
similarity index 90%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/KVInt.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KVInt.java
index 266fb60..83d9680 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/KVInt.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KVInt.java
@@ -16,8 +16,12 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
+import lombok.*;
+
+@Setter
+@Getter
public class KVInt {
private String id;
private int value;
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Node.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Node.java
similarity index 88%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Node.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Node.java
index 4ab47b2..c230232 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Node.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Node.java
@@ -16,10 +16,14 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
+import lombok.*;
+
+@Getter
+@Setter
public class Node {
- private String id;
+ private int id;
private String name;
private String type;
private boolean isReal;
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Step.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Step.java
similarity index 93%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Step.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Step.java
index 44f81fb..7fed3a5 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Step.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Step.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
public enum Step {
MONTH,
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Topology.java
similarity index 80%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Topology.java
index 5d9cb09..c76261d 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Topology.java
@@ -16,13 +16,13 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
+import java.util.*;
import lombok.Getter;
@Getter
-public class Duration {
- private String start;
- private String end;
- private Step step;
+public class Topology {
+ private List<Node> nodes = new ArrayList<>();
+ private List<Call> calls = new ArrayList<>();
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/LinearIntValues.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
similarity index 85%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/LinearIntValues.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
index 0002f0a..bce985e 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/LinearIntValues.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
@@ -16,10 +16,11 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.sql;
-import java.util.List;
-
-public class LinearIntValues {
- private List<KVInt> values;
+/**
+ * @author peng-yongsheng
+ */
+public enum Function {
+ Avg, Sum
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/GroupBy.java
similarity index 80%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/GroupBy.java
index 5d9cb09..76923f5 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/GroupBy.java
@@ -16,13 +16,16 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.sql;
-import lombok.Getter;
+import lombok.*;
+/**
+ * @author peng-yongsheng
+ */
@Getter
-public class Duration {
- private String start;
- private String end;
- private Step step;
+@Setter
+public class GroupBy {
+ private String columnOne;
+ private String columnTwo;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/IntKeyValues.java
similarity index 75%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/IntKeyValues.java
index 5d9cb09..c0a2745 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/IntKeyValues.java
@@ -16,13 +16,15 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.sql;
-import lombok.Getter;
+import java.util.*;
+import lombok.*;
-@Getter
-public class Duration {
- private String start;
- private String end;
- private Step step;
+/**
+ * @author peng-yongsheng
+ */
+public class IntKeyValues {
+ @Getter @Setter private String key;
+ @Getter private List<Integer> values = new LinkedList<>();
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Where.java
similarity index 81%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Where.java
index 5d9cb09..62672a5 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Where.java
@@ -16,13 +16,15 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.sql;
+import java.util.*;
import lombok.Getter;
+/**
+ * @author peng-yongsheng
+ */
@Getter
-public class Duration {
- private String start;
- private String end;
- private Step step;
+public class Where {
+ private List<IntKeyValues> keyValues = new LinkedList<>();
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramid.java
similarity index 65%
copy from oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramid.java
index e7c8138..786167e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramid.java
@@ -16,17 +16,27 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
-
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
-import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+package org.apache.skywalking.oap.server.core.storage;
/**
* @author peng-yongsheng
*/
-public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+public enum TimePyramid {
+ Second(0, "second"), Minute(1, "minute"), Hour(2, "hour"), Day(3, "day"), Month(4, "month");
+
+ private final int value;
+ private final String name;
+
+ TimePyramid(int value, String name) {
+ this.value = value;
+ this.name = name;
+ }
+
+ public int getValue() {
+ return value;
+ }
- public EsDAO(ElasticSearchClient client) {
- super(client);
+ public String getName() {
+ return name;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramidTableNameBuilder.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramidTableNameBuilder.java
new file mode 100644
index 0000000..b413e9a
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramidTableNameBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
+
+/**
+ * @author peng-yongsheng
+ */
+public class TimePyramidTableNameBuilder {
+
+ private TimePyramidTableNameBuilder() {
+ }
+
+ public static String build(Step step, String tableName) {
+ switch (step) {
+ case MONTH:
+ tableName = tableName + Const.ID_SPLIT + TimePyramid.Month.getName();
+ break;
+ case DAY:
+ tableName = tableName + Const.ID_SPLIT + TimePyramid.Day.getName();
+ break;
+ case HOUR:
+ tableName = tableName + Const.ID_SPLIT + TimePyramid.Hour.getName();
+ break;
+ case MINUTE:
+ tableName = tableName + Const.ID_SPLIT + TimePyramid.Minute.getName();
+ break;
+ }
+ return tableName;
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricQueryDAO.java
similarity index 53%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricQueryDAO.java
index 933ac59..7165069 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricQueryDAO.java
@@ -16,25 +16,23 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+package org.apache.skywalking.oap.server.core.storage.query;
-import lombok.*;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
+import org.apache.skywalking.oap.server.core.query.sql.*;
+import org.apache.skywalking.oap.server.core.storage.DAO;
/**
* @author peng-yongsheng
*/
-public abstract class Indicator extends StreamData implements StorageData {
+public interface IMetricQueryDAO extends DAO {
- protected static final String TIME_BUCKET = "time_bucket";
+ List<OneIdGroupValue> aggregation(String indName, Step step, long startTB,
+ long endTB, Where where, String idCName, String valueCName, Function function) throws IOException;
- @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
-
- public abstract String id();
-
- public abstract void combine(Indicator indicator);
-
- public abstract void calculate();
+ List<TwoIdGroupValue> aggregation(String indName, Step step, long startTB,
+ long endTB, Where where, String idCName1, String idCName2, String valueCName,
+ Function function) throws IOException;
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IUniqueQueryDAO.java
similarity index 62%
copy from oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IUniqueQueryDAO.java
index e7c8138..8ba1fc1 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IUniqueQueryDAO.java
@@ -16,17 +16,19 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+package org.apache.skywalking.oap.server.core.storage.query;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
-import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
+import org.apache.skywalking.oap.server.core.query.sql.Where;
+import org.apache.skywalking.oap.server.core.storage.DAO;
/**
* @author peng-yongsheng
*/
-public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+public interface IUniqueQueryDAO extends DAO {
- public EsDAO(ElasticSearchClient client) {
- super(client);
- }
+ List<TwoIdGroup> aggregation(String indName, Step step, long startTB,
+ long endTB, Where where, String idCName1, String idCName2) throws IOException;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/OneIdGroupValue.java
similarity index 80%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/OneIdGroupValue.java
index 5d9cb09..bc1370a 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/OneIdGroupValue.java
@@ -16,13 +16,16 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.storage.query;
-import lombok.Getter;
+import lombok.*;
+/**
+ * @author peng-yongsheng
+ */
@Getter
-public class Duration {
- private String start;
- private String end;
- private Step step;
+@Setter
+public class OneIdGroupValue {
+ private int id;
+ private Number value;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroup.java
similarity index 81%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroup.java
index 5d9cb09..1638092 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroup.java
@@ -16,13 +16,16 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.storage.query;
-import lombok.Getter;
+import lombok.*;
+/**
+ * @author peng-yongsheng
+ */
@Getter
-public class Duration {
- private String start;
- private String end;
- private Step step;
+@Setter
+public class TwoIdGroup {
+ private int id1;
+ private int id2;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroupValue.java
similarity index 78%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroupValue.java
index 5d9cb09..add34d6 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroupValue.java
@@ -16,13 +16,17 @@
*
*/
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.storage.query;
-import lombok.Getter;
+import lombok.*;
+/**
+ * @author peng-yongsheng
+ */
@Getter
-public class Duration {
- private String start;
- private String end;
- private Step step;
+@Setter
+public class TwoIdGroupValue {
+ private int id1;
+ private int id2;
+ private Number value;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
index b2c6740..4525fe4 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
@@ -21,22 +21,11 @@ package org.apache.skywalking.oap.query.graphql;
import com.coxautodev.graphql.tools.SchemaParser;
import graphql.GraphQL;
import graphql.schema.GraphQLSchema;
-import org.apache.skywalking.oap.query.graphql.resolver.AggregationQuery;
-import org.apache.skywalking.oap.query.graphql.resolver.AlarmQuery;
-import org.apache.skywalking.oap.query.graphql.resolver.MetadataQuery;
-import org.apache.skywalking.oap.query.graphql.resolver.MetricQuery;
-import org.apache.skywalking.oap.query.graphql.resolver.Mutation;
-import org.apache.skywalking.oap.query.graphql.resolver.Query;
-import org.apache.skywalking.oap.query.graphql.resolver.TopologyQuery;
-import org.apache.skywalking.oap.query.graphql.resolver.TraceQuery;
+import org.apache.skywalking.oap.query.graphql.resolver.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.query.QueryModule;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.module.*;
/**
* GraphQL query provider.
@@ -70,7 +59,7 @@ public class GraphQLQueryProvider extends ModuleProvider {
.file("query-protocol/metric.graphqls")
.resolvers(new MetricQuery())
.file("query-protocol/topology.graphqls")
- .resolvers(new TopologyQuery())
+ .resolvers(new TopologyQuery(getManager()))
.file("query-protocol/trace.graphqls")
.resolvers(new TraceQuery())
.file("query-protocol/aggregation.graphqls")
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricQuery.java
index f9ee63c..7744463 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricQuery.java
@@ -20,17 +20,19 @@ package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import org.apache.skywalking.oap.query.graphql.type.*;
+import org.apache.skywalking.oap.server.core.query.entity.IntValues;
public class MetricQuery implements GraphQLQueryResolver {
- public LinearIntValues getLinearIntValues(final MetricCondition metric, final Duration duration) {
- return new LinearIntValues();
+
+ public IntValues getValues(final BatchMetricConditions metric, final Duration duration) {
+ return new IntValues();
}
- public Thermodynamic getThermodynamic(final MetricCondition metric, final Duration duration) {
- return new Thermodynamic();
+ public IntValues getLinearIntValues(final MetricCondition metric, final Duration duration) {
+ return new IntValues();
}
- public LinearIntValues getValues(final BatchMetricConditions metric, final Duration duration) {
- return new LinearIntValues();
+ public Thermodynamic getThermodynamic(final MetricCondition metric, final Duration duration) {
+ return new Thermodynamic();
}
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
index f5c7e36..501513f 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
@@ -19,15 +19,41 @@
package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
+import java.io.IOException;
import org.apache.skywalking.oap.query.graphql.type.Duration;
-import org.apache.skywalking.oap.query.graphql.type.Topology;
+import org.apache.skywalking.oap.query.graphql.util.DurationUtils;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.entity.Topology;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class TopologyQuery implements GraphQLQueryResolver {
- public Topology getGlobalTopology(final Duration duration) {
- return new Topology();
+
+ private final ModuleManager moduleManager;
+ private TopologyQueryService queryService;
+
+ public TopologyQuery(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
+ }
+
+ private TopologyQueryService getQueryService() {
+ if (queryService == null) {
+ this.queryService = moduleManager.find(CoreModule.NAME).getService(TopologyQueryService.class);
+ }
+ return queryService;
+ }
+
+ public Topology getGlobalTopology(final Duration duration) throws IOException {
+ long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
+ long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+
+ return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket);
}
public Topology getServiceTopology(final String serviceId, final Duration duration) {
- return new Topology();
+ long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
+ long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+
+ return getQueryService().getServiceTopology(duration.getStep(), startTimeBucket, endTimeBucket, serviceId);
}
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
index 5d9cb09..b73142b 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.query.graphql.type;
import lombok.Getter;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
@Getter
public class Duration {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/util/DurationUtils.java
similarity index 69%
copy from oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
copy to oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/util/DurationUtils.java
index e7c8138..ec83b06 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/util/DurationUtils.java
@@ -16,17 +16,19 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+package org.apache.skywalking.oap.query.graphql.util;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
-import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.core.Const;
/**
* @author peng-yongsheng
*/
-public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+public enum DurationUtils {
+ INSTANCE;
- public EsDAO(ElasticSearchClient client) {
- super(client);
+ public long exchangeToTimeBucket(String dateStr) {
+ dateStr = dateStr.replaceAll("-", Const.EMPTY_STRING);
+ dateStr = dateStr.replaceAll(" ", Const.EMPTY_STRING);
+ return Long.valueOf(dateStr);
}
}
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index bb89bbc..b4845a7 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -95,13 +95,13 @@
</dependency>
<!-- storage module -->
- <!-- query module -->
+ <!-- queryBuild module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>query-graphql-plugin</artifactId>
<version>${project.version}</version>
</dependency>
- <!-- query module -->
+ <!-- queryBuild module -->
<!-- alarm module -->
<dependency>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
index 6c93cb6..737e4ff 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
@@ -41,5 +41,11 @@
<artifactId>library-client</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>fr.pilato.elasticsearch.testcontainers</groupId>
+ <artifactId>testcontainers-elasticsearch</artifactId>
+ <version>0.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index c7ceeed..0647eea 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+import java.util.List;
import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping;
/**
@@ -34,6 +35,8 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
return "double";
} else if (String.class.equals(type)) {
return "keyword";
+ } else if (List.class.equals(type)) {
+ return "keyword";
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
index e7c8138..fcc4a1e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -18,8 +18,12 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.query.sql.Where;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
/**
* @author peng-yongsheng
@@ -29,4 +33,24 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
public EsDAO(ElasticSearchClient client) {
super(client);
}
+
+ public void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) {
+ RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).gte(startTB).lte(endTB);
+ if (where.getKeyValues().isEmpty()) {
+ sourceBuilder.query(rangeQueryBuilder);
+ } else {
+ BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
+ boolQuery.must().add(rangeQueryBuilder);
+
+ where.getKeyValues().forEach(keyValues -> {
+ if (keyValues.getValues().size() > 1) {
+ boolQuery.must().add(QueryBuilders.termsQuery(keyValues.getKey(), keyValues.getValues()));
+ } else {
+ boolQuery.must().add(QueryBuilders.termQuery(keyValues.getKey(), keyValues.getValues().get(0)));
+ }
+ });
+ sourceBuilder.query(boolQuery);
+ }
+ sourceBuilder.size(0);
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricQueryEsDAO.java
new file mode 100644
index 0000000..bb90cbf
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricQueryEsDAO.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
+import org.apache.skywalking.oap.server.core.query.sql.*;
+import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.*;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+/**
+ * @author peng-yongsheng
+ */
+public class MetricQueryEsDAO extends EsDAO implements IMetricQueryDAO {
+
+ public MetricQueryEsDAO(ElasticSearchClient client) {
+ super(client);
+ }
+
+ public List<OneIdGroupValue> aggregation(String indName, Step step, long startTB,
+ long endTB, Where where, String idCName, String valueCName, Function function) throws IOException {
+ String indexName = TimePyramidTableNameBuilder.build(step, indName);
+
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ queryBuild(sourceBuilder, where, startTB, endTB);
+
+ TermsAggregationBuilder aggIdCName1 = AggregationBuilders.terms(idCName).field(idCName).size(1000);
+ functionAggregation(function, aggIdCName1, valueCName);
+
+ sourceBuilder.aggregation(aggIdCName1);
+
+ SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+ List<OneIdGroupValue> values = new ArrayList<>();
+ Terms idTerms = response.getAggregations().get(idCName);
+ for (Terms.Bucket idBucket : idTerms.getBuckets()) {
+ Terms valueTerms = idBucket.getAggregations().get(valueCName);
+ for (Terms.Bucket valueBucket : valueTerms.getBuckets()) {
+ OneIdGroupValue value = new OneIdGroupValue();
+ value.setId(idBucket.getKeyAsNumber().intValue());
+ value.setValue(valueBucket.getKeyAsNumber());
+ values.add(value);
+ }
+ }
+ return values;
+ }
+
+ public List<TwoIdGroupValue> aggregation(String indName, Step step, long startTB,
+ long endTB, Where where, String idCName1, String idCName2, String valueCName,
+ Function function) throws IOException {
+ String indexName = TimePyramidTableNameBuilder.build(step, indName);
+
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ queryBuild(sourceBuilder, where, startTB, endTB);
+
+ sourceBuilder.aggregation(
+ AggregationBuilders.terms(idCName1).field(idCName1).size(1000)
+ .subAggregation(AggregationBuilders.terms(idCName2).field(idCName2).size(1000)
+ .subAggregation(AggregationBuilders.avg(valueCName).field(valueCName)))
+ );
+
+ TermsAggregationBuilder aggIdCName1 = AggregationBuilders.terms(idCName1).field(idCName1).size(1000);
+ TermsAggregationBuilder aggIdCName2 = AggregationBuilders.terms(idCName2).field(idCName2).size(1000);
+ aggIdCName1.subAggregation(aggIdCName2);
+ functionAggregation(function, aggIdCName2, valueCName);
+
+ sourceBuilder.aggregation(aggIdCName1);
+
+ SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+ List<TwoIdGroupValue> values = new ArrayList<>();
+ Terms id1Terms = response.getAggregations().get(idCName1);
+ for (Terms.Bucket id1Bucket : id1Terms.getBuckets()) {
+ Terms id2Terms = id1Bucket.getAggregations().get(idCName2);
+ for (Terms.Bucket id2Bucket : id2Terms.getBuckets()) {
+ Terms valueTerms = id1Bucket.getAggregations().get(valueCName);
+ for (Terms.Bucket valueBucket : valueTerms.getBuckets()) {
+ TwoIdGroupValue value = new TwoIdGroupValue();
+ value.setId1(id1Bucket.getKeyAsNumber().intValue());
+ value.setId1(id2Bucket.getKeyAsNumber().intValue());
+ value.setValue(valueBucket.getKeyAsNumber());
+ values.add(value);
+ }
+ }
+ }
+ return values;
+ }
+
+ private void functionAggregation(Function function, TermsAggregationBuilder parentAggBuilder, String valueCName) {
+ switch (function) {
+ case Avg:
+ parentAggBuilder.subAggregation(AggregationBuilders.avg(valueCName).field(valueCName));
+ break;
+ case Sum:
+ parentAggBuilder.subAggregation(AggregationBuilders.sum(valueCName).field(valueCName));
+ break;
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UniqueQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UniqueQueryEsDAO.java
new file mode 100644
index 0000000..ef3b4ff
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UniqueQueryEsDAO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
+import org.apache.skywalking.oap.server.core.query.sql.Where;
+import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+/**
+ * @author peng-yongsheng
+ */
+public class UniqueQueryEsDAO extends EsDAO implements IUniqueQueryDAO {
+
+ public UniqueQueryEsDAO(ElasticSearchClient client) {
+ super(client);
+ }
+
+ @Override public List<TwoIdGroup> aggregation(String indName, Step step, long startTB, long endTB, Where where,
+ String idCName1, String idCName2) throws IOException {
+ String indexName = TimePyramidTableNameBuilder.build(step, indName);
+
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ queryBuild(sourceBuilder, where, startTB, endTB);
+
+ sourceBuilder.aggregation(AggregationBuilders.terms(idCName1).field(idCName1).size(1000)
+ .subAggregation(AggregationBuilders.terms(idCName2).field(idCName2).size(1000)));
+
+ SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+ List<TwoIdGroup> values = new ArrayList<>();
+ Terms id1Terms = response.getAggregations().get(idCName1);
+ for (Terms.Bucket id1Bucket : id1Terms.getBuckets()) {
+ Terms id2Terms = id1Bucket.getAggregations().get(idCName2);
+ for (Terms.Bucket id2Bucket : id2Terms.getBuckets()) {
+ TwoIdGroup value = new TwoIdGroup();
+ value.setId1(id1Bucket.getKeyAsNumber().intValue());
+ value.setId2(id2Bucket.getKeyAsNumber().intValue());
+ values.add(value);
+ }
+ }
+ return values;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/resources/log4j2.xml b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..6eb5b3f
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<Configuration status="DEBUG">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="DEBUG">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>