You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/09/14 06:00:09 UTC
[incubator-skywalking] branch master updated: Endpoint topology
metric and query. (#1674)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 366609a Endpoint topology metric and query. (#1674)
366609a is described below
commit 366609a44936a64a47f96ddcdb91b3eda16bb637
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Fri Sep 14 14:00:03 2018 +0800
Endpoint topology metric and query. (#1674)
---
.../server/core/analysis/DispatcherManager.java | 6 +-
.../EndpointCallRelationDispatcher.java | 56 ++++++++
.../EndpointRelationClientSideIndicator.java | 151 +++++++++++++++++++++
.../EndpointRelationServerSideIndicator.java | 151 +++++++++++++++++++++
.../oap/server/core/query/TopologyBuilder.java | 24 +---
.../server/core/query/TopologyQueryService.java | 38 ++++++
.../oap/server/core/query/entity/Call.java | 3 +-
.../oap/server/core/source/EndpointRelation.java | 4 +
.../oap/server/core/source/ServiceRelation.java | 4 +
.../core/storage/query/ITopologyQueryDAO.java | 6 +
.../oap/query/graphql/resolver/TopologyQuery.java | 7 +
.../src/main/resources/query-protocol | 2 +-
.../elasticsearch/query/TopologyQueryEsDAO.java | 101 ++++++++++----
13 files changed, 503 insertions(+), 50 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 7a44a31..a676500 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -30,10 +30,10 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemorypool.ServiceInstanceJVMMemoryPoolDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
-import org.apache.skywalking.oap.server.core.source.Scope;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.core.source.*;
import org.slf4j.*;
/**
@@ -59,7 +59,7 @@ public class DispatcherManager {
this.dispatcherMap.put(Scope.ServiceRelation, new SourceDispatcher[] {new ServiceRelationDispatcher(), new ServiceCallRelationDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceRelation, new SourceDispatcher[] {new ServiceInstanceRelationDispatcher()});
- this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher()});
+ this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher(), new EndpointCallRelationDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new SourceDispatcher[] {new ServiceInstanceJVMCPUDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new SourceDispatcher[] {new ServiceInstanceJVMGCDispatcher()});
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointCallRelationDispatcher.java
new file mode 100644
index 0000000..8b01169
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointCallRelationDispatcher.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
+import org.apache.skywalking.oap.server.core.source.EndpointRelation;
+
+/**
+ * Dispatches {@link EndpointRelation} sources to the relation indicator that matches
+ * the side (client or server) on which the relation was detected.
+ *
+ * @author wusheng
+ */
+public class EndpointCallRelationDispatcher implements SourceDispatcher<EndpointRelation> {
+
+    /**
+     * Routes the relation to the client-side or server-side handler according to its
+     * detect point. Any other detect point produces no indicator.
+     *
+     * @param source the endpoint relation to dispatch
+     */
+    @Override
+    public void dispatch(EndpointRelation source) {
+        switch (source.getDetectPoint()) {
+            case SERVER:
+                serverSide(source);
+                break;
+            case CLIENT:
+                clientSide(source);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Creates a client-side relation indicator from the source and submits it to the
+     * indicator process.
+     */
+    private void clientSide(EndpointRelation relation) {
+        EndpointRelationClientSideIndicator clientIndicator = new EndpointRelationClientSideIndicator();
+        clientIndicator.setSourceEndpointId(relation.getEndpointId());
+        clientIndicator.setDestEndpointId(relation.getChildEndpointId());
+        clientIndicator.setTimeBucket(relation.getTimeBucket());
+        IndicatorProcess.INSTANCE.in(clientIndicator);
+    }
+
+    /**
+     * Creates a server-side relation indicator from the source and submits it to the
+     * indicator process.
+     */
+    private void serverSide(EndpointRelation relation) {
+        EndpointRelationServerSideIndicator serverIndicator = new EndpointRelationServerSideIndicator();
+        serverIndicator.setSourceEndpointId(relation.getEndpointId());
+        serverIndicator.setDestEndpointId(relation.getChildEndpointId());
+        serverIndicator.setTimeBucket(relation.getTimeBucket());
+        IndicatorProcess.INSTANCE.in(serverIndicator);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationClientSideIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationClientSideIndicator.java
new file mode 100644
index 0000000..9bea74b
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationClientSideIndicator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+
+/**
+ * Indicator recording that a call from one endpoint to another was observed within a
+ * time bucket. It is stored under {@link #INDEX_NAME} and declares no value fields of
+ * its own: the row consists of the two endpoint ids and the time bucket.
+ */
+@IndicatorType
+@StreamData
+@StorageEntity(name = EndpointRelationClientSideIndicator.INDEX_NAME, builder = EndpointRelationClientSideIndicator.Builder.class)
+public class EndpointRelationClientSideIndicator extends Indicator {
+
+    public static final String INDEX_NAME = "endpoint_relation_client_side";
+    public static final String SOURCE_ENDPOINT_ID = "source_endpoint_id";
+    public static final String DEST_ENDPOINT_ID = "dest_endpoint_id";
+
+    @Setter @Getter @Column(columnName = SOURCE_ENDPOINT_ID) @IDColumn private int sourceEndpointId;
+    @Setter @Getter @Column(columnName = DEST_ENDPOINT_ID) @IDColumn private int destEndpointId;
+
+    /**
+     * @return the time bucket, source endpoint id and destination endpoint id joined
+     * with {@code Const.ID_SPLIT}, in that order
+     */
+    @Override public String id() {
+        String splitJointId = String.valueOf(getTimeBucket());
+        splitJointId += Const.ID_SPLIT + String.valueOf(sourceEndpointId);
+        splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId);
+        return splitJointId;
+    }
+
+    @Override public void combine(Indicator indicator) {
+        // Intentionally empty: this indicator has no value fields to merge.
+    }
+
+    @Override public void calculate() {
+        // Intentionally empty: this indicator has no derived values to compute.
+    }
+
+    @Override public Indicator toHour() {
+        return copyWithTimeBucket(toTimeBucketInHour());
+    }
+
+    @Override public Indicator toDay() {
+        return copyWithTimeBucket(toTimeBucketInDay());
+    }
+
+    @Override public Indicator toMonth() {
+        return copyWithTimeBucket(toTimeBucketInMonth());
+    }
+
+    /** Creates a new indicator with the same endpoint ids and the given time bucket. */
+    private EndpointRelationClientSideIndicator copyWithTimeBucket(long timeBucket) {
+        EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
+        indicator.setTimeBucket(timeBucket);
+        indicator.setSourceEndpointId(getSourceEndpointId());
+        indicator.setDestEndpointId(getDestEndpointId());
+        return indicator;
+    }
+
+    /** Hash over the two endpoint ids only; the time bucket is deliberately not included. */
+    @Override public int remoteHashCode() {
+        int result = 17;
+        result = 31 * result + sourceEndpointId;
+        result = 31 * result + destEndpointId;
+        return result;
+    }
+
+    /**
+     * Restores the fields from remote data: integers[0] is the source endpoint id,
+     * integers[1] the destination endpoint id, longs[0] the time bucket.
+     */
+    @Override public void deserialize(RemoteData remoteData) {
+        setSourceEndpointId(remoteData.getDataIntegers(0));
+        setDestEndpointId(remoteData.getDataIntegers(1));
+        setTimeBucket(remoteData.getDataLongs(0));
+    }
+
+    /**
+     * Writes the fields in the positions {@link #deserialize(RemoteData)} reads them from.
+     *
+     * @return a builder holding the source id, destination id and time bucket
+     */
+    @Override public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+
+        // The repeated fields of a fresh builder are empty, so values must be appended;
+        // the indexed setters only replace existing elements and would throw here.
+        remoteBuilder.addDataIntegers(getSourceEndpointId());
+        remoteBuilder.addDataIntegers(getDestEndpointId());
+        remoteBuilder.addDataLongs(getTimeBucket());
+
+        return remoteBuilder;
+    }
+
+    @Override public int hashCode() {
+        int result = 17;
+        result = 31 * result + sourceEndpointId;
+        result = 31 * result + destEndpointId;
+        // The long time bucket is narrowed to int for hashing.
+        result = 31 * result + (int)getTimeBucket();
+        return result;
+    }
+
+    /** Two indicators are equal when class, both endpoint ids and time bucket all match. */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+
+        EndpointRelationClientSideIndicator indicator = (EndpointRelationClientSideIndicator)obj;
+        if (sourceEndpointId != indicator.sourceEndpointId)
+            return false;
+        if (destEndpointId != indicator.destEndpointId)
+            return false;
+
+        if (getTimeBucket() != indicator.getTimeBucket())
+            return false;
+
+        return true;
+    }
+
+    /** Converts between the indicator and its column-name keyed storage map. */
+    public static class Builder implements StorageBuilder<EndpointRelationClientSideIndicator> {
+
+        @Override public EndpointRelationClientSideIndicator map2Data(Map<String, Object> dbMap) {
+            EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
+            indicator.setSourceEndpointId(((Number)dbMap.get(SOURCE_ENDPOINT_ID)).intValue());
+            indicator.setDestEndpointId(((Number)dbMap.get(DEST_ENDPOINT_ID)).intValue());
+            indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+            return indicator;
+        }
+
+        @Override public Map<String, Object> data2Map(EndpointRelationClientSideIndicator storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(SOURCE_ENDPOINT_ID, storageData.getSourceEndpointId());
+            map.put(DEST_ENDPOINT_ID, storageData.getDestEndpointId());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            return map;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideIndicator.java
new file mode 100644
index 0000000..52c7ee5
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideIndicator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+
+/**
+ * Indicator recording that a call from one endpoint to another was observed within a
+ * time bucket. It is stored under {@link #INDEX_NAME} and declares no value fields of
+ * its own: the row consists of the two endpoint ids and the time bucket.
+ */
+@IndicatorType
+@StreamData
+@StorageEntity(name = EndpointRelationServerSideIndicator.INDEX_NAME, builder = EndpointRelationServerSideIndicator.Builder.class)
+public class EndpointRelationServerSideIndicator extends Indicator {
+
+    public static final String INDEX_NAME = "endpoint_relation_server_side";
+    public static final String SOURCE_ENDPOINT_ID = "source_endpoint_id";
+    public static final String DEST_ENDPOINT_ID = "dest_endpoint_id";
+
+    @Setter @Getter @Column(columnName = SOURCE_ENDPOINT_ID) @IDColumn private int sourceEndpointId;
+    @Setter @Getter @Column(columnName = DEST_ENDPOINT_ID) @IDColumn private int destEndpointId;
+
+    /**
+     * @return the time bucket, source endpoint id and destination endpoint id joined
+     * with {@code Const.ID_SPLIT}, in that order
+     */
+    @Override public String id() {
+        String splitJointId = String.valueOf(getTimeBucket());
+        splitJointId += Const.ID_SPLIT + String.valueOf(sourceEndpointId);
+        splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId);
+        return splitJointId;
+    }
+
+    @Override public void combine(Indicator indicator) {
+        // Intentionally empty: this indicator has no value fields to merge.
+    }
+
+    @Override public void calculate() {
+        // Intentionally empty: this indicator has no derived values to compute.
+    }
+
+    @Override public Indicator toHour() {
+        return copyWithTimeBucket(toTimeBucketInHour());
+    }
+
+    @Override public Indicator toDay() {
+        return copyWithTimeBucket(toTimeBucketInDay());
+    }
+
+    @Override public Indicator toMonth() {
+        return copyWithTimeBucket(toTimeBucketInMonth());
+    }
+
+    /** Creates a new indicator with the same endpoint ids and the given time bucket. */
+    private EndpointRelationServerSideIndicator copyWithTimeBucket(long timeBucket) {
+        EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
+        indicator.setTimeBucket(timeBucket);
+        indicator.setSourceEndpointId(getSourceEndpointId());
+        indicator.setDestEndpointId(getDestEndpointId());
+        return indicator;
+    }
+
+    /** Hash over the two endpoint ids only; the time bucket is deliberately not included. */
+    @Override public int remoteHashCode() {
+        int result = 17;
+        result = 31 * result + sourceEndpointId;
+        result = 31 * result + destEndpointId;
+        return result;
+    }
+
+    /**
+     * Restores the fields from remote data: integers[0] is the source endpoint id,
+     * integers[1] the destination endpoint id, longs[0] the time bucket.
+     */
+    @Override public void deserialize(RemoteData remoteData) {
+        setSourceEndpointId(remoteData.getDataIntegers(0));
+        setDestEndpointId(remoteData.getDataIntegers(1));
+        setTimeBucket(remoteData.getDataLongs(0));
+    }
+
+    /**
+     * Writes the fields in the positions {@link #deserialize(RemoteData)} reads them from.
+     *
+     * @return a builder holding the source id, destination id and time bucket
+     */
+    @Override public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+
+        // The repeated fields of a fresh builder are empty, so values must be appended;
+        // the indexed setters only replace existing elements and would throw here.
+        remoteBuilder.addDataIntegers(getSourceEndpointId());
+        remoteBuilder.addDataIntegers(getDestEndpointId());
+        remoteBuilder.addDataLongs(getTimeBucket());
+
+        return remoteBuilder;
+    }
+
+    @Override public int hashCode() {
+        int result = 17;
+        result = 31 * result + sourceEndpointId;
+        result = 31 * result + destEndpointId;
+        // The long time bucket is narrowed to int for hashing.
+        result = 31 * result + (int)getTimeBucket();
+        return result;
+    }
+
+    /** Two indicators are equal when class, both endpoint ids and time bucket all match. */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+
+        EndpointRelationServerSideIndicator indicator = (EndpointRelationServerSideIndicator)obj;
+        if (sourceEndpointId != indicator.sourceEndpointId)
+            return false;
+        if (destEndpointId != indicator.destEndpointId)
+            return false;
+
+        if (getTimeBucket() != indicator.getTimeBucket())
+            return false;
+
+        return true;
+    }
+
+    /** Converts between the indicator and its column-name keyed storage map. */
+    public static class Builder implements StorageBuilder<EndpointRelationServerSideIndicator> {
+
+        @Override public EndpointRelationServerSideIndicator map2Data(Map<String, Object> dbMap) {
+            EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
+            indicator.setSourceEndpointId(((Number)dbMap.get(SOURCE_ENDPOINT_ID)).intValue());
+            indicator.setDestEndpointId(((Number)dbMap.get(DEST_ENDPOINT_ID)).intValue());
+            indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+            return indicator;
+        }
+
+        @Override public Map<String, Object> data2Map(EndpointRelationServerSideIndicator storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(SOURCE_ENDPOINT_ID, storageData.getSourceEndpointId());
+            map.put(DEST_ENDPOINT_ID, storageData.getDestEndpointId());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            return map;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java
index abdae7c..7be38bd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java
@@ -37,12 +37,10 @@ class TopologyBuilder {
private static final Logger logger = LoggerFactory.getLogger(TopologyBuilder.class);
private final ServiceInventoryCache serviceInventoryCache;
- // private final DateBetweenService dateBetweenService;
private final IComponentLibraryCatalogService componentLibraryCatalogService;
TopologyBuilder(ModuleManager moduleManager) {
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class);
-// this.dateBetweenService = new DateBetweenService(moduleManager);
this.componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class);
}
@@ -56,8 +54,6 @@ class TopologyBuilder {
serviceRelationServerCalls = serverCallsFilter(serviceRelationServerCalls);
List<Node> nodes = new LinkedList<>();
- Map<Integer, Integer> applicationMinuteBetweenMap = new HashMap<>();
-
List<Call> calls = new LinkedList<>();
Set<Integer> nodeIds = new HashSet<>();
serviceRelationClientCalls.forEach(clientCall -> {
@@ -91,17 +87,13 @@ class TopologyBuilder {
int actualTargetId = mappings.getOrDefault(target.getSequence(), target.getSequence());
call.setTarget(actualTargetId);
call.setCallType(nodeCompMap.get(clientCall.getTarget()));
-// try {
-// call.setCpm(clientCall.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, source.getSequence(), startSecondTimeBucket, endSecondTimeBucket));
-// } catch (ParseException e) {
-// logger.error(e.getMessage(), e);
-// }
+ call.setId(clientCall.getId());
calls.add(call);
});
- serviceRelationServerCalls.forEach(referenceMetric -> {
- ServiceInventory source = serviceInventoryCache.get(referenceMetric.getSource());
- ServiceInventory target = serviceInventoryCache.get(referenceMetric.getTarget());
+ serviceRelationServerCalls.forEach(serverCall -> {
+ ServiceInventory source = serviceInventoryCache.get(serverCall.getSource());
+ ServiceInventory target = serviceInventoryCache.get(serverCall.getTarget());
if (source.getSequence() == Const.NONE_SERVICE_ID) {
if (!nodeIds.contains(source.getSequence())) {
@@ -128,17 +120,13 @@ class TopologyBuilder {
Call call = new Call();
call.setSource(source.getSequence());
call.setTarget(target.getSequence());
+ call.setId(serverCall.getId());
if (source.getSequence() == Const.NONE_SERVICE_ID) {
call.setCallType(Const.EMPTY_STRING);
} else {
- call.setCallType(nodeCompMap.get(referenceMetric.getTarget()));
+ call.setCallType(nodeCompMap.get(serverCall.getTarget()));
}
-// try {
-// call.setCpm(referenceMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, target.getSequence(), startSecondTimeBucket, endSecondTimeBucket));
-// } catch (ParseException e) {
-// logger.error(e.getMessage(), e);
-// }
calls.add(call);
});
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
index 930e2fa..3ba322f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.util.*;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -37,6 +40,8 @@ public class TopologyQueryService implements Service {
private final ModuleManager moduleManager;
private ITopologyQueryDAO topologyQueryDAO;
+ private EndpointInventoryCache endpointInventoryCache;
+ private IComponentLibraryCatalogService componentLibraryCatalogService;
public TopologyQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
@@ -49,6 +54,20 @@ public class TopologyQueryService implements Service {
return topologyQueryDAO;
}
+    /**
+     * Returns the component library catalog service, looking it up from the core module
+     * on first use and reusing the stored reference afterwards.
+     */
+    private IComponentLibraryCatalogService getComponentLibraryCatalogService() {
+        if (componentLibraryCatalogService != null) {
+            return componentLibraryCatalogService;
+        }
+        componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class);
+        return componentLibraryCatalogService;
+    }
+
+    /**
+     * Returns the endpoint inventory cache, looking it up from the core module on first
+     * use and reusing the stored reference afterwards.
+     */
+    private EndpointInventoryCache getEndpointInventoryCache() {
+        if (endpointInventoryCache != null) {
+            return endpointInventoryCache;
+        }
+        endpointInventoryCache = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class);
+        return endpointInventoryCache;
+    }
+
public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
@@ -95,4 +114,23 @@ public class TopologyQueryService implements Service {
return topology;
}
+
+    /**
+     * Builds the topology around one endpoint: the server-side relations loaded for the
+     * endpoint as destination plus the client-side relations loaded for it as source.
+     * Each call's type is the component name registered for the service owning the
+     * call's target endpoint, or {@code Const.UNKNOWN} when none is registered.
+     *
+     * @param step time step used to select the storage
+     * @param startTB start time bucket
+     * @param endTB end time bucket
+     * @param endpointId id of the endpoint the topology is built for
+     * @return a topology whose calls are the loaded relations
+     * @throws IOException if the storage query fails
+     */
+    public Topology getEndpointTopology(final Step step, final long startTB, final long endTB,
+        final int endpointId) throws IOException {
+        // Service id -> component name, resolved through the component library catalog.
+        Map<Integer, String> componentNames = new HashMap<>();
+        for (ServiceComponent serviceComponent : getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB)) {
+            componentNames.put(
+                serviceComponent.getServiceId(),
+                getComponentLibraryCatalogService().getComponentName(serviceComponent.getComponentId()));
+        }
+
+        List<Call> endpointCalls = getTopologyQueryDAO().loadSpecifiedDestOfServerSideEndpointRelations(step, startTB, endTB, endpointId);
+        endpointCalls.addAll(getTopologyQueryDAO().loadSpecifiedSourceOfClientSideEndpointRelations(step, startTB, endTB, endpointId));
+
+        for (Call endpointCall : endpointCalls) {
+            endpointCall.setCallType(
+                componentNames.getOrDefault(
+                    getEndpointInventoryCache().get(endpointCall.getTarget()).getServiceId(),
+                    Const.UNKNOWN));
+        }
+
+        Topology result = new Topology();
+        result.getCalls().addAll(endpointCalls);
+        return result;
+    }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
index 7a2f807..92f7852 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java
@@ -26,6 +26,5 @@ public class Call {
private int source;
private int target;
private String callType;
- private long calls;
- private long cpm;
+ private String id;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java
index 76cb76f..d1a025e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java
@@ -34,6 +34,10 @@ public class EndpointRelation extends Source {
return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId);
}
+ public static String buildEntityId(int endpointId, int childEndpointId) {
+ return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId);
+ }
+
@Getter @Setter private int endpointId;
@Getter @Setter private String endpoint;
@Getter @Setter private int serviceId;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java
index b65b150..bed3929 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java
@@ -34,6 +34,10 @@ public class ServiceRelation extends Source {
return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId);
}
+ public static String buildEntityId(int sourceServiceId, int destServiceId) {
+ return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId);
+ }
+
@Getter @Setter private int sourceServiceId;
@Getter @Setter private String sourceServiceName;
@Getter @Setter private String sourceServiceInstanceName;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
index 74d31b9..514fb75 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
@@ -42,4 +42,10 @@ public interface ITopologyQueryDAO extends Service {
List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException;
List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException;
+
+ /**
+  * Loads endpoint relation calls for the given step and time-bucket window.
+  * NOTE(review): judging by the name, this presumably returns the server-side
+  * relations whose destination is {@code destEndpointId} - confirm against the
+  * storage implementation.
+  *
+  * @param step time step of the query
+  * @param startTB start time bucket
+  * @param endTB end time bucket
+  * @param destEndpointId endpoint id passed as the destination
+  * @return the matching calls
+  * @throws IOException declared for implementations to report query failures
+  */
+ List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
+ int destEndpointId) throws IOException;
+
+ /**
+  * Loads endpoint relation calls for the given step and time-bucket window.
+  * NOTE(review): judging by the name, this presumably returns the client-side
+  * relations whose source is {@code sourceEndpointId} - confirm against the
+  * storage implementation.
+  *
+  * @param step time step of the query
+  * @param startTB start time bucket
+  * @param endTB end time bucket
+  * @param sourceEndpointId endpoint id passed as the source
+  * @return the matching calls
+  * @throws IOException declared for implementations to report query failures
+  */
+ List<Call> loadSpecifiedSourceOfClientSideEndpointRelations(Step step, long startTB, long endTB,
+ int sourceEndpointId) throws IOException;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
index c90ffb7..185e858 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
@@ -56,4 +56,11 @@ public class TopologyQuery implements GraphQLQueryResolver {
return getQueryService().getServiceTopology(duration.getStep(), startTimeBucket, endTimeBucket, serviceId);
}
+
+    /**
+     * Resolves the topology of a single endpoint over the given duration.
+     *
+     * @param endpointId id of the endpoint to query
+     * @param duration supplies the step and the start/end of the query window
+     * @return the topology returned by the query service
+     * @throws IOException if the query service fails
+     */
+    public Topology getEndpointTopology(final int endpointId, final Duration duration) throws IOException {
+        // Convert both ends of the window to time buckets before querying.
+        final long startTB = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
+        final long endTB = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+
+        return getQueryService().getEndpointTopology(duration.getStep(), startTB, endTB, endpointId);
+    }
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index c6ee5d1..b93baa5 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit c6ee5d15f38024c3be61be579eac4035ffbfd4bf
+Subproject commit b93baa58852595164aae50e3e7be37683c220175
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
index 5e7be1b..00de99e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.*;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
@@ -57,18 +58,22 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
- return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
+ return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, Source.Service);
}
@Override
public List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
+ if (CollectionUtils.isEmpty(serviceIds)) {
+ throw new UnexpectedException("Service id is null");
+ }
+ // Past the guard: query the client-side service relation index for these ids; Source.Service makes load() build service-relation Call ids.
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.size(0);
setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
- return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
+ return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, Source.Service);
}
private void setQueryCondition(SearchSourceBuilder sourceBuilder, long startTB, long endTB,
@@ -95,7 +100,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.size(0);
- return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
+ return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, Source.Service);
}
@Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
@@ -104,29 +109,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.size(0);
- return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
- }
-
- private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
- String destCName) throws IOException {
- TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000);
- sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000));
- sourceBuilder.aggregation(sourceAggregation);
-
- SearchResponse response = getClient().search(indexName, sourceBuilder);
-
- List<Call> calls = new ArrayList<>();
- Terms sourceTerms = response.getAggregations().get(sourceCName);
- for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) {
- Terms destTerms = sourceBucket.getAggregations().get(destCName);
- for (Terms.Bucket destBucket : destTerms.getBuckets()) {
- Call value = new Call();
- value.setSource(sourceBucket.getKeyAsNumber().intValue());
- value.setTarget(destBucket.getKeyAsNumber().intValue());
- calls.add(value);
- }
- }
- return calls;
+ return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, Source.Service);
}
@Override public List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException {
@@ -181,4 +164,70 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
}
return serviceComponents;
}
+ /** Loads server-side endpoint relation calls whose destination is {@code destEndpointId} and whose time bucket lies in [startTB, endTB]. */
+ @Override
+ public List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
+ int destEndpointId) throws IOException {
+ String indexName = TimePyramidTableNameBuilder.build(step, EndpointRelationServerSideIndicator.INDEX_NAME);
+ // size(0) presumably suppresses document hits (confirm against the Elasticsearch docs); load() reads only the aggregations.
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.size(0);
+ // Two must-clauses: time bucket within the inclusive range, and destination endpoint id equal to the requested one.
+ BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
+ boolQuery.must().add(QueryBuilders.rangeQuery(EndpointRelationServerSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
+ boolQuery.must().add(QueryBuilders.termQuery(EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, destEndpointId));
+ sourceBuilder.query(boolQuery);
+ // Aggregate by source then destination endpoint id; Source.Endpoint makes load() build endpoint-relation Call ids.
+ return load(sourceBuilder, indexName, EndpointRelationServerSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint);
+ }
+ /** Loads client-side endpoint relation calls whose source is {@code sourceEndpointId} and whose time bucket lies in [startTB, endTB]. */
+ @Override
+ public List<Call> loadSpecifiedSourceOfClientSideEndpointRelations(Step step, long startTB, long endTB,
+ int sourceEndpointId) throws IOException {
+ String indexName = TimePyramidTableNameBuilder.build(step, EndpointRelationClientSideIndicator.INDEX_NAME);
+ // size(0) presumably suppresses document hits (confirm against the Elasticsearch docs); load() reads only the aggregations.
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.size(0);
+ // Two must-clauses: time bucket within the inclusive range, and source endpoint id equal to the requested one.
+ BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
+ boolQuery.must().add(QueryBuilders.rangeQuery(EndpointRelationClientSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
+ boolQuery.must().add(QueryBuilders.termQuery(EndpointRelationClientSideIndicator.SOURCE_ENDPOINT_ID, sourceEndpointId));
+ sourceBuilder.query(boolQuery);
+ // Aggregate by source then destination endpoint id; Source.Endpoint makes load() build endpoint-relation Call ids.
+ return load(sourceBuilder, indexName, EndpointRelationClientSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationClientSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint);
+ }
+ /** Runs a nested terms aggregation (source column, then destination column) on the index and turns every source/destination bucket pair into a Call. */
+ private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
+ String destCName, Source source) throws IOException {
+ TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000);
+ sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000));
+ sourceBuilder.aggregation(sourceAggregation);
+ // NOTE(review): both aggregation levels request size(1000); buckets beyond that are presumably not returned - confirm.
+ SearchResponse response = getClient().search(indexName, sourceBuilder);
+ // Outer buckets are keyed by the source column, the nested buckets by the destination column.
+ List<Call> calls = new ArrayList<>();
+ Terms sourceTerms = response.getAggregations().get(sourceCName);
+ for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) {
+ Terms destTerms = sourceBucket.getAggregations().get(destCName);
+ for (Terms.Bucket destBucket : destTerms.getBuckets()) {
+ Call value = new Call();
+ value.setSource(sourceBucket.getKeyAsNumber().intValue());
+ value.setTarget(destBucket.getKeyAsNumber().intValue());
+ switch (source) { // no default branch: for any other Source constant this method does not set the Call id
+ case Service:
+ value.setId(ServiceRelation.buildEntityId(value.getSource(), value.getTarget()));
+ break;
+ case Endpoint:
+ value.setId(EndpointRelation.buildEntityId(value.getSource(), value.getTarget()));
+ break;
+ }
+ calls.add(value);
+ }
+ }
+ return calls;
+ }
+ /** Tells load() which relation type a query covers, and so whether ServiceRelation or EndpointRelation builds the Call id. */
+ enum Source {
+ Service, Endpoint
+ }
}