You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/10/17 14:44:23 UTC

[GitHub] wu-sheng closed pull request #1785: Trigger service heartbeat when received service instance heartbeat.

wu-sheng closed pull request #1785: Trigger service heartbeat when received service instance heartbeat.
URL: https://github.com/apache/incubator-skywalking/pull/1785
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise show it once the pull request is merged):

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
index 766fa7d79..077ba8b68 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java
@@ -26,7 +26,7 @@
 import org.apache.skywalking.oap.server.core.query.entity.*;
 import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.*;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.library.module.Service;
 import org.slf4j.*;
@@ -40,6 +40,7 @@
 
     private final ModuleManager moduleManager;
     private ITopologyQueryDAO topologyQueryDAO;
+    private IMetadataQueryDAO metadataQueryDAO;
     private EndpointInventoryCache endpointInventoryCache;
     private IComponentLibraryCatalogService componentLibraryCatalogService;
 
@@ -47,6 +48,13 @@ public TopologyQueryService(ModuleManager moduleManager) {
         this.moduleManager = moduleManager;
     }
 
+    private IMetadataQueryDAO getMetadataQueryDAO() {
+        if (metadataQueryDAO == null) {
+            metadataQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetadataQueryDAO.class);
+        }
+        return metadataQueryDAO;
+    }
+
     private ITopologyQueryDAO getTopologyQueryDAO() {
         if (topologyQueryDAO == null) {
             topologyQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITopologyQueryDAO.class);
@@ -68,7 +76,8 @@ private EndpointInventoryCache getEndpointInventoryCache() {
         return endpointInventoryCache;
     }
 
-    public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
+    public Topology getGlobalTopology(final Step step, final long startTB, final long endTB, final long startTimestamp,
+        final long endTimestamp) throws IOException {
         logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
         List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
         List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
@@ -76,8 +85,31 @@ public Topology getGlobalTopology(final Step step, final long startTB, final lon
         List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB);
         List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB);
 
+        List<org.apache.skywalking.oap.server.core.query.entity.Service> serviceList = getMetadataQueryDAO().searchServices(startTimestamp, endTimestamp, null);
+
         TopologyBuilder builder = new TopologyBuilder(moduleManager);
-        return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
+        Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
+
+        serviceList.forEach(service -> {
+            boolean contains = false;
+            for (Node node : topology.getNodes()) {
+                if (service.getId() == node.getId()) {
+                    contains = true;
+                    break;
+                }
+            }
+
+            if (!contains) {
+                Node newNode = new Node();
+                newNode.setId(service.getId());
+                newNode.setName(service.getName());
+                newNode.setReal(true);
+                newNode.setType(Const.UNKNOWN);
+                topology.getNodes().add(newNode);
+            }
+        });
+
+        return topology;
     }
 
     public Topology getServiceTopology(final Step step, final long startTB, final long endTB,
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java
index 3f7a97c5e..76ed34e80 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java
@@ -26,6 +26,6 @@
 @Getter
 @Setter
 public class Service {
-    private String id;
+    private int id;
     private String name;
 }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
index ef5a7bc7b..2073b7719 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java
@@ -20,6 +20,7 @@
 
 import com.coxautodev.graphql.tools.GraphQLQueryResolver;
 import java.io.IOException;
+import java.text.ParseException;
 import org.apache.skywalking.oap.query.graphql.type.Duration;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.query.*;
@@ -45,11 +46,14 @@ private TopologyQueryService getQueryService() {
         return queryService;
     }
 
-    public Topology getGlobalTopology(final Duration duration) throws IOException {
+    public Topology getGlobalTopology(final Duration duration) throws IOException, ParseException {
         long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
         long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
 
-        return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket);
+        long startTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getStart());
+        long endTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getEnd());
+
+        return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket, startTimestamp, endTimestamp);
     }
 
     public Topology getServiceTopology(final int serviceId, final Duration duration) throws IOException {
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
index 4b12f8567..7e5364d35 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
@@ -19,10 +19,12 @@
 package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc;
 
 import io.grpc.stub.StreamObserver;
+import java.util.Objects;
 import org.apache.skywalking.apm.network.language.agent.*;
 import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
 import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
 import org.slf4j.*;
@@ -34,9 +36,13 @@
 
     private static final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
 
+    private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
+    private final IServiceInventoryRegister serviceInventoryRegister;
     private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
 
     public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
+        this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
+        this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
         this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
     }
 
@@ -62,6 +68,14 @@ public void registerInstance(ApplicationInstance request,
         int serviceInstanceId = request.getApplicationInstanceId();
         long heartBeatTime = request.getHeartbeatTime();
         serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime);
+
+        ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
+        if (Objects.nonNull(serviceInstanceInventory)) {
+            serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
+        } else {
+            logger.warn("Can't found service instance by service instance id from cache, service instance id is: {}", serviceInstanceId);
+        }
+
         responseObserver.onNext(Downstream.getDefaultInstance());
         responseObserver.onCompleted();
     }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index 76e7b3166..d0058e2dc 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -111,9 +111,12 @@ public MetadataQueryEsDAO(ElasticSearchClient client) {
 
         BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
         boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp, endTimestamp));
+        boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.FALSE));
 
-        String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME);
-        boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
+        if (StringUtils.isNotEmpty(keyword)) {
+            String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME);
+            boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
+        }
 
         sourceBuilder.query(boolQueryBuilder);
         sourceBuilder.size(100);
@@ -127,7 +130,7 @@ public Service searchService(String serviceCode) throws IOException {
         GetResponse response = getClient().get(ServiceInventory.MODEL_NAME, ServiceInventory.buildId(serviceCode));
         if (response.isExists()) {
             Service service = new Service();
-            service.setId(String.valueOf(response.getSource().get(ServiceInventory.SEQUENCE)));
+            service.setId(((Number)response.getSource().get(ServiceInventory.SEQUENCE)).intValue());
             service.setName((String)response.getSource().get(ServiceInventory.NAME));
             return service;
         } else {
@@ -142,8 +145,8 @@ public Service searchService(String serviceCode) throws IOException {
         BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
         boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.SERVICE_ID, serviceId));
 
-        String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointInventory.NAME);
         if (StringUtils.isNotEmpty(keyword)) {
+            String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointInventory.NAME);
             boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
         }
 
@@ -208,7 +211,7 @@ public Service searchService(String serviceCode) throws IOException {
             Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
 
             Service service = new Service();
-            service.setId(String.valueOf(sourceAsMap.get(ServiceInventory.SEQUENCE)));
+            service.setId(((Number)sourceAsMap.get(ServiceInventory.SEQUENCE)).intValue());
             service.setName((String)sourceAsMap.get(ServiceInventory.NAME));
             services.add(service);
         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services