You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/10/17 14:44:28 UTC

[incubator-skywalking] branch master updated: Trigger service heartbeat when received service instance heartbeat. (#1785)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 4063397  Trigger service heartbeat when received service instance heartbeat. (#1785)
4063397 is described below

commit 406339798849e96b593b8262d8544b194d78c1df
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Wed Oct 17 22:44:21 2018 +0800

    Trigger service heartbeat when received service instance heartbeat. (#1785)
    
    Fixed some meta query bugs.
---
 .../server/core/query/TopologyQueryService.java    | 38 ++++++++++++++++++++--
 .../oap/server/core/query/entity/Service.java      |  2 +-
 .../oap/query/graphql/resolver/TopologyQuery.java  |  8 +++--
 .../v5/grpc/InstanceDiscoveryServiceHandler.java   | 16 ++++++++-
 .../elasticsearch/query/MetadataQueryEsDAO.java    | 13 +++++---
 5 files changed, 65 insertions(+), 12 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
index 766fa7d..077ba8b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogServ
 import org.apache.skywalking.oap.server.core.query.entity.*;
 import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.*;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.library.module.Service;
 import org.slf4j.*;
@@ -40,6 +40,7 @@ public class TopologyQueryService implements Service {
 
     private final ModuleManager moduleManager;
     private ITopologyQueryDAO topologyQueryDAO;
+    private IMetadataQueryDAO metadataQueryDAO;
     private EndpointInventoryCache endpointInventoryCache;
     private IComponentLibraryCatalogService componentLibraryCatalogService;
 
@@ -47,6 +48,13 @@ public class TopologyQueryService implements Service {
         this.moduleManager = moduleManager;
     }
 
+    private IMetadataQueryDAO getMetadataQueryDAO() {
+        if (metadataQueryDAO == null) {
+            metadataQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetadataQueryDAO.class);
+        }
+        return metadataQueryDAO;
+    }
+
     private ITopologyQueryDAO getTopologyQueryDAO() {
         if (topologyQueryDAO == null) {
             topologyQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITopologyQueryDAO.class);
@@ -68,7 +76,8 @@ public class TopologyQueryService implements Service {
         return endpointInventoryCache;
     }
 
-    public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
+    public Topology getGlobalTopology(final Step step, final long startTB, final long endTB, final long startTimestamp,
+        final long endTimestamp) throws IOException {
         logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
         List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
         List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
@@ -76,8 +85,31 @@ public class TopologyQueryService implements Service {
         List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB);
         List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB);
 
+        List<org.apache.skywalking.oap.server.core.query.entity.Service> serviceList = getMetadataQueryDAO().searchServices(startTimestamp, endTimestamp, null);
+
         TopologyBuilder builder = new TopologyBuilder(moduleManager);
-        return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
+        Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
+
+        serviceList.forEach(service -> {
+            boolean contains = false;
+            for (Node node : topology.getNodes()) {
+                if (service.getId() == node.getId()) {
+                    contains = true;
+                    break;
+                }
+            }
+
+            if (!contains) {
+                Node newNode = new Node();
+                newNode.setId(service.getId());
+                newNode.setName(service.getName());
+                newNode.setReal(true);
+                newNode.setType(Const.UNKNOWN);
+                topology.getNodes().add(newNode);
+            }
+        });
+
+        return topology;
     }
 
     public Topology getServiceTopology(final Step step, final long startTB, final long endTB,
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java
index 3f7a97c..76ed34e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java
@@ -26,6 +26,6 @@ import lombok.*;
 @Getter
 @Setter
 public class Service {
-    private String id;
+    private int id;
     private String name;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
index ef5a7bc..2073b77 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.query.graphql.resolver;
 
 import com.coxautodev.graphql.tools.GraphQLQueryResolver;
 import java.io.IOException;
+import java.text.ParseException;
 import org.apache.skywalking.oap.query.graphql.type.Duration;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.query.*;
@@ -45,11 +46,14 @@ public class TopologyQuery implements GraphQLQueryResolver {
         return queryService;
     }
 
-    public Topology getGlobalTopology(final Duration duration) throws IOException {
+    public Topology getGlobalTopology(final Duration duration) throws IOException, ParseException {
         long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
         long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
 
-        return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket);
+        long startTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getStart());
+        long endTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getEnd());
+
+        return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket, startTimestamp, endTimestamp);
     }
 
     public Topology getServiceTopology(final int serviceId, final Duration duration) throws IOException {
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
index 4b12f85..7e5364d 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
@@ -19,10 +19,12 @@
 package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc;
 
 import io.grpc.stub.StreamObserver;
+import java.util.Objects;
 import org.apache.skywalking.apm.network.language.agent.*;
 import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
 import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
 import org.slf4j.*;
@@ -34,9 +36,13 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
 
     private static final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
 
+    private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
+    private final IServiceInventoryRegister serviceInventoryRegister;
     private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
 
     public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
+        this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
+        this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
         this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
     }
 
@@ -62,6 +68,14 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
         int serviceInstanceId = request.getApplicationInstanceId();
         long heartBeatTime = request.getHeartbeatTime();
         serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime);
+
+        ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
+        if (Objects.nonNull(serviceInstanceInventory)) {
+            serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
+        } else {
+            logger.warn("Can't found service instance by service instance id from cache, service instance id is: {}", serviceInstanceId);
+        }
+
         responseObserver.onNext(Downstream.getDefaultInstance());
         responseObserver.onCompleted();
     }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index 76e7b31..d0058e2 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -111,9 +111,12 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
 
         BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
         boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp, endTimestamp));
+        boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.FALSE));
 
-        String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME);
-        boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
+        if (StringUtils.isNotEmpty(keyword)) {
+            String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME);
+            boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
+        }
 
         sourceBuilder.query(boolQueryBuilder);
         sourceBuilder.size(100);
@@ -127,7 +130,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
         GetResponse response = getClient().get(ServiceInventory.MODEL_NAME, ServiceInventory.buildId(serviceCode));
         if (response.isExists()) {
             Service service = new Service();
-            service.setId(String.valueOf(response.getSource().get(ServiceInventory.SEQUENCE)));
+            service.setId(((Number)response.getSource().get(ServiceInventory.SEQUENCE)).intValue());
             service.setName((String)response.getSource().get(ServiceInventory.NAME));
             return service;
         } else {
@@ -142,8 +145,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
         BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
         boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.SERVICE_ID, serviceId));
 
-        String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointInventory.NAME);
         if (StringUtils.isNotEmpty(keyword)) {
+            String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointInventory.NAME);
             boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
         }
 
@@ -208,7 +211,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
             Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
 
             Service service = new Service();
-            service.setId(String.valueOf(sourceAsMap.get(ServiceInventory.SEQUENCE)));
+            service.setId(((Number)sourceAsMap.get(ServiceInventory.SEQUENCE)).intValue());
             service.setName((String)sourceAsMap.get(ServiceInventory.NAME));
             services.add(service);
         }