You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/01 06:46:08 UTC

[incubator-skywalking] branch bug-fix updated: Fix bug of mesh heartbeat and rest receiver.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch bug-fix
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/bug-fix by this push:
     new 34ac3fe  Fix bug of mesh heartbeat and rest receiver.
34ac3fe is described below

commit 34ac3fe3a2c17a72330775d4cab139686d21afcf
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Nov 1 14:45:59 2018 +0800

    Fix bug of mesh heartbeat and rest receiver.
---
 .../receiver/mesh/TelemetryDataDispatcher.java     | 27 ++++++++++++++++++++++
 .../v5/grpc/InstanceDiscoveryServiceHandler.java   |  2 +-
 .../v5/rest/InstanceHeartBeatServletHandler.java   | 14 +++++++++++
 .../elasticsearch/query/MetadataQueryEsDAO.java    |  1 -
 4 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index 7fa6b05..7c9e749 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -18,13 +18,16 @@
 
 package org.apache.skywalking.aop.server.receiver.mesh;
 
+import java.util.Objects;
 import org.apache.logging.log4j.util.Strings;
 import org.apache.skywalking.apm.network.servicemesh.Protocol;
 import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
 import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
 import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
 import org.apache.skywalking.oap.server.core.source.All;
 import org.apache.skywalking.oap.server.core.source.DetectPoint;
 import org.apache.skywalking.oap.server.core.source.Endpoint;
@@ -36,6 +39,8 @@ import org.apache.skywalking.oap.server.core.source.ServiceRelation;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source
@@ -44,11 +49,14 @@ import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
  * @author wusheng
  */
 public class TelemetryDataDispatcher {
+    private static final Logger logger = LoggerFactory.getLogger(TelemetryDataDispatcher.class);
+
     private static MeshDataBufferFileCache CACHE;
     private static ServiceInventoryCache SERVICE_CACHE;
     private static ServiceInstanceInventoryCache SERVICE_INSTANCE_CACHE;
     private static SourceReceiver SOURCE_RECEIVER;
     private static IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER;
+    private static IServiceInventoryRegister SERVICE_INVENTORY_REGISTER;
 
     private TelemetryDataDispatcher() {
 
@@ -60,6 +68,7 @@ public class TelemetryDataDispatcher {
         SERVICE_INSTANCE_CACHE = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
         SOURCE_RECEIVER = moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class);
         SERVICE_INSTANCE_INVENTORY_REGISTER = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
+        SERVICE_INVENTORY_REGISTER = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
     }
 
     public static void preProcess(ServiceMeshMetric data) {
@@ -93,8 +102,26 @@ public class TelemetryDataDispatcher {
 
     private static void heartbeat(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {
         ServiceMeshMetric metric = decorator.getMetric();
+
+        // source
         SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime());
+        int instanceId = metric.getSourceServiceInstanceId();
+        ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+        if (Objects.nonNull(serviceInstanceInventory)) {
+            SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+        } else {
+            logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
+        }
+
+        // dest
         SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getDestServiceInstanceId(), metric.getEndTime());
+        instanceId = metric.getDestServiceInstanceId();
+        serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+        if (Objects.nonNull(serviceInstanceInventory)) {
+            SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+        } else {
+            logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
+        }
     }
 
     private static void toAll(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
index 378162e..5da0182 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
@@ -88,7 +88,7 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
         if (Objects.nonNull(serviceInstanceInventory)) {
             serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
         } else {
-            logger.warn("Can't found service instance by service instance id from cache, service instance id is: {}", serviceInstanceId);
+            logger.warn("Can't found service by service instance id from cache, service instance id is: {}", serviceInstanceId);
         }
 
         responseObserver.onNext(Downstream.getDefaultInstance());
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java
index 1c112c7..df2a7fe 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java
@@ -20,9 +20,13 @@ package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.r
 
 import com.google.gson.*;
 import java.io.IOException;
+import java.util.Objects;
 import javax.servlet.http.HttpServletRequest;
 import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
 import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.server.jetty.*;
 import org.slf4j.*;
@@ -35,6 +39,8 @@ public class InstanceHeartBeatServletHandler extends JettyJsonHandler {
     private static final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatServletHandler.class);
 
     private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
+    private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
+    private final IServiceInventoryRegister serviceInventoryRegister;
     private final Gson gson = new Gson();
 
     private static final String INSTANCE_ID = "ii";
@@ -42,6 +48,8 @@ public class InstanceHeartBeatServletHandler extends JettyJsonHandler {
 
     public InstanceHeartBeatServletHandler(ModuleManager moduleManager) {
         this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
+        this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
+        this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
     }
 
     @Override public String pathSpec() {
@@ -60,6 +68,12 @@ public class InstanceHeartBeatServletHandler extends JettyJsonHandler {
             long heartBeatTime = heartBeat.get(HEARTBEAT_TIME).getAsLong();
 
             serviceInstanceInventoryRegister.heartbeat(instanceId, heartBeatTime);
+            ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(instanceId);
+            if (Objects.nonNull(serviceInstanceInventory)) {
+                serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
+            } else {
+                logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
+            }
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
         }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index ab8c727..442381a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -61,7 +61,6 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
 
         BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
-        boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp, endTimestamp));
 
         boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.DETECT_POINT, DetectPoint.SERVER.ordinal()));