You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/09/12 09:35:58 UTC

[incubator-skywalking] branch master updated: Finished topology query. (#1663)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 23b8d6e  Finished topology query. (#1663)
23b8d6e is described below

commit 23b8d6ef41ca470807287219ee99f91ba8905be9
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Wed Sep 12 17:35:54 2018 +0800

    Finished topology query. (#1663)
    
    * Finished topology query.
    
    * Fixed the bug of CI failure and startup failure.
---
 .../skywalking/oap/server/core/CoreModule.java     |   6 +
 .../oap/server/core/CoreModuleProvider.java        |   3 +
 .../server/core/analysis/indicator/Indicator.java  |   2 +-
 .../manual/service/ServiceComponentIndicator.java  |   7 +-
 .../manual/service/ServiceMappingIndicator.java    |   7 +-
 .../oap/server/core/query/TopologyBuilder.java     | 203 +++++++++++++++++++++
 .../server/core/query/TopologyQueryService.java    | 127 +++++++++++++
 .../oap/server/core/query/entity}/Call.java        |  11 +-
 .../oap/server/core/query/entity/IntValues.java}   |  10 +-
 .../oap/server/core/query/entity}/KVInt.java       |   6 +-
 .../oap/server/core/query/entity}/Node.java        |   8 +-
 .../oap/server/core/query/entity}/Step.java        |   2 +-
 .../oap/server/core/query/entity/Topology.java}    |  10 +-
 .../oap/server/core/query/sql/Function.java}       |  11 +-
 .../oap/server/core/query/sql/GroupBy.java}        |  15 +-
 .../oap/server/core/query/sql/IntKeyValues.java}   |  16 +-
 .../oap/server/core/query/sql/Where.java}          |  12 +-
 .../oap/server/core/storage/TimePyramid.java}      |  24 ++-
 .../core/storage/TimePyramidTableNameBuilder.java  |  49 +++++
 .../query/IMetricQueryDAO.java}                    |  26 ++-
 .../core/storage/query/IUniqueQueryDAO.java}       |  16 +-
 .../core/storage/query/OneIdGroupValue.java}       |  15 +-
 .../oap/server/core/storage/query/TwoIdGroup.java} |  15 +-
 .../core/storage/query/TwoIdGroupValue.java}       |  16 +-
 .../oap/query/graphql/GraphQLQueryProvider.java    |  17 +-
 .../oap/query/graphql/resolver/MetricQuery.java    |  14 +-
 .../oap/query/graphql/resolver/TopologyQuery.java  |  34 +++-
 .../oap/query/graphql/type/Duration.java           |   1 +
 .../oap/query/graphql/util/DurationUtils.java}     |  14 +-
 oap-server/server-starter/pom.xml                  |   4 +-
 .../storage-elasticsearch-plugin/pom.xml           |   6 +
 .../elasticsearch/base/ColumnTypeEsMapping.java    |   3 +
 .../storage/plugin/elasticsearch/base/EsDAO.java   |  24 +++
 .../elasticsearch/query/MetricQueryEsDAO.java      | 122 +++++++++++++
 .../elasticsearch/query/UniqueQueryEsDAO.java      |  68 +++++++
 .../src/test/resources/log4j2.xml                  |  31 ++++
 36 files changed, 830 insertions(+), 125 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 2d12a56..d0b93eb 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core;
 import java.util.*;
 import org.apache.skywalking.oap.server.core.cache.*;
 import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
 import org.apache.skywalking.oap.server.core.register.service.*;
 import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
 import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
@@ -50,10 +51,15 @@ public class CoreModule extends ModuleDefine {
         addInsideService(classes);
         addRegisterService(classes);
         addCacheService(classes);
+        addQueryService(classes);
 
         return classes.toArray(new Class[] {});
     }
 
