You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/09/13 11:10:48 UTC

[GitHub] wu-sheng closed pull request #1671: Global topology and service topology write by manual.

wu-sheng closed pull request #1671: Global topology and service topology write by manual.
URL: https://github.com/apache/incubator-skywalking/pull/1671
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
index aad015f59..c1e3cb426 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
@@ -28,11 +28,26 @@
 public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRelation> {
     @Override
     public void dispatch(ServiceRelation source) {
-        doDispatch(source);
+        switch (source.getDetectPoint()) {
+            case SERVER:
+                serverSide(source);
+                break;
+            case CLIENT:
+                clientSide(source);
+                break;
+        }
     }
 
-    public void doDispatch(ServiceRelation source) {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+    private void serverSide(ServiceRelation source) {
+        ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
+        indicator.setTimeBucket(source.getTimeBucket());
+        indicator.setSourceServiceId(source.getSourceServiceId());
+        indicator.setDestServiceId(source.getDestServiceId());
+        IndicatorProcess.INSTANCE.in(indicator);
+    }
+
+    private void clientSide(ServiceRelation source) {
+        ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
         indicator.setTimeBucket(source.getTimeBucket());
         indicator.setSourceServiceId(source.getSourceServiceId());
         indicator.setDestServiceId(source.getDestServiceId());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java
similarity index 81%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java
index 0663607f5..cd38b00b0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java
@@ -18,26 +18,22 @@
 
 package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
 
-import java.util.HashMap;
-import java.util.Map;
-import lombok.Getter;
-import lombok.Setter;
+import java.util.*;
+import lombok.*;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
 import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.annotation.Column;
-import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
-import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
 
 @IndicatorType
 @StreamData
-@StorageEntity(name = ServiceCallRelationIndicator.INDEX_NAME, builder = ServiceCallRelationIndicator.Builder.class)
-public class ServiceCallRelationIndicator extends Indicator {
+@StorageEntity(name = ServiceRelationClientSideIndicator.INDEX_NAME, builder = ServiceRelationClientSideIndicator.Builder.class)
+public class ServiceRelationClientSideIndicator extends Indicator {
 
-    public static final String INDEX_NAME = "service_call_relation";
+    public static final String INDEX_NAME = "service_relation_client_side";
     public static final String SOURCE_SERVICE_ID = "source_service_id";
     public static final String DEST_SERVICE_ID = "dest_service_id";
 
@@ -60,7 +56,7 @@
     }
 
     @Override public Indicator toHour() {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
         indicator.setTimeBucket(toTimeBucketInHour());
         indicator.setSourceServiceId(getSourceServiceId());
         indicator.setDestServiceId(getDestServiceId());
@@ -68,7 +64,7 @@
     }
 
     @Override public Indicator toDay() {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
         indicator.setTimeBucket(toTimeBucketInDay());
         indicator.setSourceServiceId(getSourceServiceId());
         indicator.setDestServiceId(getDestServiceId());
@@ -76,7 +72,7 @@
     }
 
     @Override public Indicator toMonth() {
-        ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
         indicator.setTimeBucket(toTimeBucketInMonth());
         indicator.setSourceServiceId(getSourceServiceId());
         indicator.setDestServiceId(getDestServiceId());
@@ -99,8 +95,8 @@
     @Override public RemoteData.Builder serialize() {
         RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
 
-        remoteBuilder.setDataIntegers(0, getSourceServiceId());
         remoteBuilder.setDataIntegers(1, getDestServiceId());
+        remoteBuilder.setDataIntegers(0, getSourceServiceId());
         remoteBuilder.setDataLongs(0, getTimeBucket());
 
         return remoteBuilder;
@@ -122,7 +118,7 @@
         if (getClass() != obj.getClass())
             return false;
 
-        ServiceCallRelationIndicator indicator = (ServiceCallRelationIndicator)obj;
+        ServiceRelationClientSideIndicator indicator = (ServiceRelationClientSideIndicator)obj;
         if (sourceServiceId != indicator.sourceServiceId)
             return false;
         if (destServiceId != indicator.destServiceId)
@@ -134,21 +130,21 @@
         return true;
     }
 
-    public static class Builder implements StorageBuilder<ServiceCallRelationIndicator> {
+    public static class Builder implements StorageBuilder<ServiceRelationClientSideIndicator> {
 
-        @Override public ServiceCallRelationIndicator map2Data(Map<String, Object> dbMap) {
-            ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+        @Override public ServiceRelationClientSideIndicator map2Data(Map<String, Object> dbMap) {
+            ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
             indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue());
             indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue());
             indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
             return indicator;
         }
 
-        @Override public Map<String, Object> data2Map(ServiceCallRelationIndicator storageData) {
+        @Override public Map<String, Object> data2Map(ServiceRelationClientSideIndicator storageData) {
             Map<String, Object> map = new HashMap<>();
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
             map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId());
             map.put(DEST_SERVICE_ID, storageData.getDestServiceId());
-            map.put(TIME_BUCKET, storageData.getTimeBucket());
             return map;
         }
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java
new file mode 100644
index 000000000..c438c6852
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+
+@IndicatorType
+@StreamData
+@StorageEntity(name = ServiceRelationServerSideIndicator.INDEX_NAME, builder = ServiceRelationServerSideIndicator.Builder.class)
+public class ServiceRelationServerSideIndicator extends Indicator {
+
+    public static final String INDEX_NAME = "service_relation_server_side";
+    public static final String SOURCE_SERVICE_ID = "source_service_id";
+    public static final String DEST_SERVICE_ID = "dest_service_id";
+
+    @Setter @Getter @Column(columnName = SOURCE_SERVICE_ID) @IDColumn private int sourceServiceId;
+    @Setter @Getter @Column(columnName = DEST_SERVICE_ID) @IDColumn private int destServiceId;
+
+    @Override public String id() {
+        String splitJointId = String.valueOf(getTimeBucket());
+        splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId);
+        splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId);
+        return splitJointId;
+    }
+
+    @Override public void combine(Indicator indicator) {
+
+    }
+
+    @Override public void calculate() {
+
+    }
+
+    @Override public Indicator toHour() {
+        ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
+        indicator.setTimeBucket(toTimeBucketInHour());
+        indicator.setSourceServiceId(getSourceServiceId());
+        indicator.setDestServiceId(getDestServiceId());
+        return indicator;
+    }
+
+    @Override public Indicator toDay() {
+        ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
+        indicator.setTimeBucket(toTimeBucketInDay());
+        indicator.setSourceServiceId(getSourceServiceId());
+        indicator.setDestServiceId(getDestServiceId());
+        return indicator;
+    }
+
+    @Override public Indicator toMonth() {
+        ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
+        indicator.setTimeBucket(toTimeBucketInMonth());
+        indicator.setSourceServiceId(getSourceServiceId());
+        indicator.setDestServiceId(getDestServiceId());
+        return indicator;
+    }
+
+    @Override public int remoteHashCode() {
+        int result = 17;
+        result = 31 * result + sourceServiceId;
+        result = 31 * result + destServiceId;
+        return result;
+    }
+
+    @Override public void deserialize(RemoteData remoteData) {
+        setSourceServiceId(remoteData.getDataIntegers(0));
+        setDestServiceId(remoteData.getDataIntegers(1));
+        setTimeBucket(remoteData.getDataLongs(0));
+    }
+
+    @Override public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+
+        remoteBuilder.setDataIntegers(0, getSourceServiceId());
+        remoteBuilder.setDataIntegers(1, getDestServiceId());
+        remoteBuilder.setDataLongs(0, getTimeBucket());
+
+        return remoteBuilder;
+    }
+
+    @Override public int hashCode() {
+        int result = 17;
+        result = 31 * result + sourceServiceId;
+        result = 31 * result + destServiceId;
+        result = 31 * result + (int)getTimeBucket();
+        return result;
+    }
+
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+
+        ServiceRelationServerSideIndicator indicator = (ServiceRelationServerSideIndicator)obj;
+        if (sourceServiceId != indicator.sourceServiceId)
+            return false;
+        if (destServiceId != indicator.destServiceId)
+            return false;
+
+        if (getTimeBucket() != indicator.getTimeBucket())
+            return false;
+
+        return true;
+    }
+
+    public static class Builder implements StorageBuilder<ServiceRelationServerSideIndicator> {
+
+        @Override public ServiceRelationServerSideIndicator map2Data(Map<String, Object> dbMap) {
+            ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
+            indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue());
+            indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue());
+            indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+            return indicator;
+        }
+
+        @Override public Map<String, Object> data2Map(ServiceRelationServerSideIndicator storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId());
+            map.put(DEST_SERVICE_ID, storageData.getDestServiceId());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            return map;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
index 57993dd3e..930e2fae8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
@@ -20,12 +20,10 @@
 
 import java.io.IOException;
 import java.util.*;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
 import org.apache.skywalking.oap.server.core.query.entity.*;
-import org.apache.skywalking.oap.server.core.query.sql.*;
 import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.library.module.Service;
 import org.slf4j.*;
@@ -38,90 +36,63 @@
     private static final Logger logger = LoggerFactory.getLogger(TopologyQueryService.class);
 
     private final ModuleManager moduleManager;
-    private IMetricQueryDAO metricQueryDAO;
-    private IUniqueQueryDAO uniqueQueryDAO;
+    private ITopologyQueryDAO topologyQueryDAO;
 
     public TopologyQueryService(ModuleManager moduleManager) {
         this.moduleManager = moduleManager;
     }
 
-    private IMetricQueryDAO getMetricQueryDAO() {
-        if (metricQueryDAO == null) {
-            metricQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetricQueryDAO.class);
+    private ITopologyQueryDAO getTopologyQueryDAO() {
+        if (topologyQueryDAO == null) {
+            topologyQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITopologyQueryDAO.class);
         }
-        return metricQueryDAO;
-    }
-
-    private IUniqueQueryDAO getUniqueQueryDAO() {
-        if (uniqueQueryDAO == null) {
-            uniqueQueryDAO = moduleManager.find(StorageModule.NAME).getService(IUniqueQueryDAO.class);
-        }
-        return uniqueQueryDAO;
+        return topologyQueryDAO;
     }
 
     public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
         logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
-        List<ServiceComponent> serviceComponents = loadServiceComponent(step, startTB, endTB);
-        List<ServiceMapping> serviceMappings = loadServiceMapping(step, startTB, endTB);
+        List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
+        List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
 
-        List<Call> serviceRelationClientCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_client_calls_sum");
-        List<Call> serviceRelationServerCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_server_calls_sum");
+        List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB);
+        List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB);
 
         TopologyBuilder builder = new TopologyBuilder(moduleManager);
         return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
     }
 
