You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/09/14 06:00:09 UTC

[incubator-skywalking] branch master updated: Endpoint topology metric and query. (#1674)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 366609a  Endpoint topology metric and query. (#1674)
366609a is described below

commit 366609a44936a64a47f96ddcdb91b3eda16bb637
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Fri Sep 14 14:00:03 2018 +0800

    Endpoint topology metric and query. (#1674)
---
 .../server/core/analysis/DispatcherManager.java    |   6 +-
 .../EndpointCallRelationDispatcher.java            |  56 ++++++++
 .../EndpointRelationClientSideIndicator.java       | 151 +++++++++++++++++++++
 .../EndpointRelationServerSideIndicator.java       | 151 +++++++++++++++++++++
 .../oap/server/core/query/TopologyBuilder.java     |  24 +---
 .../server/core/query/TopologyQueryService.java    |  38 ++++++
 .../oap/server/core/query/entity/Call.java         |   3 +-
 .../oap/server/core/source/EndpointRelation.java   |   4 +
 .../oap/server/core/source/ServiceRelation.java    |   4 +
 .../core/storage/query/ITopologyQueryDAO.java      |   6 +
 .../oap/query/graphql/resolver/TopologyQuery.java  |   7 +
 .../src/main/resources/query-protocol              |   2 +-
 .../elasticsearch/query/TopologyQueryEsDAO.java    | 101 ++++++++++----
 13 files changed, 503 insertions(+), 50 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 7a44a31..a676500 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -30,10 +30,10 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej
 import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemorypool.ServiceInstanceJVMMemoryPoolDispatcher;
 import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
 import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher;
 import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
 import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
-import org.apache.skywalking.oap.server.core.source.Scope;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.*;
 import org.slf4j.*;
 
 /**
@@ -59,7 +59,7 @@ public class DispatcherManager {
 
         this.dispatcherMap.put(Scope.ServiceRelation, new SourceDispatcher[] {new ServiceRelationDispatcher(), new ServiceCallRelationDispatcher()});
         this.dispatcherMap.put(Scope.ServiceInstanceRelation, new SourceDispatcher[] {new ServiceInstanceRelationDispatcher()});
-        this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher()});
+        this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher(), new EndpointCallRelationDispatcher()});
 
         this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new SourceDispatcher[] {new ServiceInstanceJVMCPUDispatcher()});
         this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new SourceDispatcher[] {new ServiceInstanceJVMGCDispatcher()});
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointCallRelationDispatcher.java
new file mode 100644
index 0000000..8b01169
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointCallRelationDispatcher.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
+import org.apache.skywalking.oap.server.core.source.EndpointRelation;
+
+/**
+ * @author wusheng
+ */
+public class EndpointCallRelationDispatcher implements SourceDispatcher<EndpointRelation> {
+    @Override
+    public void dispatch(EndpointRelation source) {
+        switch (source.getDetectPoint()) {
+            case CLIENT:
+                clientSide(source);
+                break;
+            case SERVER:
+                serverSide(source);
+                break;
+        }
+    }
+
+    private void serverSide(EndpointRelation source) {
+        EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
+        indicator.setTimeBucket(source.getTimeBucket());
+        indicator.setSourceEndpointId(source.getEndpointId());
+        indicator.setDestEndpointId(source.getChildEndpointId());
+        IndicatorProcess.INSTANCE.in(indicator);
+    }
+
+    private void clientSide(EndpointRelation source) {
+        EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
+        indicator.setTimeBucket(source.getTimeBucket());
+        indicator.setSourceEndpointId(source.getEndpointId());
+        indicator.setDestEndpointId(source.getChildEndpointId());
+        IndicatorProcess.INSTANCE.in(indicator);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationClientSideIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationClientSideIndicator.java
new file mode 100644
index 0000000..9bea74b
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationClientSideIndicator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+
+@IndicatorType
+@StreamData
+@StorageEntity(name = EndpointRelationClientSideIndicator.INDEX_NAME, builder = EndpointRelationClientSideIndicator.Builder.class)
+public class EndpointRelationClientSideIndicator extends Indicator {
+
+    public static final String INDEX_NAME = "endpoint_relation_client_side";
+    public static final String SOURCE_ENDPOINT_ID = "source_endpoint_id";
+    public static final String DEST_ENDPOINT_ID = "dest_endpoint_id";
+
+    @Setter @Getter @Column(columnName = SOURCE_ENDPOINT_ID) @IDColumn private int sourceEndpointId;
+    @Setter @Getter @Column(columnName = DEST_ENDPOINT_ID) @IDColumn private int destEndpointId;
+
+    @Override public String id() {
+        String splitJointId = String.valueOf(getTimeBucket());
+        splitJointId += Const.ID_SPLIT + String.valueOf(sourceEndpointId);
+        splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId);
+        return splitJointId;
+    }
+
+    @Override public void combine(Indicator indicator) {
+
+    }
+
+    @Override public void calculate() {
+
+    }
+
+    @Override public Indicator toHour() {
+        EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
+        indicator.setTimeBucket(toTimeBucketInHour());
+        indicator.setSourceEndpointId(getSourceEndpointId());
+        indicator.setDestEndpointId(getDestEndpointId());
+        return indicator;
+    }
+
+    @Override public Indicator toDay() {
+        EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
+        indicator.setTimeBucket(toTimeBucketInDay());
+        indicator.setSourceEndpointId(getSourceEndpointId());
+        indicator.setDestEndpointId(getDestEndpointId());
+        return indicator;
+    }
+
+    @Override public Indicator toMonth() {
+        EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
+        indicator.setTimeBucket(toTimeBucketInMonth());
+        indicator.setSourceEndpointId(getSourceEndpointId());
+        indicator.setDestEndpointId(getDestEndpointId());
+        return indicator;
+    }
+
+    @Override public int remoteHashCode() {
+        int result = 17;
+        result = 31 * result + sourceEndpointId;
+        result = 31 * result + destEndpointId;
+        return result;
+    }
+
+    @Override public void deserialize(RemoteData remoteData) {
+        setSourceEndpointId(remoteData.getDataIntegers(0));
+        setDestEndpointId(remoteData.getDataIntegers(1));
+        setTimeBucket(remoteData.getDataLongs(0));
+    }
+
+    @Override public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+
+        remoteBuilder.setDataIntegers(1, getDestEndpointId());
+        remoteBuilder.setDataIntegers(0, getSourceEndpointId());
+        remoteBuilder.setDataLongs(0, getTimeBucket());
+
+        return remoteBuilder;
+    }
+
+    @Override public int hashCode() {
+        int result = 17;
+        result = 31 * result + sourceEndpointId;
+        result = 31 * result + destEndpointId;
+        result = 31 * result + (int)getTimeBucket();
+        return result;
+    }
+
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+
+        EndpointRelationClientSideIndicator indicator = (EndpointRelationClientSideIndicator)obj;
+        if (sourceEndpointId != indicator.sourceEndpointId)
+            return false;
+        if (destEndpointId != indicator.destEndpointId)
+            return false;
+
+        if (getTimeBucket() != indicator.getTimeBucket())
+            return false;
+
+        return true;
+    }
+
+    public static class Builder implements StorageBuilder<EndpointRelationClientSideIndicator> {
+
+        @Override public EndpointRelationClientSideIndicator map2Data(Map<String, Object> dbMap) {
+            EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
+            indicator.setSourceEndpointId(((Number)dbMap.get(SOURCE_ENDPOINT_ID)).intValue());
+            indicator.setDestEndpointId(((Number)dbMap.get(DEST_ENDPOINT_ID)).intValue());
+            indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+            return indicator;
+        }
+
+        @Override public Map<String, Object> data2Map(EndpointRelationClientSideIndicator storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            map.put(SOURCE_ENDPOINT_ID, storageData.getSourceEndpointId());
+            map.put(DEST_ENDPOINT_ID, storageData.getDestEndpointId());
+            return map;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideIndicator.java
new file mode 100644
index 0000000..52c7ee5
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideIndicator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+
+@IndicatorType
+@StreamData
+@StorageEntity(name = EndpointRelationServerSideIndicator.INDEX_NAME, builder = EndpointRelationServerSideIndicator.Builder.class)
+public class EndpointRelationServerSideIndicator extends Indicator {
+
+    public static final String INDEX_NAME = "endpoint_relation_server_side";
+    public static final String SOURCE_ENDPOINT_ID = "source_endpoint_id";
+    public static final String DEST_ENDPOINT_ID = "dest_endpoint_id";
+
+    @Setter @Getter @Column(columnName = SOURCE_ENDPOINT_ID) @IDColumn private int sourceEndpointId;
+    @Setter @Getter @Column(columnName = DEST_ENDPOINT_ID) @IDColumn private int destEndpointId;
+
+    @Override public String id() {
+        String splitJointId = String.valueOf(getTimeBucket());
+        splitJointId += Const.ID_SPLIT + String.valueOf(sourceEndpointId);
+        splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId);
+        return splitJointId;
+    }
+
+    @Override public void combine(Indicator indicator) {
+
+    }
+
+    @Override public void calculate() {
+
+    }
+
+    @Override public Indicator toHour() {
+        EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
+        indicator.setTimeBucket(toTimeBucketInHour());
+        indicator.setSourceEndpointId(getSourceEndpointId());
+        indicator.setDestEndpointId(getDestEndpointId());
+        return indicator;
+    }
+
+    @Override public Indicator toDay() {
+        EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
+        indicator.setTimeBucket(toTimeBucketInDay());
+        indicator.setSourceEndpointId(getSourceEndpointId());
+        indicator.setDestEndpointId(getDestEndpointId());
+        return indicator;
+    }
+
+    @Override public Indicator toMonth() {
+        EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
+        indicator.setTimeBucket(toTimeBucketInMonth());
+        indicator.setSourceEndpointId(getSourceEndpointId());
+        indicator.setDestEndpointId(getDestEndpointId());
+        return indicator;
+    }
+
+    @Override public int remoteHashCode() {
+        int result = 17;
+        result = 31 * result + sourceEndpointId;
+        result = 31 * result + destEndpointId;
+        return result;
+    }
+
+    @Override public void deserialize(RemoteData remoteData) {
+        setSourceEndpointId(remoteData.getDataIntegers(0));
+        setDestEndpointId(remoteData.getDataIntegers(1));
+        setTimeBucket(remoteData.getDataLongs(0));
+    }
+
+    @Override public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+
+        remoteBuilder.setDataIntegers(0, getSourceEndpointId());
+        remoteBuilder.setDataIntegers(1, getDestEndpointId());
+        remoteBuilder.setDataLongs(0, getTimeBucket());
+
+        return remoteBuilder;
+    }
+
+    @Override public int hashCode() {
+        int result = 17;
+        result = 31 * result + sourceEndpointId;
+        result = 31 * result + destEndpointId;
+        result = 31 * result + (int)getTimeBucket();
+        return result;
+    }
+
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+
+        EndpointRelationServerSideIndicator indicator = (EndpointRelationServerSideIndicator)obj;
+        if (sourceEndpointId != indicator.sourceEndpointId)
+            return false;
+        if (destEndpointId != indicator.destEndpointId)
+            return false;
+
+        if (getTimeBucket() != indicator.getTimeBucket())
+            return false;
+
+        return true;
+    }
+
+    public static class Builder implements StorageBuilder<EndpointRelationServerSideIndicator> {
+
+        @Override public EndpointRelationServerSideIndicator map2Data(Map<String, Object> dbMap) {
+            EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
+            indicator.setSourceEndpointId(((Number)dbMap.get(SOURCE_ENDPOINT_ID)).intValue());
+            indicator.setDestEndpointId(((Number)dbMap.get(DEST_ENDPOINT_ID)).intValue());
+            indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+            return indicator;
+        }
+
+        @Override public Map<String, Object> data2Map(EndpointRelationServerSideIndicator storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(SOURCE_ENDPOINT_ID, storageData.getSourceEndpointId());
+            map.put(DEST_ENDPOINT_ID, storageData.getDestEndpointId());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            return map;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java
index abdae7c..7be38bd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java
@@ -37,12 +37,10 @@ class TopologyBuilder {
     private static final Logger logger = LoggerFactory.getLogger(TopologyBuilder.class);
 
     private final ServiceInventoryCache serviceInventoryCache;
-    //    private final DateBetweenService dateBetweenService;
     private final IComponentLibraryCatalogService componentLibraryCatalogService;
 
     TopologyBuilder(ModuleManager moduleManager) {
         this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class);
-//        this.dateBetweenService = new DateBetweenService(moduleManager);
         this.componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class);
     }
 
@@ -56,8 +54,6 @@ class TopologyBuilder {
         serviceRelationServerCalls = serverCallsFilter(serviceRelationServerCalls);
 
         List<Node> nodes = new LinkedList<>();
-        Map<Integer, Integer> applicationMinuteBetweenMap = new HashMap<>();
-
         List<Call> calls = new LinkedList<>();
         Set<Integer> nodeIds = new HashSet<>();
         serviceRelationClientCalls.forEach(clientCall -> {
@@ -91,17 +87,13 @@ class TopologyBuilder {
             int actualTargetId = mappings.getOrDefault(target.getSequence(), target.getSequence());
             call.setTarget(actualTargetId);
             call.setCallType(nodeCompMap.get(clientCall.getTarget()));
-//            try {
-//                call.setCpm(clientCall.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, source.getSequence(), startSecondTimeBucket, endSecondTimeBucket));
-//            } catch (ParseException e) {
-//                logger.error(e.getMessage(), e);
-//            }
+            call.setId(clientCall.getId());
             calls.add(call);
         });
 
-        serviceRelationServerCalls.forEach(referenceMetric -> {
-            ServiceInventory source = serviceInventoryCache.get(referenceMetric.getSource());
-            ServiceInventory target = serviceInventoryCache.get(referenceMetric.getTarget());
+        serviceRelationServerCalls.forEach(serverCall -> {
+            ServiceInventory source = serviceInventoryCache.get(serverCall.getSource());
+            ServiceInventory target = serviceInventoryCache.get(serverCall.getTarget());
 
             if (source.getSequence() == Const.NONE_SERVICE_ID) {
                 if (!nodeIds.contains(source.getSequence())) {
@@ -128,17 +120,13 @@ class TopologyBuilder {
             Call call = new Call();
             call.setSource(source.getSequence());
             call.setTarget(target.getSequence());
+            call.setId(serverCall.getId());
 
             if (source.getSequence() == Const.NONE_SERVICE_ID) {
                 call.setCallType(Const.EMPTY_STRING);
             } else {
-                call.setCallType(nodeCompMap.get(referenceMetric.getTarget()));
+                call.setCallType(nodeCompMap.get(serverCall.getTarget()));
             }
-//            try {
-//                call.setCpm(referenceMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, target.getSequence(), startSecondTimeBucket, endSecondTimeBucket));
-//            } catch (ParseException e) {
-//                logger.error(e.getMessage(), e);
-//            }
             calls.add(call);
         });
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
index 930e2fa..3ba322f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.query;
 
 import java.io.IOException;
 import java.util.*;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
 import org.apache.skywalking.oap.server.core.query.entity.*;
 import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -37,6 +40,8 @@ public class TopologyQueryService implements Service {
 
     private final ModuleManager moduleManager;
     private ITopologyQueryDAO topologyQueryDAO;
+    private EndpointInventoryCache endpointInventoryCache;
+    private IComponentLibraryCatalogService componentLibraryCatalogService;
 
     public TopologyQueryService(ModuleManager moduleManager) {
         this.moduleManager = moduleManager;
@@ -49,6 +54,20 @@ public class TopologyQueryService implements Service {
         return topologyQueryDAO;
     }
 
+    private IComponentLibraryCatalogService getComponentLibraryCatalogService() {
+        if (componentLibraryCatalogService == null) {
+            componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class);
+        }
+        return componentLibraryCatalogService;
+    }
+
+    private EndpointInventoryCache getEndpointInventoryCache() {
+        if (endpointInventoryCache == null) {
+            endpointInventoryCache = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class);
+        }
+        return endpointInventoryCache;
+    }
+
     public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
         logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
         List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
@@ -95,4 +114,23 @@ public class TopologyQueryService implements Service {
 
         return topology;
     }
+
+    public Topology getEndpointTopology(final Step step, final long startTB, final long endTB,
+        final int endpointId) throws IOException {
+        List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
+
+        Map<Integer, String> components = new HashMap<>();
+        serviceComponents.forEach(component -> components.put(component.getServiceId(), getComponentLibraryCatalogService().getComponentName(component.getComponentId())));
+
+        List<Call> calls = getTopologyQueryDAO().loadSpecifiedDestOfServerSideEndpointRelations(step, startTB, endTB, endpointId);
+        calls.addAll(getTopologyQueryDAO().loadSpecifiedSourceOfClientSideEndpointRelations(step, startTB, endTB, endpointId));
+
+        calls.forEach(call -> {
+            call.setCallType(components.getOrDefault(getEndpointInventoryCache().get(call.getTarget()).getServiceId(), Const.UNKNOWN));
+        });
+
+        Topology topology = new Topology();
+        topology.getCalls().addAll(calls);
+        return topology;
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
index 7a2f807..92f7852 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
@@ -26,6 +26,5 @@ public class Call {
     private int source;
     private int target;
     private String callType;
-    private long calls;
-    private long cpm;
+    private String id;
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java
index 76cb76f..d1a025e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java
@@ -34,6 +34,10 @@ public class EndpointRelation extends Source {
         return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId);
     }
 
+    public static String buildEntityId(int endpointId, int childEndpointId) {
+        return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId);
+    }
+
     @Getter @Setter private int endpointId;
     @Getter @Setter private String endpoint;
     @Getter @Setter private int serviceId;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java
index b65b150..bed3929 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java
@@ -34,6 +34,10 @@ public class ServiceRelation extends Source {
         return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId);
     }
 
+    public static String buildEntityId(int sourceServiceId, int destServiceId) {
+        return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId);
+    }
+
     @Getter @Setter private int sourceServiceId;
     @Getter @Setter private String sourceServiceName;
     @Getter @Setter private String sourceServiceInstanceName;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
index 74d31b9..514fb75 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
@@ -42,4 +42,10 @@ public interface ITopologyQueryDAO extends Service {
     List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException;
 
     List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException;
+
+    List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
+        int destEndpointId) throws IOException;
+
+    List<Call> loadSpecifiedSourceOfClientSideEndpointRelations(Step step, long startTB, long endTB,
+        int sourceEndpointId) throws IOException;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
index c90ffb7..185e858 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
@@ -56,4 +56,11 @@ public class TopologyQuery implements GraphQLQueryResolver {
 
         return getQueryService().getServiceTopology(duration.getStep(), startTimeBucket, endTimeBucket, serviceId);
     }
+
+    public Topology getEndpointTopology(final int endpointId, final Duration duration) throws IOException {
+        // The query service is called with time buckets, so translate both ends of the duration first.
+        final long fromBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
+        final long toBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+        return getQueryService().getEndpointTopology(duration.getStep(), fromBucket, toBucket, endpointId);
+    }
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index c6ee5d1..b93baa5 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit c6ee5d15f38024c3be61be579eac4035ffbfd4bf
+Subproject commit b93baa58852595164aae50e3e7be37683c220175
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
index 5e7be1b..00de99e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
 import java.io.IOException;
 import java.util.*;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.*;
 import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
 import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
 import org.apache.skywalking.oap.server.core.query.entity.*;
@@ -57,18 +58,22 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
         setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
 
         String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
-        return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
+        return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, Source.Service);
     }
 
     @Override
     public List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
         List<Integer> serviceIds) throws IOException {
+        if (CollectionUtils.isEmpty(serviceIds)) { // guard added by this change: refuse to build the query without service ids
+            throw new UnexpectedException("Service id is null"); // NOTE(review): presumably also raised for an empty, non-null list - confirm CollectionUtils.isEmpty semantics
+        }
+
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
         sourceBuilder.size(0);
         setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
 
         String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
-        return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
+        return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, Source.Service); // Source.Service: load() derives each Call id via ServiceRelation.buildEntityId
     }
 
     private void setQueryCondition(SearchSourceBuilder sourceBuilder, long startTB, long endTB,
@@ -95,7 +100,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
         sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
         sourceBuilder.size(0);
 
-        return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
+        return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, Source.Service);
     }
 
     @Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
@@ -104,29 +109,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
         sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
         sourceBuilder.size(0);
 
-        return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
-    }
-
-    private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
-        String destCName) throws IOException {
-        TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000);
-        sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000));
-        sourceBuilder.aggregation(sourceAggregation);
-
-        SearchResponse response = getClient().search(indexName, sourceBuilder);
-
-        List<Call> calls = new ArrayList<>();
-        Terms sourceTerms = response.getAggregations().get(sourceCName);
-        for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) {
-            Terms destTerms = sourceBucket.getAggregations().get(destCName);
-            for (Terms.Bucket destBucket : destTerms.getBuckets()) {
-                Call value = new Call();
-                value.setSource(sourceBucket.getKeyAsNumber().intValue());
-                value.setTarget(destBucket.getKeyAsNumber().intValue());
-                calls.add(value);
-            }
-        }
-        return calls;
+        return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, Source.Service);
     }
 
     @Override public List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException {
@@ -181,4 +164,70 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
         }
         return serviceComponents;
     }
