You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/01 06:46:08 UTC
[incubator-skywalking] branch bug-fix updated: Fix bug of mesh
heartbeat and rest receiver.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch bug-fix
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/bug-fix by this push:
new 34ac3fe Fix bug of mesh heartbeat and rest receiver.
34ac3fe is described below
commit 34ac3fe3a2c17a72330775d4cab139686d21afcf
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Nov 1 14:45:59 2018 +0800
Fix bug of mesh heartbeat and rest receiver.
---
.../receiver/mesh/TelemetryDataDispatcher.java | 27 ++++++++++++++++++++++
.../v5/grpc/InstanceDiscoveryServiceHandler.java | 2 +-
.../v5/rest/InstanceHeartBeatServletHandler.java | 14 +++++++++++
.../elasticsearch/query/MetadataQueryEsDAO.java | 1 -
4 files changed, 42 insertions(+), 2 deletions(-)
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index 7fa6b05..7c9e749 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -18,13 +18,16 @@
package org.apache.skywalking.aop.server.receiver.mesh;
+import java.util.Objects;
import org.apache.logging.log4j.util.Strings;
import org.apache.skywalking.apm.network.servicemesh.Protocol;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.source.All;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.source.Endpoint;
@@ -36,6 +39,8 @@ import org.apache.skywalking.oap.server.core.source.ServiceRelation;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source
@@ -44,11 +49,14 @@ import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
* @author wusheng
*/
public class TelemetryDataDispatcher {
+ private static final Logger logger = LoggerFactory.getLogger(TelemetryDataDispatcher.class);
+
private static MeshDataBufferFileCache CACHE;
private static ServiceInventoryCache SERVICE_CACHE;
private static ServiceInstanceInventoryCache SERVICE_INSTANCE_CACHE;
private static SourceReceiver SOURCE_RECEIVER;
private static IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER;
+ private static IServiceInventoryRegister SERVICE_INVENTORY_REGISTER;
private TelemetryDataDispatcher() {
@@ -60,6 +68,7 @@ public class TelemetryDataDispatcher {
SERVICE_INSTANCE_CACHE = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
SOURCE_RECEIVER = moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class);
SERVICE_INSTANCE_INVENTORY_REGISTER = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
+ SERVICE_INVENTORY_REGISTER = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
}
public static void preProcess(ServiceMeshMetric data) {
@@ -93,8 +102,26 @@ public class TelemetryDataDispatcher {
private static void heartbeat(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {
ServiceMeshMetric metric = decorator.getMetric();
+
+ // source
SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime());
+ int instanceId = metric.getSourceServiceInstanceId();
+ ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+ if (Objects.nonNull(serviceInstanceInventory)) {
+ SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ } else {
+ logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
+ }
+
+ // dest
SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getDestServiceInstanceId(), metric.getEndTime());
+ instanceId = metric.getDestServiceInstanceId();
+ serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+ if (Objects.nonNull(serviceInstanceInventory)) {
+ SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ } else {
+ logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
+ }
}
private static void toAll(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
index 378162e..5da0182 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
@@ -88,7 +88,7 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
if (Objects.nonNull(serviceInstanceInventory)) {
serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
} else {
- logger.warn("Can't found service instance by service instance id from cache, service instance id is: {}", serviceInstanceId);
+ logger.warn("Can't found service by service instance id from cache, service instance id is: {}", serviceInstanceId);
}
responseObserver.onNext(Downstream.getDefaultInstance());
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java
index 1c112c7..df2a7fe 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java
@@ -20,9 +20,13 @@ package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.r
import com.google.gson.*;
import java.io.IOException;
+import java.util.Objects;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.*;
import org.slf4j.*;
@@ -35,6 +39,8 @@ public class InstanceHeartBeatServletHandler extends JettyJsonHandler {
private static final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatServletHandler.class);
private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
+ private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
+ private final IServiceInventoryRegister serviceInventoryRegister;
private final Gson gson = new Gson();
private static final String INSTANCE_ID = "ii";
@@ -42,6 +48,8 @@ public class InstanceHeartBeatServletHandler extends JettyJsonHandler {
public InstanceHeartBeatServletHandler(ModuleManager moduleManager) {
this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
+ this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
+ this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
}
@Override public String pathSpec() {
@@ -60,6 +68,12 @@ public class InstanceHeartBeatServletHandler extends JettyJsonHandler {
long heartBeatTime = heartBeat.get(HEARTBEAT_TIME).getAsLong();
serviceInstanceInventoryRegister.heartbeat(instanceId, heartBeatTime);
+ ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(instanceId);
+ if (Objects.nonNull(serviceInstanceInventory)) {
+ serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
+ } else {
+ logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
+ }
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index ab8c727..442381a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -61,7 +61,6 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp, endTimestamp));
boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.DETECT_POINT, DetectPoint.SERVER.ordinal()));