You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/10/17 14:44:28 UTC
[incubator-skywalking] branch master updated: Trigger service heartbeat when received service instance heartbeat. (#1785)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 4063397 Trigger service heartbeat when received service instance heartbeat. (#1785)
4063397 is described below
commit 406339798849e96b593b8262d8544b194d78c1df
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Wed Oct 17 22:44:21 2018 +0800
Trigger service heartbeat when received service instance heartbeat. (#1785)
Fixed some meta query bugs.
---
.../server/core/query/TopologyQueryService.java | 38 ++++++++++++++++++++--
.../oap/server/core/query/entity/Service.java | 2 +-
.../oap/query/graphql/resolver/TopologyQuery.java | 8 +++--
.../v5/grpc/InstanceDiscoveryServiceHandler.java | 16 ++++++++-
.../elasticsearch/query/MetadataQueryEsDAO.java | 13 +++++---
5 files changed, 65 insertions(+), 12 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
index 766fa7d..077ba8b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogServ
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.*;
@@ -40,6 +40,7 @@ public class TopologyQueryService implements Service {
private final ModuleManager moduleManager;
private ITopologyQueryDAO topologyQueryDAO;
+ private IMetadataQueryDAO metadataQueryDAO;
private EndpointInventoryCache endpointInventoryCache;
private IComponentLibraryCatalogService componentLibraryCatalogService;
@@ -47,6 +48,13 @@ public class TopologyQueryService implements Service {
this.moduleManager = moduleManager;
}
+ private IMetadataQueryDAO getMetadataQueryDAO() {
+ if (metadataQueryDAO == null) {
+ metadataQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetadataQueryDAO.class);
+ }
+ return metadataQueryDAO;
+ }
+
private ITopologyQueryDAO getTopologyQueryDAO() {
if (topologyQueryDAO == null) {
topologyQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITopologyQueryDAO.class);
@@ -68,7 +76,8 @@ public class TopologyQueryService implements Service {
return endpointInventoryCache;
}
- public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
+ public Topology getGlobalTopology(final Step step, final long startTB, final long endTB, final long startTimestamp,
+ final long endTimestamp) throws IOException {
logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
@@ -76,8 +85,31 @@ public class TopologyQueryService implements Service {
List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB);
List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB);
+ List<org.apache.skywalking.oap.server.core.query.entity.Service> serviceList = getMetadataQueryDAO().searchServices(startTimestamp, endTimestamp, null);
+
TopologyBuilder builder = new TopologyBuilder(moduleManager);
- return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
+ Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
+
+ serviceList.forEach(service -> {
+ boolean contains = false;
+ for (Node node : topology.getNodes()) {
+ if (service.getId() == node.getId()) {
+ contains = true;
+ break;
+ }
+ }
+
+ if (!contains) {
+ Node newNode = new Node();
+ newNode.setId(service.getId());
+ newNode.setName(service.getName());
+ newNode.setReal(true);
+ newNode.setType(Const.UNKNOWN);
+ topology.getNodes().add(newNode);
+ }
+ });
+
+ return topology;
}
public Topology getServiceTopology(final Step step, final long startTB, final long endTB,
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java
index 3f7a97c..76ed34e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java
@@ -26,6 +26,6 @@ import lombok.*;
@Getter
@Setter
public class Service {
- private String id;
+ private int id;
private String name;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
index ef5a7bc..2073b77 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import java.io.IOException;
+import java.text.ParseException;
import org.apache.skywalking.oap.query.graphql.type.Duration;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.query.*;
@@ -45,11 +46,14 @@ public class TopologyQuery implements GraphQLQueryResolver {
return queryService;
}
- public Topology getGlobalTopology(final Duration duration) throws IOException {
+ public Topology getGlobalTopology(final Duration duration) throws IOException, ParseException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
- return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket);
+ long startTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getStart());
+ long endTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getEnd());
+
+ return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket, startTimestamp, endTimestamp);
}
public Topology getServiceTopology(final int serviceId, final Duration duration) throws IOException {
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
index 4b12f85..7e5364d 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
@@ -19,10 +19,12 @@
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc;
import io.grpc.stub.StreamObserver;
+import java.util.Objects;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.*;
@@ -34,9 +36,13 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
private static final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
+ private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
+ private final IServiceInventoryRegister serviceInventoryRegister;
private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
+ this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
+ this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
}
@@ -62,6 +68,14 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
int serviceInstanceId = request.getApplicationInstanceId();
long heartBeatTime = request.getHeartbeatTime();
serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime);
+
+ ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
+ if (Objects.nonNull(serviceInstanceInventory)) {
+ serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
+ } else {
+ logger.warn("Can't found service instance by service instance id from cache, service instance id is: {}", serviceInstanceId);
+ }
+
responseObserver.onNext(Downstream.getDefaultInstance());
responseObserver.onCompleted();
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index 76e7b31..d0058e2 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -111,9 +111,12 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp, endTimestamp));
+ boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.FALSE));
- String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME);
- boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
+ if (StringUtils.isNotEmpty(keyword)) {
+ String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME);
+ boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
+ }
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(100);
@@ -127,7 +130,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
GetResponse response = getClient().get(ServiceInventory.MODEL_NAME, ServiceInventory.buildId(serviceCode));
if (response.isExists()) {
Service service = new Service();
- service.setId(String.valueOf(response.getSource().get(ServiceInventory.SEQUENCE)));
+ service.setId(((Number)response.getSource().get(ServiceInventory.SEQUENCE)).intValue());
service.setName((String)response.getSource().get(ServiceInventory.NAME));
return service;
} else {
@@ -142,8 +145,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.SERVICE_ID, serviceId));
- String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointInventory.NAME);
if (StringUtils.isNotEmpty(keyword)) {
+ String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointInventory.NAME);
boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
}
@@ -208,7 +211,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
Service service = new Service();
- service.setId(String.valueOf(sourceAsMap.get(ServiceInventory.SEQUENCE)));
+ service.setId(((Number)sourceAsMap.get(ServiceInventory.SEQUENCE)).intValue());
service.setName((String)sourceAsMap.get(ServiceInventory.NAME));
services.add(service);
}