+    private void addQueryService(List<Class> classes) {
+        classes.add(TopologyQueryService.class);
+    }
+
     private void addServerInterface(List<Class> classes) {
         classes.add(GRPCHandlerRegister.class);
         classes.add(JettyHandlerRegister.class);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index dc32872..d22648e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
 import org.apache.skywalking.oap.server.core.cache.*;
 import org.apache.skywalking.oap.server.core.cluster.*;
 import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
 import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
 import org.apache.skywalking.oap.server.core.register.service.*;
 import org.apache.skywalking.oap.server.core.remote.*;
@@ -108,6 +109,8 @@ public class CoreModuleProvider extends ModuleProvider {
         this.registerServiceImplementation(NetworkAddressInventoryCache.class, new NetworkAddressInventoryCache(getManager()));
         this.registerServiceImplementation(INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));
 
+        this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
+
         annotationScan.registerListener(storageAnnotationListener);
         annotationScan.registerListener(streamAnnotationListener);
         annotationScan.registerListener(new IndicatorTypeListener(getManager()));
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 933ac59..c56e3b5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -28,7 +28,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
  */
 public abstract class Indicator extends StreamData implements StorageData {
 
-    protected static final String TIME_BUCKET = "time_bucket";
+    public static final String TIME_BUCKET = "time_bucket";
 
     @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java
index 406943c..ef5e558 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java
@@ -33,11 +33,12 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*;
  */
 @IndicatorType
 @StreamData
-@StorageEntity(name = "service_component", builder = ServiceComponentIndicator.Builder.class)
+@StorageEntity(name = ServiceComponentIndicator.INDEX_NAME, builder = ServiceComponentIndicator.Builder.class)
 public class ServiceComponentIndicator extends Indicator {
 
-    private static final String SERVICE_ID = "service_id";
-    private static final String COMPONENT_ID = "component_id";
+    public static final String INDEX_NAME = "service_component";
+    public static final String SERVICE_ID = "service_id";
+    public static final String COMPONENT_ID = "component_id";
 
     @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
     @Setter @Getter @Column(columnName = COMPONENT_ID) private int componentId;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java
index 808fcae..891057e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java
@@ -33,11 +33,12 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*;
  */
 @IndicatorType
 @StreamData
-@StorageEntity(name = "service_mapping", builder = ServiceMappingIndicator.Builder.class)
+@StorageEntity(name = ServiceMappingIndicator.INDEX_NAME, builder = ServiceMappingIndicator.Builder.class)
 public class ServiceMappingIndicator extends Indicator {
 
-    private static final String SERVICE_ID = "service_id";
-    private static final String MAPPING_SERVICE_ID = "mapping_service_id";
+    public static final String INDEX_NAME = "service_mapping";
+    public static final String SERVICE_ID = "service_id";
+    public static final String MAPPING_SERVICE_ID = "mapping_service_id";
 
     @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
     @Setter @Getter @Column(columnName = MAPPING_SERVICE_ID) private int mappingServiceId;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java
new file mode 100644
index 0000000..abdae7c
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.query;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.register.ServiceInventory;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+class TopologyBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(TopologyBuilder.class);
+
+    private final ServiceInventoryCache serviceInventoryCache;
+    //    private final DateBetweenService dateBetweenService;
+    private final IComponentLibraryCatalogService componentLibraryCatalogService;
+
+    TopologyBuilder(ModuleManager moduleManager) {
+        this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class);
+//        this.dateBetweenService = new DateBetweenService(moduleManager);
+        this.componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class);
+    }
+
+    Topology build(List<ServiceComponent> serviceComponents, List<ServiceMapping> serviceMappings,
+        List<Call> serviceRelationClientCalls, List<Call> serviceRelationServerCalls) {
+        Map<Integer, String> nodeCompMap = buildNodeCompMap(serviceComponents);
+        Map<Integer, String> conjecturalNodeCompMap = buildConjecturalNodeCompMap(serviceComponents);
+        Map<Integer, Integer> mappings = changeMapping2Map(serviceMappings);
+        filterZeroSourceOrTargetReference(serviceRelationClientCalls);
+        filterZeroSourceOrTargetReference(serviceRelationServerCalls);
+        serviceRelationServerCalls = serverCallsFilter(serviceRelationServerCalls);
+
+        List<Node> nodes = new LinkedList<>();
+        Map<Integer, Integer> applicationMinuteBetweenMap = new HashMap<>();
+
+        List<Call> calls = new LinkedList<>();
+        Set<Integer> nodeIds = new HashSet<>();
+        serviceRelationClientCalls.forEach(clientCall -> {
+            ServiceInventory source = serviceInventoryCache.get(clientCall.getSource());
+            ServiceInventory target = serviceInventoryCache.get(clientCall.getTarget());
+
+            if (BooleanUtils.valueToBoolean(target.getIsAddress()) && !mappings.containsKey(target.getSequence())) {
+                if (!nodeIds.contains(target.getSequence())) {
+                    Node conjecturalNode = new Node();
+                    conjecturalNode.setId(target.getSequence());
+                    conjecturalNode.setName(target.getName());
+                    conjecturalNode.setType(conjecturalNodeCompMap.getOrDefault(target.getSequence(), Const.UNKNOWN));
+                    conjecturalNode.setReal(false);
+                    nodes.add(conjecturalNode);
+                    nodeIds.add(target.getSequence());
+                }
+            }
+
+            Set<Integer> serviceNodeIds = buildNodeIds(nodes);
+            if (!serviceNodeIds.contains(source.getSequence())) {
+                Node serviceNode = new Node();
+                serviceNode.setId(source.getSequence());
+                serviceNode.setName(source.getName());
+                serviceNode.setType(nodeCompMap.getOrDefault(source.getSequence(), Const.UNKNOWN));
+                nodes.add(serviceNode);
+            }
+
+            Call call = new Call();
+            call.setSource(source.getSequence());
+
+            int actualTargetId = mappings.getOrDefault(target.getSequence(), target.getSequence());
+            call.setTarget(actualTargetId);
+            call.setCallType(nodeCompMap.get(clientCall.getTarget()));
+//            try {
+//                call.setCpm(clientCall.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, source.getSequence(), startSecondTimeBucket, endSecondTimeBucket));
+//            } catch (ParseException e) {
+//                logger.error(e.getMessage(), e);
+//            }
+            calls.add(call);
+        });
+
+        serviceRelationServerCalls.forEach(referenceMetric -> {
+            ServiceInventory source = serviceInventoryCache.get(referenceMetric.getSource());
+            ServiceInventory target = serviceInventoryCache.get(referenceMetric.getTarget());
+
+            if (source.getSequence() == Const.NONE_SERVICE_ID) {
+                if (!nodeIds.contains(source.getSequence())) {
+                    Node visualUserNode = new Node();
+                    visualUserNode.setId(source.getSequence());
+                    visualUserNode.setName(Const.USER_CODE);
+                    visualUserNode.setType(Const.USER_CODE.toUpperCase());
+                    nodes.add(visualUserNode);
+                    nodeIds.add(source.getSequence());
+                }
+            }
+
+            if (BooleanUtils.valueToBoolean(source.getIsAddress())) {
+                if (!nodeIds.contains(source.getSequence())) {
+                    Node conjecturalNode = new Node();
+                    conjecturalNode.setId(source.getSequence());
+                    conjecturalNode.setName(source.getName());
+                    conjecturalNode.setType(conjecturalNodeCompMap.getOrDefault(target.getSequence(), Const.UNKNOWN));
+                    nodeIds.add(source.getSequence());
+                    nodes.add(conjecturalNode);
+                }
+            }
+
+            Call call = new Call();
+            call.setSource(source.getSequence());
+            call.setTarget(target.getSequence());
+
+            if (source.getSequence() == Const.NONE_SERVICE_ID) {
+                call.setCallType(Const.EMPTY_STRING);
+            } else {
+                call.setCallType(nodeCompMap.get(referenceMetric.getTarget()));
+            }
+//            try {
+//                call.setCpm(referenceMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, target.getSequence(), startSecondTimeBucket, endSecondTimeBucket));
+//            } catch (ParseException e) {
+//                logger.error(e.getMessage(), e);
+//            }
+            calls.add(call);
+        });
+
+        Topology topology = new Topology();
+        topology.getCalls().addAll(calls);
+        topology.getNodes().addAll(nodes);
+        return topology;
+    }
+
+    private Set<Integer> buildNodeIds(List<Node> nodes) {
+        Set<Integer> nodeIds = new HashSet<>();
+        nodes.forEach(node -> nodeIds.add(node.getId()));
+        return nodeIds;
+    }
+
+    private List<Call> serverCallsFilter(List<Call> serviceRelationServerCalls) {
+        List<Call> filteredCalls = new LinkedList<>();
+
+        serviceRelationServerCalls.forEach(serverCall -> {
+            ServiceInventory source = serviceInventoryCache.get(serverCall.getSource());
+            if (BooleanUtils.valueToBoolean(source.getIsAddress()) || source.getSequence() == Const.NONE_SERVICE_ID) {
+                filteredCalls.add(serverCall);
+            }
+        });
+
+        return filteredCalls;
+    }
+
+    private Map<Integer, Integer> changeMapping2Map(List<ServiceMapping> serviceMappings) {
+        Map<Integer, Integer> mappings = new HashMap<>();
+        serviceMappings.forEach(serviceMapping -> mappings.put(serviceMapping.getMappingServiceId(), serviceMapping.getServiceId()));
+        return mappings;
+    }
+
+    private Map<Integer, String> buildConjecturalNodeCompMap(List<ServiceComponent> serviceComponents) {
+        Map<Integer, String> components = new HashMap<>();
+        serviceComponents.forEach(serviceComponent -> {
+            int componentServerId = this.componentLibraryCatalogService.getServerIdBasedOnComponent(serviceComponent.getComponentId());
+            String componentName = this.componentLibraryCatalogService.getServerName(componentServerId);
+            components.put(serviceComponent.getServiceId(), componentName);
+        });
+        return components;
+    }
+
+    private Map<Integer, String> buildNodeCompMap(List<ServiceComponent> serviceComponents) {
+        Map<Integer, String> components = new HashMap<>();
+        serviceComponents.forEach(serviceComponent -> {
+            String componentName = this.componentLibraryCatalogService.getComponentName(serviceComponent.getComponentId());
+            components.put(serviceComponent.getServiceId(), componentName);
+        });
+        return components;
+    }
+
+    private void filterZeroSourceOrTargetReference(List<Call> serviceRelationClientCalls) {
+        for (int i = serviceRelationClientCalls.size() - 1; i >= 0; i--) {
+            Call call = serviceRelationClientCalls.get(i);
+            if (call.getSource() == 0 || call.getTarget() == 0) {
+                serviceRelationClientCalls.remove(i);
+            }
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
new file mode 100644
index 0000000..57993dd
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.query.sql.*;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class TopologyQueryService implements Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(TopologyQueryService.class);
+
+    private final ModuleManager moduleManager;
+    private IMetricQueryDAO metricQueryDAO;
+    private IUniqueQueryDAO uniqueQueryDAO;
+
+    public TopologyQueryService(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+    }
+
+    private IMetricQueryDAO getMetricQueryDAO() {
+        if (metricQueryDAO == null) {
+            metricQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetricQueryDAO.class);
+        }
+        return metricQueryDAO;
+    }
+
+    private IUniqueQueryDAO getUniqueQueryDAO() {
+        if (uniqueQueryDAO == null) {
+            uniqueQueryDAO = moduleManager.find(StorageModule.NAME).getService(IUniqueQueryDAO.class);
+        }
+        return uniqueQueryDAO;
+    }
+
+    public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
+        logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
+        List<ServiceComponent> serviceComponents = loadServiceComponent(step, startTB, endTB);
+        List<ServiceMapping> serviceMappings = loadServiceMapping(step, startTB, endTB);
+
+        List<Call> serviceRelationClientCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_client_calls_sum");
+        List<Call> serviceRelationServerCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_server_calls_sum");
+
+        TopologyBuilder builder = new TopologyBuilder(moduleManager);
+        return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
+    }
+
+    public Topology getServiceTopology(final Step step, final long startTimeBucket, final long endTimeBucket,
+        final String serviceId) {
+        return new Topology();
+    }
+
+    private List<ServiceComponent> loadServiceComponent(final Step step, final long startTB,
+        final long endTB) throws IOException {
+        List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceComponentIndicator.INDEX_NAME, step, startTB, endTB,
+            new Where(), ServiceComponentIndicator.SERVICE_ID, ServiceComponentIndicator.COMPONENT_ID);
+
+        List<ServiceComponent> serviceComponents = new ArrayList<>();
+        twoIdGroups.forEach(twoIdGroup -> {
+            ServiceComponent serviceComponent = new ServiceComponent();
+            serviceComponent.setServiceId(twoIdGroup.getId1());
+            serviceComponent.setComponentId(twoIdGroup.getId2());
+            serviceComponents.add(serviceComponent);
+        });
+
+        return serviceComponents;
+    }
+
+    private List<ServiceMapping> loadServiceMapping(final Step step, final long startTB,
+        final long endTB) throws IOException {
+        List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceMappingIndicator.INDEX_NAME, step, startTB, endTB,
+            new Where(), ServiceMappingIndicator.SERVICE_ID, ServiceMappingIndicator.MAPPING_SERVICE_ID);
+
+        List<ServiceMapping> serviceMappings = new ArrayList<>();
+        twoIdGroups.forEach(twoIdGroup -> {
+            ServiceMapping serviceMapping = new ServiceMapping();
+            serviceMapping.setServiceId(twoIdGroup.getId1());
+            serviceMapping.setMappingServiceId(twoIdGroup.getId2());
+            serviceMappings.add(serviceMapping);
+        });
+
+        return serviceMappings;
+    }
+
+    private List<Call> loadServiceRelationCalls(final Step step, final long startTB, final long endTB,
+        String indName) throws IOException {
+        List<TwoIdGroupValue> twoIdGroupValues = getMetricQueryDAO().aggregation(indName, step, startTB, endTB, new Where(), "source_service_id", "dest_service_id", "value", Function.Sum);
+
+        List<Call> clientCalls = new ArrayList<>();
+
+        twoIdGroupValues.forEach(twoIdGroupValue -> {
+            Call call = new Call();
+            call.setSource(twoIdGroupValue.getId1());
+            call.setTarget(twoIdGroupValue.getId2());
+            call.setCalls(twoIdGroupValue.getValue().longValue());
+            clientCalls.add(call);
+        });
+
+        return clientCalls;
+    }
+}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Call.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
similarity index 84%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Call.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
index 7a97df8..7a2f807 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Call.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
@@ -16,11 +16,16 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
 