-    public Topology getServiceTopology(final Step step, final long startTimeBucket, final long endTimeBucket,
-        final String serviceId) {
-        return new Topology();
-    }
-
-    private List<ServiceComponent> loadServiceComponent(final Step step, final long startTB,
-        final long endTB) throws IOException {
-        List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceComponentIndicator.INDEX_NAME, step, startTB, endTB,
-            new Where(), ServiceComponentIndicator.SERVICE_ID, ServiceComponentIndicator.COMPONENT_ID);
-
-        List<ServiceComponent> serviceComponents = new ArrayList<>();
-        twoIdGroups.forEach(twoIdGroup -> {
-            ServiceComponent serviceComponent = new ServiceComponent();
-            serviceComponent.setServiceId(twoIdGroup.getId1());
-            serviceComponent.setComponentId(twoIdGroup.getId2());
-            serviceComponents.add(serviceComponent);
-        });
-
-        return serviceComponents;
-    }
-
-    private List<ServiceMapping> loadServiceMapping(final Step step, final long startTB,
-        final long endTB) throws IOException {
-        List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceMappingIndicator.INDEX_NAME, step, startTB, endTB,
-            new Where(), ServiceMappingIndicator.SERVICE_ID, ServiceMappingIndicator.MAPPING_SERVICE_ID);
-
-        List<ServiceMapping> serviceMappings = new ArrayList<>();
-        twoIdGroups.forEach(twoIdGroup -> {
-            ServiceMapping serviceMapping = new ServiceMapping();
-            serviceMapping.setServiceId(twoIdGroup.getId1());
-            serviceMapping.setMappingServiceId(twoIdGroup.getId2());
-            serviceMappings.add(serviceMapping);
+    public Topology getServiceTopology(final Step step, final long startTB, final long endTB,
+        final int serviceId) throws IOException {
+        List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
+        List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
+
+        Set<Integer> serviceIds = new HashSet<>();
+        serviceIds.add(serviceId);
+        serviceMappings.forEach(mapping -> {
+            if (mapping.getServiceId() == serviceId) {
+                serviceIds.add(mapping.getMappingServiceId());
+            }
         });
+        List<Integer> serviceIdList = new ArrayList<>(serviceIds);
 
-        return serviceMappings;
-    }
-
-    private List<Call> loadServiceRelationCalls(final Step step, final long startTB, final long endTB,
-        String indName) throws IOException {
-        List<TwoIdGroupValue> twoIdGroupValues = getMetricQueryDAO().aggregation(indName, step, startTB, endTB, new Where(), "source_service_id", "dest_service_id", "value", Function.Sum);
+        List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadSpecifiedClientSideServiceRelations(step, startTB, endTB, serviceIdList);
+        List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadSpecifiedServerSideServiceRelations(step, startTB, endTB, serviceIdList);
 
-        List<Call> clientCalls = new ArrayList<>();
+        TopologyBuilder builder = new TopologyBuilder(moduleManager);
+        Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
 
-        twoIdGroupValues.forEach(twoIdGroupValue -> {
-            Call call = new Call();
-            call.setSource(twoIdGroupValue.getId1());
-            call.setTarget(twoIdGroupValue.getId2());
-            call.setCalls(twoIdGroupValue.getValue().longValue());
-            clientCalls.add(call);
+        Set<Integer> nodeIds = new HashSet<>();
+        topology.getCalls().forEach(call -> {
+            nodeIds.add(call.getSource());
+            nodeIds.add(call.getTarget());
         });
 
-        return clientCalls;
+        for (int i = topology.getNodes().size() - 1; i >= 0; i--) {
+            if (!nodeIds.contains(topology.getNodes().get(i).getId())) {
+                topology.getNodes().remove(i);
+            }
+        }
+
+        return topology;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 6bf519deb..c1bf7634d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.storage;
 
 import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 
 /**
@@ -36,6 +37,7 @@
         return new Class[] {
             IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class,
             IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
-            IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class};
+            IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
+            ITopologyQueryDAO.class};
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
new file mode 100644
index 000000000..74d31b942
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.query;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * @author peng-yongsheng
+ */
+public interface ITopologyQueryDAO extends Service {
+
+    List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
+        List<Integer> serviceIds) throws IOException;
+
+    List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
+        List<Integer> serviceIds) throws IOException;
+
+    List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
+
+    List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
+
+    List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException;
+
+    List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException;
+}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
index 501513f67..c90ffb71f 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
@@ -50,7 +50,7 @@ public Topology getGlobalTopology(final Duration duration) throws IOException {
         return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket);
     }
 
-    public Topology getServiceTopology(final String serviceId, final Duration duration) {
+    public Topology getServiceTopology(final int serviceId, final Duration duration) throws IOException {
         long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
         long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 8acc0936d..696e86e74 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -20,12 +20,14 @@
 
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
 import org.apache.skywalking.oap.server.library.client.NameSpace;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
 import org.slf4j.*;
 
 /**
@@ -72,6 +74,8 @@ public void prepare() throws ServiceNotProvidedException {
         this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient));
         this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
         this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient));
+
+        this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
index fcc4a1ead..48bb44c1e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -34,7 +34,7 @@ public EsDAO(ElasticSearchClient client) {
         super(client);
     }
 
-    public void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) {
+    public final void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) {
         RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).gte(startTB).lte(endTB);
         if (where.getKeyValues().isEmpty()) {
             sourceBuilder.query(rangeQueryBuilder);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
new file mode 100644
index 000000000..5e7be1b6e
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
+import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.*;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+/**
+ * @author peng-yongsheng
+ */
+public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
+
+    public TopologyQueryEsDAO(ElasticSearchClient client) {
+        super(client);
+    }
+
+    @Override
+    public List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
+        List<Integer> serviceIds) throws IOException {
+        if (CollectionUtils.isEmpty(serviceIds)) {
+            throw new UnexpectedException("Service id is null");
+        }
+
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.size(0);
+        setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
+
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
+        return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
+    }
+
+    @Override
+    public List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
+        List<Integer> serviceIds) throws IOException {
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.size(0);
+        setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
+
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
+        return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
+    }
+
+    private void setQueryCondition(SearchSourceBuilder sourceBuilder, long startTB, long endTB,
+        List<Integer> serviceIds) {
+        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
+        boolQuery.must().add(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
+
+        BoolQueryBuilder serviceIdBoolQuery = QueryBuilders.boolQuery();
+        boolQuery.must().add(serviceIdBoolQuery);
+
+        if (serviceIds.size() == 1) {
+            boolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds.get(0)));
+            boolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds.get(0)));
+        } else {
+            boolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds));
+            boolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds));
+        }
+        sourceBuilder.query(boolQuery);
+    }
+
+    @Override public List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
+        sourceBuilder.size(0);
+
+        return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
+    }
+
+    @Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
+        sourceBuilder.size(0);
+
+        return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
+    }
+
+    private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
+        String destCName) throws IOException {
+        TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000);
+        sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000));
+        sourceBuilder.aggregation(sourceAggregation);
+
+        SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+        List<Call> calls = new ArrayList<>();
+        Terms sourceTerms = response.getAggregations().get(sourceCName);
+        for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) {
+            Terms destTerms = sourceBucket.getAggregations().get(destCName);
+            for (Terms.Bucket destBucket : destTerms.getBuckets()) {
+                Call value = new Call();
+                value.setSource(sourceBucket.getKeyAsNumber().intValue());
+                value.setTarget(destBucket.getKeyAsNumber().intValue());
+                calls.add(value);
+            }
+        }
+        return calls;
+    }
+
+    @Override public List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceMappingIndicator.INDEX_NAME);
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.query(QueryBuilders.rangeQuery(ServiceMappingIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
+        sourceBuilder.size(0);
+
+        TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(ServiceMappingIndicator.SERVICE_ID).field(ServiceMappingIndicator.SERVICE_ID).size(1000);
+        sourceAggregation.subAggregation(AggregationBuilders.terms(ServiceMappingIndicator.MAPPING_SERVICE_ID).field(ServiceMappingIndicator.MAPPING_SERVICE_ID).size(1000));
+        sourceBuilder.aggregation(sourceAggregation);
+
+        SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+        List<ServiceMapping> serviceMappings = new ArrayList<>();
+        Terms serviceIdTerms = response.getAggregations().get(ServiceMappingIndicator.SERVICE_ID);
+        for (Terms.Bucket serviceIdBucket : serviceIdTerms.getBuckets()) {
+            Terms mappingServiceIdTerms = serviceIdBucket.getAggregations().get(ServiceMappingIndicator.MAPPING_SERVICE_ID);
+            for (Terms.Bucket mappingServiceIdBucket : mappingServiceIdTerms.getBuckets()) {
+                ServiceMapping serviceMapping = new ServiceMapping();
+                serviceMapping.setServiceId(serviceIdBucket.getKeyAsNumber().intValue());
+                serviceMapping.setMappingServiceId(mappingServiceIdBucket.getKeyAsNumber().intValue());
+                serviceMappings.add(serviceMapping);
+            }
+        }
+        return serviceMappings;
+    }
+
+    @Override
+    public List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException {
+        String indexName = TimePyramidTableNameBuilder.build(step, ServiceComponentIndicator.INDEX_NAME);
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.query(QueryBuilders.rangeQuery(ServiceComponentIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
+        sourceBuilder.size(0);
+
+        TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(ServiceComponentIndicator.SERVICE_ID).field(ServiceComponentIndicator.SERVICE_ID).size(1000);
+        sourceAggregation.subAggregation(AggregationBuilders.terms(ServiceComponentIndicator.COMPONENT_ID).field(ServiceComponentIndicator.COMPONENT_ID).size(1000));
+        sourceBuilder.aggregation(sourceAggregation);
+
+        SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+        List<ServiceComponent> serviceComponents = new ArrayList<>();
+        Terms serviceIdTerms = response.getAggregations().get(ServiceComponentIndicator.SERVICE_ID);
+        for (Terms.Bucket serviceIdBucket : serviceIdTerms.getBuckets()) {
+            Terms componentIdTerms = serviceIdBucket.getAggregations().get(ServiceComponentIndicator.COMPONENT_ID);
+            for (Terms.Bucket componentIdBucket : componentIdTerms.getBuckets()) {
+                ServiceComponent serviceComponent = new ServiceComponent();
+                serviceComponent.setServiceId(serviceIdBucket.getKeyAsNumber().intValue());
+                serviceComponent.setComponentId(componentIdBucket.getKeyAsNumber().intValue());
+                serviceComponents.add(serviceComponent);
+            }
+        }
+        return serviceComponents;
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services