+
+    @Override
+    public List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
+        int destEndpointId) throws IOException {
+        // Restrict to time buckets in [startTB, endTB] and to calls that arrive at the requested destination endpoint.
+        BoolQueryBuilder condition = QueryBuilders.boolQuery();
+        condition.must().add(QueryBuilders.rangeQuery(EndpointRelationServerSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
+        condition.must().add(QueryBuilders.termQuery(EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, destEndpointId));
+
+        SearchSourceBuilder searchSource = SearchSourceBuilder.searchSource();
+        searchSource.size(0);
+        searchSource.query(condition);
+
+        String index = TimePyramidTableNameBuilder.build(step, EndpointRelationServerSideIndicator.INDEX_NAME);
+        return load(searchSource, index, EndpointRelationServerSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint);
+    }
+
+    @Override
+    public List<Call> loadSpecifiedSourceOfClientSideEndpointRelations(Step step, long startTB, long endTB,
+        int sourceEndpointId) throws IOException {
+        // Restrict to time buckets in [startTB, endTB] and to calls that start at the requested source endpoint.
+        BoolQueryBuilder condition = QueryBuilders.boolQuery();
+        condition.must().add(QueryBuilders.rangeQuery(EndpointRelationClientSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
+        condition.must().add(QueryBuilders.termQuery(EndpointRelationClientSideIndicator.SOURCE_ENDPOINT_ID, sourceEndpointId));
+
+        SearchSourceBuilder searchSource = SearchSourceBuilder.searchSource();
+        searchSource.size(0);
+        searchSource.query(condition);
+
+        String index = TimePyramidTableNameBuilder.build(step, EndpointRelationClientSideIndicator.INDEX_NAME);
+        return load(searchSource, index, EndpointRelationClientSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationClientSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint);
+    }
+
+    private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
+        String destCName, Source source) throws IOException {
+        // Two-level terms aggregation: bucket by the source column, then by the destination column inside each source bucket.
+        TermsAggregationBuilder bySource = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000);
+        bySource.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000));
+        sourceBuilder.aggregation(bySource);
+        SearchResponse response = getClient().search(indexName, sourceBuilder);
+        // Every (source bucket, destination bucket) pair becomes one Call whose id depends on the relation kind.
+        List<Call> result = new ArrayList<>();
+        Terms sourceBuckets = response.getAggregations().get(sourceCName);
+        for (Terms.Bucket outer : sourceBuckets.getBuckets()) {
+            Terms destBuckets = outer.getAggregations().get(destCName);
+            for (Terms.Bucket inner : destBuckets.getBuckets()) {
+                Call call = new Call();
+                call.setSource(outer.getKeyAsNumber().intValue());
+                call.setTarget(inner.getKeyAsNumber().intValue());
+                switch (source) {
+                    case Service:
+                        call.setId(ServiceRelation.buildEntityId(call.getSource(), call.getTarget()));
+                        break;
+                    case Endpoint:
+                        call.setId(EndpointRelation.buildEntityId(call.getSource(), call.getTarget()));
+                        break;
+                }
+                result.add(call);
+            }
+        }
+        return result;
+    }
+
+    enum Source { // tells load() whether Call ids come from ServiceRelation or EndpointRelation buildEntityId
+        Service, Endpoint
+    }
 }