You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/09/13 11:10:53 UTC
[incubator-skywalking] branch master updated: Global topology and
service topology write by manual. (#1671)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new babc6d2 Global topology and service topology write by manual. (#1671)
babc6d2 is described below
commit babc6d293097c1bbdf87b84b87ae0bdaa63f1c02
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Thu Sep 13 19:10:47 2018 +0800
Global topology and service topology write by manual. (#1671)
---
.../ServiceCallRelationDispatcher.java | 21 ++-
...ava => ServiceRelationClientSideIndicator.java} | 36 ++--
...ava => ServiceRelationServerSideIndicator.java} | 32 ++--
.../server/core/query/TopologyQueryService.java | 103 +++++-------
.../oap/server/core/storage/StorageModule.java | 4 +-
.../core/storage/query/ITopologyQueryDAO.java | 45 +++++
.../oap/query/graphql/resolver/TopologyQuery.java | 2 +-
.../StorageModuleElasticsearchProvider.java | 4 +
.../storage/plugin/elasticsearch/base/EsDAO.java | 2 +-
.../elasticsearch/query/TopologyQueryEsDAO.java | 184 +++++++++++++++++++++
10 files changed, 323 insertions(+), 110 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
index aad015f..c1e3cb4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java
@@ -28,11 +28,26 @@ import org.apache.skywalking.oap.server.core.source.ServiceRelation;
public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRelation> {
@Override
public void dispatch(ServiceRelation source) {
- doDispatch(source);
+ switch (source.getDetectPoint()) {
+ case SERVER:
+ serverSide(source);
+ break;
+ case CLIENT:
+ clientSide(source);
+ break;
+ }
}
- public void doDispatch(ServiceRelation source) {
- ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ private void serverSide(ServiceRelation source) {
+ ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
+ indicator.setTimeBucket(source.getTimeBucket());
+ indicator.setSourceServiceId(source.getSourceServiceId());
+ indicator.setDestServiceId(source.getDestServiceId());
+ IndicatorProcess.INSTANCE.in(indicator);
+ }
+
+ private void clientSide(ServiceRelation source) {
+ ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
indicator.setTimeBucket(source.getTimeBucket());
indicator.setSourceServiceId(source.getSourceServiceId());
indicator.setDestServiceId(source.getDestServiceId());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java
similarity index 81%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java
index 0663607..cd38b00 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java
@@ -18,26 +18,22 @@
package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
-import java.util.HashMap;
-import java.util.Map;
-import lombok.Getter;
-import lombok.Setter;
+import java.util.*;
+import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.annotation.Column;
-import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
-import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
@IndicatorType
@StreamData
-@StorageEntity(name = ServiceCallRelationIndicator.INDEX_NAME, builder = ServiceCallRelationIndicator.Builder.class)
-public class ServiceCallRelationIndicator extends Indicator {
+@StorageEntity(name = ServiceRelationClientSideIndicator.INDEX_NAME, builder = ServiceRelationClientSideIndicator.Builder.class)
+public class ServiceRelationClientSideIndicator extends Indicator {
- public static final String INDEX_NAME = "service_call_relation";
+ public static final String INDEX_NAME = "service_relation_client_side";
public static final String SOURCE_SERVICE_ID = "source_service_id";
public static final String DEST_SERVICE_ID = "dest_service_id";
@@ -60,7 +56,7 @@ public class ServiceCallRelationIndicator extends Indicator {
}
@Override public Indicator toHour() {
- ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInHour());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
@@ -68,7 +64,7 @@ public class ServiceCallRelationIndicator extends Indicator {
}
@Override public Indicator toDay() {
- ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInDay());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
@@ -76,7 +72,7 @@ public class ServiceCallRelationIndicator extends Indicator {
}
@Override public Indicator toMonth() {
- ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInMonth());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
@@ -99,8 +95,8 @@ public class ServiceCallRelationIndicator extends Indicator {
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
- remoteBuilder.setDataIntegers(0, getSourceServiceId());
remoteBuilder.setDataIntegers(1, getDestServiceId());
+ remoteBuilder.setDataIntegers(0, getSourceServiceId());
remoteBuilder.setDataLongs(0, getTimeBucket());
return remoteBuilder;
@@ -122,7 +118,7 @@ public class ServiceCallRelationIndicator extends Indicator {
if (getClass() != obj.getClass())
return false;
- ServiceCallRelationIndicator indicator = (ServiceCallRelationIndicator)obj;
+ ServiceRelationClientSideIndicator indicator = (ServiceRelationClientSideIndicator)obj;
if (sourceServiceId != indicator.sourceServiceId)
return false;
if (destServiceId != indicator.destServiceId)
@@ -134,21 +130,21 @@ public class ServiceCallRelationIndicator extends Indicator {
return true;
}
- public static class Builder implements StorageBuilder<ServiceCallRelationIndicator> {
+ public static class Builder implements StorageBuilder<ServiceRelationClientSideIndicator> {
- @Override public ServiceCallRelationIndicator map2Data(Map<String, Object> dbMap) {
- ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ @Override public ServiceRelationClientSideIndicator map2Data(Map<String, Object> dbMap) {
+ ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue());
indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue());
indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return indicator;
}
- @Override public Map<String, Object> data2Map(ServiceCallRelationIndicator storageData) {
+ @Override public Map<String, Object> data2Map(ServiceRelationClientSideIndicator storageData) {
Map<String, Object> map = new HashMap<>();
+ map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId());
map.put(DEST_SERVICE_ID, storageData.getDestServiceId());
- map.put(TIME_BUCKET, storageData.getTimeBucket());
return map;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java
similarity index 81%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java
index 0663607..c438c68 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java
@@ -18,26 +18,22 @@
package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
-import java.util.HashMap;
-import java.util.Map;
-import lombok.Getter;
-import lombok.Setter;
+import java.util.*;
+import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.annotation.Column;
-import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
-import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
@IndicatorType
@StreamData
-@StorageEntity(name = ServiceCallRelationIndicator.INDEX_NAME, builder = ServiceCallRelationIndicator.Builder.class)
-public class ServiceCallRelationIndicator extends Indicator {
+@StorageEntity(name = ServiceRelationServerSideIndicator.INDEX_NAME, builder = ServiceRelationServerSideIndicator.Builder.class)
+public class ServiceRelationServerSideIndicator extends Indicator {
- public static final String INDEX_NAME = "service_call_relation";
+ public static final String INDEX_NAME = "service_relation_server_side";
public static final String SOURCE_SERVICE_ID = "source_service_id";
public static final String DEST_SERVICE_ID = "dest_service_id";
@@ -60,7 +56,7 @@ public class ServiceCallRelationIndicator extends Indicator {
}
@Override public Indicator toHour() {
- ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
indicator.setTimeBucket(toTimeBucketInHour());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
@@ -68,7 +64,7 @@ public class ServiceCallRelationIndicator extends Indicator {
}
@Override public Indicator toDay() {
- ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
indicator.setTimeBucket(toTimeBucketInDay());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
@@ -76,7 +72,7 @@ public class ServiceCallRelationIndicator extends Indicator {
}
@Override public Indicator toMonth() {
- ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
indicator.setTimeBucket(toTimeBucketInMonth());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
@@ -122,7 +118,7 @@ public class ServiceCallRelationIndicator extends Indicator {
if (getClass() != obj.getClass())
return false;
- ServiceCallRelationIndicator indicator = (ServiceCallRelationIndicator)obj;
+ ServiceRelationServerSideIndicator indicator = (ServiceRelationServerSideIndicator)obj;
if (sourceServiceId != indicator.sourceServiceId)
return false;
if (destServiceId != indicator.destServiceId)
@@ -134,17 +130,17 @@ public class ServiceCallRelationIndicator extends Indicator {
return true;
}
- public static class Builder implements StorageBuilder<ServiceCallRelationIndicator> {
+ public static class Builder implements StorageBuilder<ServiceRelationServerSideIndicator> {
- @Override public ServiceCallRelationIndicator map2Data(Map<String, Object> dbMap) {
- ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
+ @Override public ServiceRelationServerSideIndicator map2Data(Map<String, Object> dbMap) {
+ ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue());
indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue());
indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return indicator;
}
- @Override public Map<String, Object> data2Map(ServiceCallRelationIndicator storageData) {
+ @Override public Map<String, Object> data2Map(ServiceRelationServerSideIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId());
map.put(DEST_SERVICE_ID, storageData.getDestServiceId());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
index 57993dd..930e2fa 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
@@ -20,12 +20,10 @@ package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.util.*;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
-import org.apache.skywalking.oap.server.core.query.sql.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.*;
@@ -38,90 +36,63 @@ public class TopologyQueryService implements Service {
private static final Logger logger = LoggerFactory.getLogger(TopologyQueryService.class);
private final ModuleManager moduleManager;
- private IMetricQueryDAO metricQueryDAO;
- private IUniqueQueryDAO uniqueQueryDAO;
+ private ITopologyQueryDAO topologyQueryDAO;
public TopologyQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
- private IMetricQueryDAO getMetricQueryDAO() {
- if (metricQueryDAO == null) {
- metricQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetricQueryDAO.class);
+ private ITopologyQueryDAO getTopologyQueryDAO() {
+ if (topologyQueryDAO == null) {
+ topologyQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITopologyQueryDAO.class);
}
- return metricQueryDAO;
- }
-
- private IUniqueQueryDAO getUniqueQueryDAO() {
- if (uniqueQueryDAO == null) {
- uniqueQueryDAO = moduleManager.find(StorageModule.NAME).getService(IUniqueQueryDAO.class);
- }
- return uniqueQueryDAO;
+ return topologyQueryDAO;
}
public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
- List<ServiceComponent> serviceComponents = loadServiceComponent(step, startTB, endTB);
- List<ServiceMapping> serviceMappings = loadServiceMapping(step, startTB, endTB);
+ List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
+ List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
- List<Call> serviceRelationClientCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_client_calls_sum");
- List<Call> serviceRelationServerCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_server_calls_sum");
+ List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB);
+ List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB);
TopologyBuilder builder = new TopologyBuilder(moduleManager);
return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
}
- public Topology getServiceTopology(final Step step, final long startTimeBucket, final long endTimeBucket,
- final String serviceId) {
- return new Topology();
- }
-
- private List<ServiceComponent> loadServiceComponent(final Step step, final long startTB,
- final long endTB) throws IOException {
- List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceComponentIndicator.INDEX_NAME, step, startTB, endTB,
- new Where(), ServiceComponentIndicator.SERVICE_ID, ServiceComponentIndicator.COMPONENT_ID);
-
- List<ServiceComponent> serviceComponents = new ArrayList<>();
- twoIdGroups.forEach(twoIdGroup -> {
- ServiceComponent serviceComponent = new ServiceComponent();
- serviceComponent.setServiceId(twoIdGroup.getId1());
- serviceComponent.setComponentId(twoIdGroup.getId2());
- serviceComponents.add(serviceComponent);
- });
-
- return serviceComponents;
- }
-
- private List<ServiceMapping> loadServiceMapping(final Step step, final long startTB,
- final long endTB) throws IOException {
- List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceMappingIndicator.INDEX_NAME, step, startTB, endTB,
- new Where(), ServiceMappingIndicator.SERVICE_ID, ServiceMappingIndicator.MAPPING_SERVICE_ID);
-
- List<ServiceMapping> serviceMappings = new ArrayList<>();
- twoIdGroups.forEach(twoIdGroup -> {
- ServiceMapping serviceMapping = new ServiceMapping();
- serviceMapping.setServiceId(twoIdGroup.getId1());
- serviceMapping.setMappingServiceId(twoIdGroup.getId2());
- serviceMappings.add(serviceMapping);
+ public Topology getServiceTopology(final Step step, final long startTB, final long endTB,
+ final int serviceId) throws IOException {
+ List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
+ List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
+
+ Set<Integer> serviceIds = new HashSet<>();
+ serviceIds.add(serviceId);
+ serviceMappings.forEach(mapping -> {
+ if (mapping.getServiceId() == serviceId) {
+ serviceIds.add(mapping.getMappingServiceId());
+ }
});
+ List<Integer> serviceIdList = new ArrayList<>(serviceIds);
- return serviceMappings;
- }
-
- private List<Call> loadServiceRelationCalls(final Step step, final long startTB, final long endTB,
- String indName) throws IOException {
- List<TwoIdGroupValue> twoIdGroupValues = getMetricQueryDAO().aggregation(indName, step, startTB, endTB, new Where(), "source_service_id", "dest_service_id", "value", Function.Sum);
+ List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadSpecifiedClientSideServiceRelations(step, startTB, endTB, serviceIdList);
+ List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadSpecifiedServerSideServiceRelations(step, startTB, endTB, serviceIdList);
- List<Call> clientCalls = new ArrayList<>();
+ TopologyBuilder builder = new TopologyBuilder(moduleManager);
+ Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
- twoIdGroupValues.forEach(twoIdGroupValue -> {
- Call call = new Call();
- call.setSource(twoIdGroupValue.getId1());
- call.setTarget(twoIdGroupValue.getId2());
- call.setCalls(twoIdGroupValue.getValue().longValue());
- clientCalls.add(call);
+ Set<Integer> nodeIds = new HashSet<>();
+ topology.getCalls().forEach(call -> {
+ nodeIds.add(call.getSource());
+ nodeIds.add(call.getTarget());
});
- return clientCalls;
+ for (int i = topology.getNodes().size() - 1; i >= 0; i--) {
+ if (!nodeIds.contains(topology.getNodes().get(i).getId())) {
+ topology.getNodes().remove(i);
+ }
+ }
+
+ return topology;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 6bf519d..c1bf763 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
@@ -36,6 +37,7 @@ public class StorageModule extends ModuleDefine {
return new Class[] {
IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class,
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
- IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class};
+ IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
+ ITopologyQueryDAO.class};
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
new file mode 100644
index 0000000..74d31b9
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.query;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * @author peng-yongsheng
+ */
+public interface ITopologyQueryDAO extends Service {
+
+ List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
+ List<Integer> serviceIds) throws IOException;
+
+ List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
+ List<Integer> serviceIds) throws IOException;
+
+ List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
+
+ List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
+
+ List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException;
+
+ List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException;
+}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
index 501513f..c90ffb7 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
@@ -50,7 +50,7 @@ public class TopologyQuery implements GraphQLQueryResolver {
return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket);
}
- public Topology getServiceTopology(final String serviceId, final Duration duration) {
+ public Topology getServiceTopology(final int serviceId, final Duration duration) throws IOException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 8acc093..696e86e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -20,12 +20,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.client.NameSpace;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.slf4j.*;
/**
@@ -72,6 +74,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient));
this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient));
+
+ this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
index fcc4a1e..48bb44c 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -34,7 +34,7 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
super(client);
}
- public void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) {
+ public final void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) {
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).gte(startTB).lte(endTB);
if (where.getKeyValues().isEmpty()) {
sourceBuilder.query(rangeQueryBuilder);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
new file mode 100644
index 0000000..5e7be1b
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
+import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.*;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+/**
+ * @author peng-yongsheng
+ */
+public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
+
+ public TopologyQueryEsDAO(ElasticSearchClient client) {
+ super(client);
+ }
+
+ @Override
+ public List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
+ List<Integer> serviceIds) throws IOException {
+ if (CollectionUtils.isEmpty(serviceIds)) {
+ throw new UnexpectedException("Service id is null");
+ }
+
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.size(0);
+ setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
+
+ String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
+ return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
+ }
+
+ @Override
+ public List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
+ List<Integer> serviceIds) throws IOException {
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.size(0);
+ setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
+
+ String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
+ return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
+ }
+
+ private void setQueryCondition(SearchSourceBuilder sourceBuilder, long startTB, long endTB,
+ List<Integer> serviceIds) {
+ BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
+ boolQuery.must().add(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
+
+ BoolQueryBuilder serviceIdBoolQuery = QueryBuilders.boolQuery();
+ boolQuery.must().add(serviceIdBoolQuery);
+
+ if (serviceIds.size() == 1) {
+ boolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds.get(0)));
+ boolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds.get(0)));
+ } else {
+ boolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds));
+ boolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds));
+ }
+ sourceBuilder.query(boolQuery);
+ }
+
+ @Override public List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
+ String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
+ sourceBuilder.size(0);
+
+ return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
+ }
+
+ @Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
+ String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
+ sourceBuilder.size(0);
+
+ return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
+ }
+
+ private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
+ String destCName) throws IOException {
+ TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000);
+ sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000));
+ sourceBuilder.aggregation(sourceAggregation);
+
+ SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+ List<Call> calls = new ArrayList<>();
+ Terms sourceTerms = response.getAggregations().get(sourceCName);
+ for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) {
+ Terms destTerms = sourceBucket.getAggregations().get(destCName);
+ for (Terms.Bucket destBucket : destTerms.getBuckets()) {
+ Call value = new Call();
+ value.setSource(sourceBucket.getKeyAsNumber().intValue());
+ value.setTarget(destBucket.getKeyAsNumber().intValue());
+ calls.add(value);
+ }
+ }
+ return calls;
+ }
+
+ @Override public List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException {
+ String indexName = TimePyramidTableNameBuilder.build(step, ServiceMappingIndicator.INDEX_NAME);
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.query(QueryBuilders.rangeQuery(ServiceMappingIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
+ sourceBuilder.size(0);
+
+ TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(ServiceMappingIndicator.SERVICE_ID).field(ServiceMappingIndicator.SERVICE_ID).size(1000);
+ sourceAggregation.subAggregation(AggregationBuilders.terms(ServiceMappingIndicator.MAPPING_SERVICE_ID).field(ServiceMappingIndicator.MAPPING_SERVICE_ID).size(1000));
+ sourceBuilder.aggregation(sourceAggregation);
+
+ SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+ List<ServiceMapping> serviceMappings = new ArrayList<>();
+ Terms serviceIdTerms = response.getAggregations().get(ServiceMappingIndicator.SERVICE_ID);
+ for (Terms.Bucket serviceIdBucket : serviceIdTerms.getBuckets()) {
+ Terms mappingServiceIdTerms = serviceIdBucket.getAggregations().get(ServiceMappingIndicator.MAPPING_SERVICE_ID);
+ for (Terms.Bucket mappingServiceIdBucket : mappingServiceIdTerms.getBuckets()) {
+ ServiceMapping serviceMapping = new ServiceMapping();
+ serviceMapping.setServiceId(serviceIdBucket.getKeyAsNumber().intValue());
+ serviceMapping.setMappingServiceId(mappingServiceIdBucket.getKeyAsNumber().intValue());
+ serviceMappings.add(serviceMapping);
+ }
+ }
+ return serviceMappings;
+ }
+
+ @Override
+ public List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException {
+ String indexName = TimePyramidTableNameBuilder.build(step, ServiceComponentIndicator.INDEX_NAME);
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.query(QueryBuilders.rangeQuery(ServiceComponentIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
+ sourceBuilder.size(0);
+
+ TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(ServiceComponentIndicator.SERVICE_ID).field(ServiceComponentIndicator.SERVICE_ID).size(1000);
+ sourceAggregation.subAggregation(AggregationBuilders.terms(ServiceComponentIndicator.COMPONENT_ID).field(ServiceComponentIndicator.COMPONENT_ID).size(1000));
+ sourceBuilder.aggregation(sourceAggregation);
+
+ SearchResponse response = getClient().search(indexName, sourceBuilder);
+
+ List<ServiceComponent> serviceComponents = new ArrayList<>();
+ Terms serviceIdTerms = response.getAggregations().get(ServiceComponentIndicator.SERVICE_ID);
+ for (Terms.Bucket serviceIdBucket : serviceIdTerms.getBuckets()) {
+ Terms componentIdTerms = serviceIdBucket.getAggregations().get(ServiceComponentIndicator.COMPONENT_ID);
+ for (Terms.Bucket componentIdBucket : componentIdTerms.getBuckets()) {
+ ServiceComponent serviceComponent = new ServiceComponent();
+ serviceComponent.setServiceId(serviceIdBucket.getKeyAsNumber().intValue());
+ serviceComponent.setComponentId(componentIdBucket.getKeyAsNumber().intValue());
+ serviceComponents.add(serviceComponent);
+ }
+ }
+ return serviceComponents;
+ }
+}