+import lombok.*;
+
+@Getter
+@Setter
 public class Call {
-    private String source;
-    private String target;
+    private int source;
+    private int target;
     private String callType;
+    private long calls;
     private long cpm;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Topology.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/IntValues.java
similarity index 81%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Topology.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/IntValues.java
index 8879ddc..70609c5 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Topology.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/IntValues.java
@@ -16,11 +16,11 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
 
-import java.util.List;
+import java.util.*;
+import lombok.Getter;
 
-public class Topology {
-    private List<Node> nodes;
-    private List<Call> calls;
+public class IntValues {
+    @Getter private List<KVInt> values = new LinkedList<>();
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/KVInt.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KVInt.java
similarity index 90%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/KVInt.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KVInt.java
index 266fb60..83d9680 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/KVInt.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KVInt.java
@@ -16,8 +16,12 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
 
+import lombok.*;
+
+@Setter
+@Getter
 public class KVInt {
     private String id;
     private int value;
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Node.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Node.java
similarity index 88%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Node.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Node.java
index 4ab47b2..c230232 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Node.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Node.java
@@ -16,10 +16,14 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
 
+import lombok.*;
+
+@Getter
+@Setter
 public class Node {
-    private String id;
+    private int id;
     private String name;
     private String type;
     private boolean isReal;
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Step.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Step.java
similarity index 93%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Step.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Step.java
index 44f81fb..7fed3a5 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Step.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Step.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
 
 public enum Step {
     MONTH,
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Topology.java
similarity index 80%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Topology.java
index 5d9cb09..c76261d 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Topology.java
@@ -16,13 +16,13 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.entity;
 
+import java.util.*;
 import lombok.Getter;
 
 @Getter
-public class Duration {
-    private String start;
-    private String end;
-    private Step step;
+public class Topology {
+    private List<Node> nodes = new ArrayList<>();
+    private List<Call> calls = new ArrayList<>();
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/LinearIntValues.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
similarity index 85%
rename from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/LinearIntValues.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
index 0002f0a..bce985e 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/LinearIntValues.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
@@ -16,10 +16,11 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.sql;
 
-import java.util.List;
-
-public class LinearIntValues {
-    private List<KVInt> values;
+/**
+ * @author peng-yongsheng
+ */
+public enum Function {
+    Avg, Sum
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/GroupBy.java
similarity index 80%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/GroupBy.java
index 5d9cb09..76923f5 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/GroupBy.java
@@ -16,13 +16,16 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.sql;
 
-import lombok.Getter;
+import lombok.*;
 
+/**
+ * @author peng-yongsheng
+ */
 @Getter
-public class Duration {
-    private String start;
-    private String end;
-    private Step step;
+@Setter
+public class GroupBy {
+    private String columnOne;
+    private String columnTwo;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/IntKeyValues.java
similarity index 75%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/IntKeyValues.java
index 5d9cb09..c0a2745 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/IntKeyValues.java
@@ -16,13 +16,15 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.sql;
 
-import lombok.Getter;
+import java.util.*;
+import lombok.*;
 
-@Getter
-public class Duration {
-    private String start;
-    private String end;
-    private Step step;
+/**
+ * @author peng-yongsheng
+ */
+public class IntKeyValues {
+    @Getter @Setter private String key;
+    @Getter private List<Integer> values = new LinkedList<>();
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Where.java
similarity index 81%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Where.java
index 5d9cb09..62672a5 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Where.java
@@ -16,13 +16,15 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.query.sql;
 
+import java.util.*;
 import lombok.Getter;
 
+/**
+ * @author peng-yongsheng
+ */
 @Getter
-public class Duration {
-    private String start;
-    private String end;
-    private Step step;
+public class Where {
+    private List<IntKeyValues> keyValues = new LinkedList<>();
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramid.java
similarity index 65%
copy from oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramid.java
index e7c8138..786167e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramid.java
@@ -16,17 +16,27 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
-
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
-import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+package org.apache.skywalking.oap.server.core.storage;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+public enum TimePyramid {
+    Second(0, "second"), Minute(1, "minute"), Hour(2, "hour"), Day(3, "day"), Month(4, "month");
+
+    private final int value;
+    private final String name;
+
+    TimePyramid(int value, String name) {
+        this.value = value;
+        this.name = name;
+    }
+
+    public int getValue() {
+        return value;
+    }
 
-    public EsDAO(ElasticSearchClient client) {
-        super(client);
+    public String getName() {
+        return name;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramidTableNameBuilder.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramidTableNameBuilder.java
new file mode 100644
index 0000000..b413e9a
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/TimePyramidTableNameBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
+
+/**
+ * @author peng-yongsheng
+ */
+public class TimePyramidTableNameBuilder {
+
+    private TimePyramidTableNameBuilder() {
+    }
+
+    public static String build(Step step, String tableName) {
+        switch (step) {
+            case MONTH:
+                tableName = tableName + Const.ID_SPLIT + TimePyramid.Month.getName();
+                break;
+            case DAY:
+                tableName = tableName + Const.ID_SPLIT + TimePyramid.Day.getName();
+                break;
+            case HOUR:
+                tableName = tableName + Const.ID_SPLIT + TimePyramid.Hour.getName();
+                break;
+            case MINUTE:
+                tableName = tableName + Const.ID_SPLIT + TimePyramid.Minute.getName();
+                break;
+        }
+        return tableName;
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricQueryDAO.java
similarity index 53%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricQueryDAO.java
index 933ac59..7165069 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricQueryDAO.java
@@ -16,25 +16,23 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+package org.apache.skywalking.oap.server.core.storage.query;
 
-import lombok.*;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
+import org.apache.skywalking.oap.server.core.query.sql.*;
+import org.apache.skywalking.oap.server.core.storage.DAO;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class Indicator extends StreamData implements StorageData {
+public interface IMetricQueryDAO extends DAO {
 
-    protected static final String TIME_BUCKET = "time_bucket";
+    List<OneIdGroupValue> aggregation(String indName, Step step, long startTB,
+        long endTB, Where where, String idCName, String valueCName, Function function) throws IOException;
 
-    @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
-
-    public abstract String id();
-
-    public abstract void combine(Indicator indicator);
-
-    public abstract void calculate();
+    List<TwoIdGroupValue> aggregation(String indName, Step step, long startTB,
+        long endTB, Where where, String idCName1, String idCName2, String valueCName,
+        Function function) throws IOException;
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IUniqueQueryDAO.java
similarity index 62%
copy from oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IUniqueQueryDAO.java
index e7c8138..8ba1fc1 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IUniqueQueryDAO.java
@@ -16,17 +16,19 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+package org.apache.skywalking.oap.server.core.storage.query;
 
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
-import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
+import org.apache.skywalking.oap.server.core.query.sql.Where;
+import org.apache.skywalking.oap.server.core.storage.DAO;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+public interface IUniqueQueryDAO extends DAO {
 
-    public EsDAO(ElasticSearchClient client) {
-        super(client);
-    }
+    List<TwoIdGroup> aggregation(String indName, Step step, long startTB,
+        long endTB, Where where, String idCName1, String idCName2) throws IOException;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/OneIdGroupValue.java
similarity index 80%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/OneIdGroupValue.java
index 5d9cb09..bc1370a 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/OneIdGroupValue.java
@@ -16,13 +16,16 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.storage.query;
 
-import lombok.Getter;
+import lombok.*;
 
+/**
+ * @author peng-yongsheng
+ */
 @Getter
-public class Duration {
-    private String start;
-    private String end;
-    private Step step;
+@Setter
+public class OneIdGroupValue {
+    private int id;
+    private Number value;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroup.java
similarity index 81%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroup.java
index 5d9cb09..1638092 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroup.java
@@ -16,13 +16,16 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.storage.query;
 
-import lombok.Getter;
+import lombok.*;
 
+/**
+ * @author peng-yongsheng
+ */
 @Getter
-public class Duration {
-    private String start;
-    private String end;
-    private Step step;
+@Setter
+public class TwoIdGroup {
+    private int id1;
+    private int id2;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroupValue.java
similarity index 78%
copy from oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroupValue.java
index 5d9cb09..add34d6 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/TwoIdGroupValue.java
@@ -16,13 +16,17 @@
  *
  */
 
-package org.apache.skywalking.oap.query.graphql.type;
+package org.apache.skywalking.oap.server.core.storage.query;
 
-import lombok.Getter;
+import lombok.*;
 
+/**
+ * @author peng-yongsheng
+ */
 @Getter
-public class Duration {
-    private String start;
-    private String end;
-    private Step step;
+@Setter
+public class TwoIdGroupValue {
+    private int id1;
+    private int id2;
+    private Number value;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
index b2c6740..4525fe4 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java
@@ -21,22 +21,11 @@ package org.apache.skywalking.oap.query.graphql;
 import com.coxautodev.graphql.tools.SchemaParser;
 import graphql.GraphQL;
 import graphql.schema.GraphQLSchema;
-import org.apache.skywalking.oap.query.graphql.resolver.AggregationQuery;
-import org.apache.skywalking.oap.query.graphql.resolver.AlarmQuery;
-import org.apache.skywalking.oap.query.graphql.resolver.MetadataQuery;
-import org.apache.skywalking.oap.query.graphql.resolver.MetricQuery;
-import org.apache.skywalking.oap.query.graphql.resolver.Mutation;
-import org.apache.skywalking.oap.query.graphql.resolver.Query;
-import org.apache.skywalking.oap.query.graphql.resolver.TopologyQuery;
-import org.apache.skywalking.oap.query.graphql.resolver.TraceQuery;
+import org.apache.skywalking.oap.query.graphql.resolver.*;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.query.QueryModule;
 import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.module.*;
 
 /**
  * GraphQL query provider.
@@ -70,7 +59,7 @@ public class GraphQLQueryProvider extends ModuleProvider {
             .file("query-protocol/metric.graphqls")
             .resolvers(new MetricQuery())
             .file("query-protocol/topology.graphqls")
-            .resolvers(new TopologyQuery())
+            .resolvers(new TopologyQuery(getManager()))
             .file("query-protocol/trace.graphqls")
             .resolvers(new TraceQuery())
             .file("query-protocol/aggregation.graphqls")
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricQuery.java
index f9ee63c..7744463 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricQuery.java
@@ -20,17 +20,19 @@ package org.apache.skywalking.oap.query.graphql.resolver;
 
 import com.coxautodev.graphql.tools.GraphQLQueryResolver;
 import org.apache.skywalking.oap.query.graphql.type.*;
+import org.apache.skywalking.oap.server.core.query.entity.IntValues;
 
 public class MetricQuery implements GraphQLQueryResolver {
-    public LinearIntValues getLinearIntValues(final MetricCondition metric, final Duration duration) {
-        return new LinearIntValues();
+
+    public IntValues getValues(final BatchMetricConditions metric, final Duration duration) {
+        return new IntValues();
     }
 
-    public Thermodynamic getThermodynamic(final MetricCondition metric, final Duration duration) {
-        return new Thermodynamic();
+    public IntValues getLinearIntValues(final MetricCondition metric, final Duration duration) {
+        return new IntValues();
     }
 
-    public LinearIntValues getValues(final BatchMetricConditions metric, final Duration duration) {
-        return new LinearIntValues();
+    public Thermodynamic getThermodynamic(final MetricCondition metric, final Duration duration) {
+        return new Thermodynamic();
     }
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
index f5c7e36..501513f 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
@@ -19,15 +19,41 @@
 package org.apache.skywalking.oap.query.graphql.resolver;
 
 import com.coxautodev.graphql.tools.GraphQLQueryResolver;
+import java.io.IOException;
 import org.apache.skywalking.oap.query.graphql.type.Duration;
-import org.apache.skywalking.oap.query.graphql.type.Topology;
+import org.apache.skywalking.oap.query.graphql.util.DurationUtils;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.entity.Topology;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 public class TopologyQuery implements GraphQLQueryResolver {
-    public Topology getGlobalTopology(final Duration duration) {
-        return new Topology();
+
+    private final ModuleManager moduleManager;
+    private TopologyQueryService queryService;
+
+    public TopologyQuery(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+    }
+
+    private TopologyQueryService getQueryService() {
+        if (queryService == null) {
+            this.queryService = moduleManager.find(CoreModule.NAME).getService(TopologyQueryService.class);
+        }
+        return queryService;
+    }
+
+    public Topology getGlobalTopology(final Duration duration) throws IOException {
+        long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
+        long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+
+        return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket);
     }
 
     public Topology getServiceTopology(final String serviceId, final Duration duration) {
-        return new Topology();
+        long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
+        long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+
+        return getQueryService().getServiceTopology(duration.getStep(), startTimeBucket, endTimeBucket, serviceId);
     }
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
index 5d9cb09..b73142b 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/Duration.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.query.graphql.type;
 
 import lombok.Getter;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
 
 @Getter
 public class Duration {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/util/DurationUtils.java
similarity index 69%
copy from oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
copy to oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/util/DurationUtils.java
index e7c8138..ec83b06 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/util/DurationUtils.java
@@ -16,17 +16,19 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+package org.apache.skywalking.oap.query.graphql.util;
 
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
-import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.core.Const;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+public enum DurationUtils {
+    INSTANCE;
 
-    public EsDAO(ElasticSearchClient client) {
-        super(client);
+    public long exchangeToTimeBucket(String dateStr) {
+        dateStr = dateStr.replaceAll("-", Const.EMPTY_STRING);
+        dateStr = dateStr.replaceAll(" ", Const.EMPTY_STRING);
+        return Long.valueOf(dateStr);
     }
 }
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index bb89bbc..b4845a7 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -95,13 +95,13 @@
         </dependency>
         <!-- storage module -->
 
-        <!-- query module -->
+        <!-- queryBuild module -->
         <dependency>
             <groupId>org.apache.skywalking</groupId>
             <artifactId>query-graphql-plugin</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <!-- query module -->
+        <!-- queryBuild module -->
 
         <!-- alarm module -->
         <dependency>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
index 6c93cb6..737e4ff 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
@@ -41,5 +41,11 @@
             <artifactId>library-client</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>fr.pilato.elasticsearch.testcontainers</groupId>
+            <artifactId>testcontainers-elasticsearch</artifactId>
+            <version>0.1</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index c7ceeed..0647eea 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
+import java.util.List;
 import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping;
 
 /**
@@ -34,6 +35,8 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
             return "double";
         } else if (String.class.equals(type)) {
             return "keyword";
+        } else if (List.class.equals(type)) {
+            return "keyword";
         } else {
             throw new IllegalArgumentException("Unsupported data type: " + type.getName());
         }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
index e7c8138..fcc4a1e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -18,8 +18,12 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.query.sql.Where;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 
 /**
  * @author peng-yongsheng
@@ -29,4 +33,24 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
     public EsDAO(ElasticSearchClient client) {
         super(client);
     }
+
+    public void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) {
+        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).gte(startTB).lte(endTB);
+        if (where.getKeyValues().isEmpty()) {
+            sourceBuilder.query(rangeQueryBuilder);
+        } else {
+            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
+            boolQuery.must().add(rangeQueryBuilder);
+
+            where.getKeyValues().forEach(keyValues -> {
+                if (keyValues.getValues().size() > 1) {
+                    boolQuery.must().add(QueryBuilders.termsQuery(keyValues.getKey(), keyValues.getValues()));
+                } else {
+                    boolQuery.must().add(QueryBuilders.termQuery(keyValues.getKey(), keyValues.getValues().get(0)));
+                }
+            });
+            sourceBuilder.query(boolQuery);
+        }
+        sourceBuilder.size(0);
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricQueryEsDAO.java
new file mode 100644
index 0000000..bb90cbf
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricQueryEsDAO.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
+import org.apache.skywalking.oap.server.core.query.sql.*;
+import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.*;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+/**
+ * @author peng-yongsheng
+ */
+public class MetricQueryEsDAO extends EsDAO implements IMetricQueryDAO {
+
+    public MetricQueryEsDAO(ElasticSearchClient client) {
+        super(client);
+    }
+
+    public List<OneIdGroupValue> aggregation(String indName, Step step, long startTB,
+        long endTB, Where where, String idCName, String valueCName, Function function) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, indName);
+
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        queryBuild(sourceBuilder, where, startTB, endTB);
+
+        TermsAggregationBuilder aggIdCName1 = AggregationBuilders.terms(idCName).field(idCName).size(1000);
+        functionAggregation(function, aggIdCName1, valueCName);
+
+        sourceBuilder.aggregation(aggIdCName1);
+
+        SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+        List<OneIdGroupValue> values = new ArrayList<>();
+        Terms idTerms = response.getAggregations().get(idCName);
+        for (Terms.Bucket idBucket : idTerms.getBuckets()) {
+            Terms valueTerms = idBucket.getAggregations().get(valueCName);
+            for (Terms.Bucket valueBucket : valueTerms.getBuckets()) {
+                OneIdGroupValue value = new OneIdGroupValue();
+                value.setId(idBucket.getKeyAsNumber().intValue());
+                value.setValue(valueBucket.getKeyAsNumber());
+                values.add(value);
+            }
+        }
+        return values;
+    }
+
+    public List<TwoIdGroupValue> aggregation(String indName, Step step, long startTB,
+        long endTB, Where where, String idCName1, String idCName2, String valueCName,
+        Function function) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, indName);
+
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        queryBuild(sourceBuilder, where, startTB, endTB);
+
+        sourceBuilder.aggregation(
+            AggregationBuilders.terms(idCName1).field(idCName1).size(1000)
+                .subAggregation(AggregationBuilders.terms(idCName2).field(idCName2).size(1000)
+                    .subAggregation(AggregationBuilders.avg(valueCName).field(valueCName)))
+        );
+
+        TermsAggregationBuilder aggIdCName1 = AggregationBuilders.terms(idCName1).field(idCName1).size(1000);
+        TermsAggregationBuilder aggIdCName2 = AggregationBuilders.terms(idCName2).field(idCName2).size(1000);
+        aggIdCName1.subAggregation(aggIdCName2);
+        functionAggregation(function, aggIdCName2, valueCName);
+
+        sourceBuilder.aggregation(aggIdCName1);
+
+        SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+        List<TwoIdGroupValue> values = new ArrayList<>();
+        Terms id1Terms = response.getAggregations().get(idCName1);
+        for (Terms.Bucket id1Bucket : id1Terms.getBuckets()) {
+            Terms id2Terms = id1Bucket.getAggregations().get(idCName2);
+            for (Terms.Bucket id2Bucket : id2Terms.getBuckets()) {
+                Terms valueTerms = id1Bucket.getAggregations().get(valueCName);
+                for (Terms.Bucket valueBucket : valueTerms.getBuckets()) {
+                    TwoIdGroupValue value = new TwoIdGroupValue();
+                    value.setId1(id1Bucket.getKeyAsNumber().intValue());
+                    value.setId1(id2Bucket.getKeyAsNumber().intValue());
+                    value.setValue(valueBucket.getKeyAsNumber());
+                    values.add(value);
+                }
+            }
+        }
+        return values;
+    }
+
+    private void functionAggregation(Function function, TermsAggregationBuilder parentAggBuilder, String valueCName) {
+        switch (function) {
+            case Avg:
+                parentAggBuilder.subAggregation(AggregationBuilders.avg(valueCName).field(valueCName));
+                break;
+            case Sum:
+                parentAggBuilder.subAggregation(AggregationBuilders.sum(valueCName).field(valueCName));
+                break;
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UniqueQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UniqueQueryEsDAO.java
new file mode 100644
index 0000000..ef3b4ff
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UniqueQueryEsDAO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.query.entity.Step;
+import org.apache.skywalking.oap.server.core.query.sql.Where;
+import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+/**
+ * @author peng-yongsheng
+ */
+public class UniqueQueryEsDAO extends EsDAO implements IUniqueQueryDAO {
+
+    public UniqueQueryEsDAO(ElasticSearchClient client) {
+        super(client);
+    }
+
+    @Override public List<TwoIdGroup> aggregation(String indName, Step step, long startTB, long endTB, Where where,
+        String idCName1, String idCName2) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, indName);
+
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        queryBuild(sourceBuilder, where, startTB, endTB);
+
+        sourceBuilder.aggregation(AggregationBuilders.terms(idCName1).field(idCName1).size(1000)
+            .subAggregation(AggregationBuilders.terms(idCName2).field(idCName2).size(1000)));
+
+        SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+        List<TwoIdGroup> values = new ArrayList<>();
+        Terms id1Terms = response.getAggregations().get(idCName1);
+        for (Terms.Bucket id1Bucket : id1Terms.getBuckets()) {
+            Terms id2Terms = id1Bucket.getAggregations().get(idCName2);
+            for (Terms.Bucket id2Bucket : id2Terms.getBuckets()) {
+                TwoIdGroup value = new TwoIdGroup();
+                value.setId1(id1Bucket.getKeyAsNumber().intValue());
+                value.setId2(id2Bucket.getKeyAsNumber().intValue());
+                values.add(value);
+            }
+        }
+        return values;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/resources/log4j2.xml b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..6eb5b3f
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<Configuration status="DEBUG">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="DEBUG">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>