You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/09/13 11:10:53 UTC

[incubator-skywalking] branch master updated: Global topology and service topology write by manual. (#1671)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new babc6d2  Global topology and service topology write by manual. (#1671)
babc6d2 is described below

commit babc6d293097c1bbdf87b84b87ae0bdaa63f1c02
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Thu Sep 13 19:10:47 2018 +0800

    Global topology and service topology write by manual. (#1671)
---
 .../ServiceCallRelationDispatcher.java             |  21 ++-
 ...ava => ServiceRelationClientSideIndicator.java} |  36 ++--
 ...ava => ServiceRelationServerSideIndicator.java} |  32 ++--
 .../server/core/query/TopologyQueryService.java    | 103 +++++-------
 .../oap/server/core/storage/StorageModule.java     |   4 +-
 .../core/storage/query/ITopologyQueryDAO.java      |  45 +++++
 .../oap/query/graphql/resolver/TopologyQuery.java  |   2 +-
 .../StorageModuleElasticsearchProvider.java        |   4 +
 .../storage/plugin/elasticsearch/base/EsDAO.java   |   2 +-
 .../elasticsearch/query/TopologyQueryEsDAO.java    | 184 +++++++++++++++++++++
 10 files changed, 323 insertions(+), 110 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
index aad015f..c1e3cb4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
@@ -28,11 +28,26 @@ import org.apache.skywalking.oap.server.core.source.ServiceRelation;
 public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRelation> {
     @Override
     public void dispatch(ServiceRelation source) {
-        doDispatch(source);
+        switch (source.getDetectPoint()) {
+            case SERVER:
+                serverSide(source);
+                break;
+            case CLIENT:
+                clientSide(source);
+                break;
+        }
     }
 
-    public void doDispatch(ServiceRelation source) {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+    private void serverSide(ServiceRelation source) {
+        ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
+        indicator.setTimeBucket(source.getTimeBucket());
+        indicator.setSourceServiceId(source.getSourceServiceId());
+        indicator.setDestServiceId(source.getDestServiceId());
+        IndicatorProcess.INSTANCE.in(indicator);
+    }
+
+    private void clientSide(ServiceRelation source) {
+        ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
         indicator.setTimeBucket(source.getTimeBucket());
         indicator.setSourceServiceId(source.getSourceServiceId());
         indicator.setDestServiceId(source.getDestServiceId());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java
similarity index 81%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java
index 0663607..cd38b00 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java
@@ -18,26 +18,22 @@
 
 package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
 
-import java.util.HashMap;
-import java.util.Map;
-import lombok.Getter;
-import lombok.Setter;
+import java.util.*;
+import lombok.*;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
 import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.annotation.Column;
-import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
-import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
 
 @IndicatorType
 @StreamData
-@StorageEntity(name = ServiceCallRelationIndicator.INDEX_NAME, builder = ServiceCallRelationIndicator.Builder.class)
-public class ServiceCallRelationIndicator extends Indicator {
+@StorageEntity(name = ServiceRelationClientSideIndicator.INDEX_NAME, builder = ServiceRelationClientSideIndicator.Builder.class)
+public class ServiceRelationClientSideIndicator extends Indicator {
 
-    public static final String INDEX_NAME = "service_call_relation";
+    public static final String INDEX_NAME = "service_relation_client_side";
     public static final String SOURCE_SERVICE_ID = "source_service_id";
     public static final String DEST_SERVICE_ID = "dest_service_id";
 
@@ -60,7 +56,7 @@ public class ServiceCallRelationIndicator extends Indicator {
     }
 
     @Override public Indicator toHour() {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
         indicator.setTimeBucket(toTimeBucketInHour());
         indicator.setSourceServiceId(getSourceServiceId());
         indicator.setDestServiceId(getDestServiceId());
@@ -68,7 +64,7 @@ public class ServiceCallRelationIndicator extends Indicator {
     }
 
     @Override public Indicator toDay() {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
         indicator.setTimeBucket(toTimeBucketInDay());
         indicator.setSourceServiceId(getSourceServiceId());
         indicator.setDestServiceId(getDestServiceId());
@@ -76,7 +72,7 @@ public class ServiceCallRelationIndicator extends Indicator {
     }
 
     @Override public Indicator toMonth() {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
         indicator.setTimeBucket(toTimeBucketInMonth());
         indicator.setSourceServiceId(getSourceServiceId());
         indicator.setDestServiceId(getDestServiceId());
@@ -99,8 +95,8 @@ public class ServiceCallRelationIndicator extends Indicator {
     @Override public RemoteData.Builder serialize() {
         RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
 
-        remoteBuilder.setDataIntegers(0, getSourceServiceId());
         remoteBuilder.setDataIntegers(1, getDestServiceId());
+        remoteBuilder.setDataIntegers(0, getSourceServiceId());
         remoteBuilder.setDataLongs(0, getTimeBucket());
 
         return remoteBuilder;
@@ -122,7 +118,7 @@ public class ServiceCallRelationIndicator extends Indicator {
         if (getClass() != obj.getClass())
             return false;
 
-        ServiceCallRelationIndicator indicator = (ServiceCallRelationIndicator)obj;
+        ServiceRelationClientSideIndicator indicator = (ServiceRelationClientSideIndicator)obj;
         if (sourceServiceId != indicator.sourceServiceId)
             return false;
         if (destServiceId != indicator.destServiceId)
@@ -134,21 +130,21 @@ public class ServiceCallRelationIndicator extends Indicator {
         return true;
     }
 
-    public static class Builder implements StorageBuilder<ServiceCallRelationIndicator> {
+    public static class Builder implements StorageBuilder<ServiceRelationClientSideIndicator> {
 
-        @Override public ServiceCallRelationIndicator map2Data(Map<String, Object> dbMap) {
-            ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        @Override public ServiceRelationClientSideIndicator map2Data(Map<String, Object> dbMap) {
+            ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
             indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue());
             indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue());
             indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
             return indicator;
         }
 
-        @Override public Map<String, Object> data2Map(ServiceCallRelationIndicator storageData) {
+        @Override public Map<String, Object> data2Map(ServiceRelationClientSideIndicator storageData) {
             Map<String, Object> map = new HashMap<>();
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
             map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId());
             map.put(DEST_SERVICE_ID, storageData.getDestServiceId());
-            map.put(TIME_BUCKET, storageData.getTimeBucket());
             return map;
         }
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java
similarity index 81%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java
index 0663607..c438c68 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java
@@ -18,26 +18,22 @@
 
 package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
 
-import java.util.HashMap;
-import java.util.Map;
-import lombok.Getter;
-import lombok.Setter;
+import java.util.*;
+import lombok.*;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
 import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.annotation.Column;
-import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
-import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
 
 @IndicatorType
 @StreamData
-@StorageEntity(name = ServiceCallRelationIndicator.INDEX_NAME, builder = ServiceCallRelationIndicator.Builder.class)
-public class ServiceCallRelationIndicator extends Indicator {
+@StorageEntity(name = ServiceRelationServerSideIndicator.INDEX_NAME, builder = ServiceRelationServerSideIndicator.Builder.class)
+public class ServiceRelationServerSideIndicator extends Indicator {
 
-    public static final String INDEX_NAME = "service_call_relation";
+    public static final String INDEX_NAME = "service_relation_server_side";
     public static final String SOURCE_SERVICE_ID = "source_service_id";
     public static final String DEST_SERVICE_ID = "dest_service_id";
 
@@ -60,7 +56,7 @@ public class ServiceCallRelationIndicator extends Indicator {
     }
 
     @Override public Indicator toHour() {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
         indicator.setTimeBucket(toTimeBucketInHour());
         indicator.setSourceServiceId(getSourceServiceId());
         indicator.setDestServiceId(getDestServiceId());
@@ -68,7 +64,7 @@ public class ServiceCallRelationIndicator extends Indicator {
     }
 
     @Override public Indicator toDay() {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
         indicator.setTimeBucket(toTimeBucketInDay());
         indicator.setSourceServiceId(getSourceServiceId());
         indicator.setDestServiceId(getDestServiceId());
@@ -76,7 +72,7 @@ public class ServiceCallRelationIndicator extends Indicator {
     }
 
     @Override public Indicator toMonth() {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
         indicator.setTimeBucket(toTimeBucketInMonth());
         indicator.setSourceServiceId(getSourceServiceId());
         indicator.setDestServiceId(getDestServiceId());
@@ -122,7 +118,7 @@ public class ServiceCallRelationIndicator extends Indicator {
         if (getClass() != obj.getClass())
             return false;
 
-        ServiceCallRelationIndicator indicator = (ServiceCallRelationIndicator)obj;
+        ServiceRelationServerSideIndicator indicator = (ServiceRelationServerSideIndicator)obj;
         if (sourceServiceId != indicator.sourceServiceId)
             return false;
         if (destServiceId != indicator.destServiceId)
@@ -134,17 +130,17 @@ public class ServiceCallRelationIndicator extends Indicator {
         return true;
     }
 
-    public static class Builder implements StorageBuilder<ServiceCallRelationIndicator> {
+    public static class Builder implements StorageBuilder<ServiceRelationServerSideIndicator> {
 
-        @Override public ServiceCallRelationIndicator map2Data(Map<String, Object> dbMap) {
-            ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        @Override public ServiceRelationServerSideIndicator map2Data(Map<String, Object> dbMap) {
+            ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
             indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue());
             indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue());
             indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
             return indicator;
         }
 
-        @Override public Map<String, Object> data2Map(ServiceCallRelationIndicator storageData) {
+        @Override public Map<String, Object> data2Map(ServiceRelationServerSideIndicator storageData) {
             Map<String, Object> map = new HashMap<>();
             map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId());
             map.put(DEST_SERVICE_ID, storageData.getDestServiceId());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
index 57993dd..930e2fa 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
@@ -20,12 +20,10 @@ package org.apache.skywalking.oap.server.core.query;
 
 import java.io.IOException;
 import java.util.*;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
 import org.apache.skywalking.oap.server.core.query.entity.*;
-import org.apache.skywalking.oap.server.core.query.sql.*;
 import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.library.module.Service;
 import org.slf4j.*;
@@ -38,90 +36,63 @@ public class TopologyQueryService implements Service {
     private static final Logger logger = LoggerFactory.getLogger(TopologyQueryService.class);
 
     private final ModuleManager moduleManager;
-    private IMetricQueryDAO metricQueryDAO;
-    private IUniqueQueryDAO uniqueQueryDAO;
+    private ITopologyQueryDAO topologyQueryDAO;
 
     public TopologyQueryService(ModuleManager moduleManager) {
         this.moduleManager = moduleManager;
     }
 
-    private IMetricQueryDAO getMetricQueryDAO() {
-        if (metricQueryDAO == null) {
-            metricQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetricQueryDAO.class);
+    private ITopologyQueryDAO getTopologyQueryDAO() {
+        if (topologyQueryDAO == null) {
+            topologyQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITopologyQueryDAO.class);
         }
-        return metricQueryDAO;
-    }
-
-    private IUniqueQueryDAO getUniqueQueryDAO() {
-        if (uniqueQueryDAO == null) {
-            uniqueQueryDAO = moduleManager.find(StorageModule.NAME).getService(IUniqueQueryDAO.class);
-        }
-        return uniqueQueryDAO;
+        return topologyQueryDAO;
     }
 
     public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
         logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
-        List<ServiceComponent> serviceComponents = loadServiceComponent(step, startTB, endTB);
-        List<ServiceMapping> serviceMappings = loadServiceMapping(step, startTB, endTB);
+        List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
+        List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
 
-        List<Call> serviceRelationClientCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_client_calls_sum");
-        List<Call> serviceRelationServerCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_server_calls_sum");
+        List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB);
+        List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB);
 
         TopologyBuilder builder = new TopologyBuilder(moduleManager);
         return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
     }
 
-    public Topology getServiceTopology(final Step step, final long startTimeBucket, final long endTimeBucket,
-        final String serviceId) {
-        return new Topology();
-    }
-
-    private List<ServiceComponent> loadServiceComponent(final Step step, final long startTB,
-        final long endTB) throws IOException {
-        List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceComponentIndicator.INDEX_NAME, step, startTB, endTB,
-            new Where(), ServiceComponentIndicator.SERVICE_ID, ServiceComponentIndicator.COMPONENT_ID);
-
-        List<ServiceComponent> serviceComponents = new ArrayList<>();
-        twoIdGroups.forEach(twoIdGroup -> {
-            ServiceComponent serviceComponent = new ServiceComponent();
-            serviceComponent.setServiceId(twoIdGroup.getId1());
-            serviceComponent.setComponentId(twoIdGroup.getId2());
-            serviceComponents.add(serviceComponent);
-        });
-
-        return serviceComponents;
-    }
-
-    private List<ServiceMapping> loadServiceMapping(final Step step, final long startTB,
-        final long endTB) throws IOException {
-        List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceMappingIndicator.INDEX_NAME, step, startTB, endTB,
-            new Where(), ServiceMappingIndicator.SERVICE_ID, ServiceMappingIndicator.MAPPING_SERVICE_ID);
-
-        List<ServiceMapping> serviceMappings = new ArrayList<>();
-        twoIdGroups.forEach(twoIdGroup -> {
-            ServiceMapping serviceMapping = new ServiceMapping();
-            serviceMapping.setServiceId(twoIdGroup.getId1());
-            serviceMapping.setMappingServiceId(twoIdGroup.getId2());
-            serviceMappings.add(serviceMapping);
+    public Topology getServiceTopology(final Step step, final long startTB, final long endTB,
+        final int serviceId) throws IOException {
+        List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
+        List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
+
+        Set<Integer> serviceIds = new HashSet<>();
+        serviceIds.add(serviceId);
+        serviceMappings.forEach(mapping -> {
+            if (mapping.getServiceId() == serviceId) {
+                serviceIds.add(mapping.getMappingServiceId());
+            }
         });
+        List<Integer> serviceIdList = new ArrayList<>(serviceIds);
 
-        return serviceMappings;
-    }
-
-    private List<Call> loadServiceRelationCalls(final Step step, final long startTB, final long endTB,
-        String indName) throws IOException {
-        List<TwoIdGroupValue> twoIdGroupValues = getMetricQueryDAO().aggregation(indName, step, startTB, endTB, new Where(), "source_service_id", "dest_service_id", "value", Function.Sum);
+        List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadSpecifiedClientSideServiceRelations(step, startTB, endTB, serviceIdList);
+        List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadSpecifiedServerSideServiceRelations(step, startTB, endTB, serviceIdList);
 
-        List<Call> clientCalls = new ArrayList<>();
+        TopologyBuilder builder = new TopologyBuilder(moduleManager);
+        Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
 
-        twoIdGroupValues.forEach(twoIdGroupValue -> {
-            Call call = new Call();
-            call.setSource(twoIdGroupValue.getId1());
-            call.setTarget(twoIdGroupValue.getId2());
-            call.setCalls(twoIdGroupValue.getValue().longValue());
-            clientCalls.add(call);
+        Set<Integer> nodeIds = new HashSet<>();
+        topology.getCalls().forEach(call -> {
+            nodeIds.add(call.getSource());
+            nodeIds.add(call.getTarget());
         });
 
-        return clientCalls;
+        for (int i = topology.getNodes().size() - 1; i >= 0; i--) {
+            if (!nodeIds.contains(topology.getNodes().get(i).getId())) {
+                topology.getNodes().remove(i);
+            }
+        }
+
+        return topology;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 6bf519d..c1bf763 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.storage;
 
 import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 
 /**
@@ -36,6 +37,7 @@ public class StorageModule extends ModuleDefine {
         return new Class[] {
             IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class,
             IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
-            IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class};
+            IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
+            ITopologyQueryDAO.class};
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
new file mode 100644
index 0000000..74d31b9
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.query;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * @author peng-yongsheng
+ */
+public interface ITopologyQueryDAO extends Service {
+
+    List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
+        List<Integer> serviceIds) throws IOException;
+
+    List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
+        List<Integer> serviceIds) throws IOException;
+
+    List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
+
+    List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
+
+    List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException;
+
+    List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException;
+}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
index 501513f..c90ffb7 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
@@ -50,7 +50,7 @@ public class TopologyQuery implements GraphQLQueryResolver {
         return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket);
     }
 
-    public Topology getServiceTopology(final String serviceId, final Duration duration) {
+    public Topology getServiceTopology(final int serviceId, final Duration duration) throws IOException {
         long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
         long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 8acc093..696e86e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -20,12 +20,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
 
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
 import org.apache.skywalking.oap.server.library.client.NameSpace;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
 import org.slf4j.*;
 
 /**
@@ -72,6 +74,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
         this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient));
         this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
         this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient));
+
+        this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
index fcc4a1e..48bb44c 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -34,7 +34,7 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
         super(client);
     }
 
-    public void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) {
+    public final void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) {
         RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).gte(startTB).lte(endTB);
         if (where.getKeyValues().isEmpty()) {
             sourceBuilder.query(rangeQueryBuilder);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
new file mode 100644
index 0000000..5e7be1b
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
+import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.*;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+/**
+ * Elasticsearch-backed implementation of {@link ITopologyQueryDAO}.
+ *
+ * <p>Every query is a zero-hit search ({@code size(0)}) restricted to the inclusive time bucket
+ * range {@code [startTB, endTB]}; the result is read from a two-level terms aggregation. Each
+ * aggregation level requests at most {@code AGGREGATION_SIZE} buckets, so anything beyond that
+ * limit is not returned.
+ *
+ * @author peng-yongsheng
+ */
+public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
+
+    /** Number of buckets requested from each level of the terms aggregations. */
+    private static final int AGGREGATION_SIZE = 1000;
+
+    public TopologyQueryEsDAO(ElasticSearchClient client) {
+        super(client);
+    }
+
+    /**
+     * Loads the distinct (source, destination) service id pairs from the server-side relation index
+     * in which at least one side is one of the given services.
+     *
+     * @param step selects the time-pyramid index to query
+     * @param startTB first time bucket, inclusive
+     * @param endTB last time bucket, inclusive
+     * @param serviceIds services to look for, as source or as destination
+     * @return one {@link Call} per (source, destination) bucket pair
+     * @throws UnexpectedException if {@code serviceIds} is empty according to {@code CollectionUtils.isEmpty}
+     * @throws IOException propagated from the Elasticsearch client
+     */
+    @Override
+    public List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
+        List<Integer> serviceIds) throws IOException {
+        requireServiceIds(serviceIds);
+
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.size(0);
+        setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
+
+        return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
+    }
+
+    /**
+     * Client-side counterpart of {@link #loadSpecifiedServerSideServiceRelations}: same query, run
+     * against the client-side relation index.
+     *
+     * @param step selects the time-pyramid index to query
+     * @param startTB first time bucket, inclusive
+     * @param endTB last time bucket, inclusive
+     * @param serviceIds services to look for, as source or as destination
+     * @return one {@link Call} per (source, destination) bucket pair
+     * @throws UnexpectedException if {@code serviceIds} is empty according to {@code CollectionUtils.isEmpty}
+     * @throws IOException propagated from the Elasticsearch client
+     */
+    @Override
+    public List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
+        List<Integer> serviceIds) throws IOException {
+        // Same guard as the server-side variant; without it a null list fails later with a
+        // NullPointerException inside setQueryCondition.
+        requireServiceIds(serviceIds);
+
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.size(0);
+        setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
+
+        return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
+    }
+
+    /**
+     * Rejects an empty service id list before any query is built.
+     */
+    private static void requireServiceIds(List<Integer> serviceIds) {
+        if (CollectionUtils.isEmpty(serviceIds)) {
+            throw new UnexpectedException("Service id is null");
+        }
+    }
+
+    /**
+     * Installs the query "time bucket within [startTB, endTB] AND (source service OR destination
+     * service is one of serviceIds)" on the given search source.
+     *
+     * <p>NOTE(review): the column names are taken from the server-side indicator although this
+     * method also serves the client-side index; presumably both indicators use the same column
+     * names - confirm.
+     */
+    private void setQueryCondition(SearchSourceBuilder sourceBuilder, long startTB, long endTB,
+        List<Integer> serviceIds) {
+        // The "should" clauses must live in their own bool query: a bool query without must/filter
+        // clauses requires at least one "should" to match, whereas "should" clauses placed next to
+        // a "must" are optional and would not filter anything.
+        BoolQueryBuilder serviceIdBoolQuery = QueryBuilders.boolQuery();
+        if (serviceIds.size() == 1) {
+            Integer serviceId = serviceIds.get(0);
+            serviceIdBoolQuery.should(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceId));
+            serviceIdBoolQuery.should(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceId));
+        } else {
+            serviceIdBoolQuery.should(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds));
+            serviceIdBoolQuery.should(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds));
+        }
+
+        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
+        boolQuery.must(timeBucketRange(ServiceRelationServerSideIndicator.TIME_BUCKET, startTB, endTB));
+        boolQuery.must(serviceIdBoolQuery);
+        sourceBuilder.query(boolQuery);
+    }
+
+    /**
+     * Loads every distinct (source, destination) service id pair found in the server-side relation
+     * index within the inclusive time bucket range.
+     *
+     * @throws IOException propagated from the Elasticsearch client
+     */
+    @Override public List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
+        SearchSourceBuilder sourceBuilder = timeRangeSearch(ServiceRelationServerSideIndicator.TIME_BUCKET, startTB, endTB);
+
+        return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
+    }
+
+    /**
+     * Loads every distinct (source, destination) service id pair found in the client-side relation
+     * index within the inclusive time bucket range.
+     *
+     * @throws IOException propagated from the Elasticsearch client
+     */
+    @Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
+        // NOTE(review): the time bucket column name comes from the server-side indicator, exactly
+        // as before this change; presumably it is the same for the client-side index - confirm.
+        SearchSourceBuilder sourceBuilder = timeRangeSearch(ServiceRelationServerSideIndicator.TIME_BUCKET, startTB, endTB);
+
+        return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
+    }
+
+    /**
+     * Runs the prepared search against {@code indexName} and converts each (source, destination)
+     * bucket pair into a {@link Call}.
+     */
+    private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
+        String destCName) throws IOException {
+        List<Call> calls = new ArrayList<>();
+        for (int[] pair : loadIdPairs(sourceBuilder, indexName, sourceCName, destCName)) {
+            Call call = new Call();
+            call.setSource(pair[0]);
+            call.setTarget(pair[1]);
+            calls.add(call);
+        }
+        return calls;
+    }
+
+    /**
+     * Loads the distinct (service id, mapping service id) pairs found in the service mapping index
+     * within the inclusive time bucket range.
+     *
+     * @throws IOException propagated from the Elasticsearch client
+     */
+    @Override public List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceMappingIndicator.INDEX_NAME);
+        SearchSourceBuilder sourceBuilder = timeRangeSearch(ServiceMappingIndicator.TIME_BUCKET, startTB, endTB);
+
+        List<ServiceMapping> serviceMappings = new ArrayList<>();
+        for (int[] pair : loadIdPairs(sourceBuilder, indexName, ServiceMappingIndicator.SERVICE_ID, ServiceMappingIndicator.MAPPING_SERVICE_ID)) {
+            ServiceMapping serviceMapping = new ServiceMapping();
+            serviceMapping.setServiceId(pair[0]);
+            serviceMapping.setMappingServiceId(pair[1]);
+            serviceMappings.add(serviceMapping);
+        }
+        return serviceMappings;
+    }
+
+    /**
+     * Loads the distinct (service id, component id) pairs found in the service component index
+     * within the inclusive time bucket range.
+     *
+     * @throws IOException propagated from the Elasticsearch client
+     */
+    @Override
+    public List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceComponentIndicator.INDEX_NAME);
+        SearchSourceBuilder sourceBuilder = timeRangeSearch(ServiceComponentIndicator.TIME_BUCKET, startTB, endTB);
+
+        List<ServiceComponent> serviceComponents = new ArrayList<>();
+        for (int[] pair : loadIdPairs(sourceBuilder, indexName, ServiceComponentIndicator.SERVICE_ID, ServiceComponentIndicator.COMPONENT_ID)) {
+            ServiceComponent serviceComponent = new ServiceComponent();
+            serviceComponent.setServiceId(pair[0]);
+            serviceComponent.setComponentId(pair[1]);
+            serviceComponents.add(serviceComponent);
+        }
+        return serviceComponents;
+    }
+
+    /**
+     * Builds the inclusive range condition {@code startTB <= timeBucket <= endTB}, in the same
+     * orientation as {@code EsDAO.queryBuild}.
+     */
+    private static RangeQueryBuilder timeBucketRange(String timeBucketCName, long startTB, long endTB) {
+        return QueryBuilders.rangeQuery(timeBucketCName).gte(startTB).lte(endTB);
+    }
+
+    /**
+     * Creates a zero-hit search source whose only condition is the inclusive time bucket range.
+     */
+    private static SearchSourceBuilder timeRangeSearch(String timeBucketCName, long startTB, long endTB) {
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.query(timeBucketRange(timeBucketCName, startTB, endTB));
+        sourceBuilder.size(0);
+        return sourceBuilder;
+    }
+
+    /**
+     * Adds a terms aggregation on {@code outerCName} with a nested terms aggregation on
+     * {@code innerCName} to the search, executes it and flattens the buckets.
+     *
+     * @return one {@code int[] {outerKey, innerKey}} per nested bucket, in bucket order
+     */
+    private List<int[]> loadIdPairs(SearchSourceBuilder sourceBuilder, String indexName, String outerCName,
+        String innerCName) throws IOException {
+        TermsAggregationBuilder outerAggregation = AggregationBuilders.terms(outerCName).field(outerCName).size(AGGREGATION_SIZE);
+        outerAggregation.subAggregation(AggregationBuilders.terms(innerCName).field(innerCName).size(AGGREGATION_SIZE));
+        sourceBuilder.aggregation(outerAggregation);
+
+        SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+        List<int[]> pairs = new ArrayList<>();
+        Terms outerTerms = response.getAggregations().get(outerCName);
+        for (Terms.Bucket outerBucket : outerTerms.getBuckets()) {
+            Terms innerTerms = outerBucket.getAggregations().get(innerCName);
+            for (Terms.Bucket innerBucket : innerTerms.getBuckets()) {
+                pairs.add(new int[] {outerBucket.getKeyAsNumber().intValue(), innerBucket.getKeyAsNumber().intValue()});
+            }
+        }
+        return pairs;
+    }
+}