You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/04/02 00:09:40 UTC
[incubator-skywalking] branch jaeger-receiver updated: Finish Jaeger query tests.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch jaeger-receiver
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/jaeger-receiver by this push:
new b61f445 Finish Jaeger query tests.
b61f445 is described below
commit b61f445a91b8db0d9e5a5635cecea8428d34b7f3
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Apr 1 17:09:28 2019 -0700
Finish Jaeger query tests.
---
.../server/receiver/jaeger/JaegerGRPCHandler.java | 142 +++++++++++----------
.../elasticsearch/JaegerTraceQueryEsDAO.java | 29 +++--
.../elasticsearch/ZipkinTraceQueryEsDAO.java | 5 +-
3 files changed, 98 insertions(+), 78 deletions(-)
diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
index 445fcbe..9cb5227 100644
--- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
@@ -19,13 +19,15 @@
package org.apache.skywalking.aop.server.receiver.jaeger;
import com.google.gson.JsonObject;
+import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import io.jaegertracing.api_v2.*;
import java.time.Instant;
+import java.util.Base64;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.source.*;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.library.util.*;
import org.apache.skywalking.oap.server.receiver.sharing.server.CoreRegisterLinker;
import org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpan;
import org.slf4j.*;
@@ -49,81 +51,93 @@ public class JaegerGRPCHandler extends CollectorServiceGrpc.CollectorServiceImpl
StreamObserver<Collector.PostSpansResponse> responseObserver) {
request.getBatch().getSpansList().forEach(span -> {
- if (logger.isDebugEnabled()) {
- logger.debug(span.toString());
- }
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug(span.toString());
+ }
- JaegerSpan jaegerSpan = new JaegerSpan();
- jaegerSpan.setTraceId(span.getTraceId().toStringUtf8());
- jaegerSpan.setSpanId(span.getSpanId().toStringUtf8());
- Model.Process process = span.getProcess();
- int serviceId = Const.NONE;
- String serviceName = null;
- if (process != null) {
- serviceName = process.getServiceName();
- }
- if (StringUtil.isEmpty(serviceName)) {
- serviceName = "UNKNOWN";
- }
- serviceId = CoreRegisterLinker.getServiceInventoryCache().getServiceId(serviceName);
- if (serviceId != Const.NONE) {
- jaegerSpan.setServiceId(serviceId);
- } else {
- JsonObject properties = new JsonObject();
+ JaegerSpan jaegerSpan = new JaegerSpan();
+ jaegerSpan.setTraceId(format(span.getTraceId()));
+ jaegerSpan.setSpanId(format(span.getSpanId()));
+ Model.Process process = span.getProcess();
+ int serviceId = Const.NONE;
+ String serviceName = null;
if (process != null) {
- process.getTagsList().forEach(keyValue -> {
- String key = keyValue.getKey();
- Model.ValueType valueVType = keyValue.getVType();
- switch (valueVType) {
- case STRING:
- properties.addProperty(key, keyValue.getVStr());
- break;
- case INT64:
- properties.addProperty(key, keyValue.getVInt64());
- break;
- case BOOL:
- properties.addProperty(key, keyValue.getVBool());
- break;
- case FLOAT64:
- properties.addProperty(key, keyValue.getVFloat64());
- break;
- }
- });
+ serviceName = process.getServiceName();
+ }
+ if (StringUtil.isEmpty(serviceName)) {
+ serviceName = "UNKNOWN";
+ }
+ serviceId = CoreRegisterLinker.getServiceInventoryCache().getServiceId(serviceName);
+ if (serviceId != Const.NONE) {
+ jaegerSpan.setServiceId(serviceId);
+ } else {
+ JsonObject properties = new JsonObject();
+ if (process != null) {
+ process.getTagsList().forEach(keyValue -> {
+ String key = keyValue.getKey();
+ Model.ValueType valueVType = keyValue.getVType();
+ switch (valueVType) {
+ case STRING:
+ properties.addProperty(key, keyValue.getVStr());
+ break;
+ case INT64:
+ properties.addProperty(key, keyValue.getVInt64());
+ break;
+ case BOOL:
+ properties.addProperty(key, keyValue.getVBool());
+ break;
+ case FLOAT64:
+ properties.addProperty(key, keyValue.getVFloat64());
+ break;
+ }
+ });
+ }
+ CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, properties);
}
- CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, properties);
- }
- long duration = span.getDuration().getNanos() / 1_000_000;
- jaegerSpan.setStartTime(Instant.ofEpochSecond(span.getStartTime().getSeconds(), span.getStartTime().getNanos()).toEpochMilli());
- jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration);
- jaegerSpan.setLatency((int)duration);
- jaegerSpan.setDataBinary(span.toByteArray());
+ long duration = span.getDuration().getNanos() / 1_000_000;
+ jaegerSpan.setStartTime(Instant.ofEpochSecond(span.getStartTime().getSeconds(), span.getStartTime().getNanos()).toEpochMilli());
+ long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(jaegerSpan.getStartTime());
+ jaegerSpan.setTimeBucket(timeBucket);
+ jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration);
+ jaegerSpan.setLatency((int)duration);
+ jaegerSpan.setDataBinary(span.toByteArray());
+ jaegerSpan.setEndpointName(span.getOperationName());
- int finalServiceId = serviceId;
- span.getTagsList().forEach(tag -> {
- String key = tag.getKey();
- if ("error".equals(key)) {
- boolean status = tag.getVBool();
- jaegerSpan.setIsError(BooleanUtils.booleanToValue(status));
- } else if ("span.kind".equals(key)) {
- String kind = tag.getVStr();
- if ("server".equals(kind) || "consumer".equals(kind)) {
- String endpointName = span.getOperationName();
- jaegerSpan.setEndpointName(endpointName);
- int endpointId = CoreRegisterLinker.getEndpointInventoryCache().getEndpointId(finalServiceId, endpointName,
- DetectPoint.SERVER.ordinal());
- if (endpointId != Const.NONE) {
- CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(finalServiceId, endpointName, DetectPoint.SERVER);
+ int finalServiceId = serviceId;
+ span.getTagsList().forEach(tag -> {
+ String key = tag.getKey();
+ if ("error".equals(key)) {
+ boolean status = tag.getVBool();
+ jaegerSpan.setIsError(BooleanUtils.booleanToValue(status));
+ } else if ("span.kind".equals(key)) {
+ String kind = tag.getVStr();
+ if ("server".equals(kind) || "consumer".equals(kind)) {
+ String endpointName = span.getOperationName();
+ jaegerSpan.setEndpointName(endpointName);
+ int endpointId = CoreRegisterLinker.getEndpointInventoryCache().getEndpointId(finalServiceId, endpointName,
+ DetectPoint.SERVER.ordinal());
+ if (endpointId != Const.NONE) {
+ CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(finalServiceId, endpointName, DetectPoint.SERVER);
+ }
}
}
- }
- });
+ });
- receiver.receive(jaegerSpan);
+ receiver.receive(jaegerSpan);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
});
responseObserver.onNext(Collector.PostSpansResponse.newBuilder().build());
responseObserver.onCompleted();
}
+ private String format(ByteString bytes) {
+ Base64.Encoder encoder = Base64.getEncoder();
+ return encoder.encodeToString(bytes.toByteArray());
+ }
+
}
diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
index ddceb00..cf9a244 100644
--- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch;
import com.google.common.base.Strings;
+import com.google.protobuf.ByteString;
import io.jaegertracing.api_v2.Model;
import java.io.IOException;
import java.time.Instant;
@@ -159,7 +160,6 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
List<Span> spanList = new ArrayList<>();
- boolean isFirst = true;
for (SearchHit searchHit : response.getHits().getHits()) {
int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
long startTime = ((Number)searchHit.getSourceAsMap().get(START_TIME)).longValue();
@@ -170,7 +170,7 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
Span swSpan = new Span();
- swSpan.setTraceId(jaegerSpan.getTraceId().toStringUtf8());
+ swSpan.setTraceId(format(jaegerSpan.getTraceId()));
swSpan.setEndpointName(jaegerSpan.getOperationName());
swSpan.setStartTime(startTime);
swSpan.setEndTime(endTime);
@@ -232,22 +232,21 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
if (serviceId != Const.NONE) {
swSpan.setServiceCode(serviceInventoryCache.get(serviceId).getName());
+ } else {
+ swSpan.setServiceCode("UNKNOWN");
}
swSpan.setSpanId(0);
swSpan.setParentSpanId(-1);
- String spanId = id(jaegerSpan.getTraceId().toStringUtf8(), jaegerSpan.getSpanId().toStringUtf8());
+ String spanId = id(format(jaegerSpan.getTraceId()), format(jaegerSpan.getSpanId()));
swSpan.setSegmentSpanId(spanId);
swSpan.setSegmentId(spanId);
- if (isFirst) {
- swSpan.setRoot(true);
- swSpan.setSegmentParentSpanId("");
- isFirst = false;
- } else {
- jaegerSpan.getReferencesList().forEach(jaegerRef -> {
+ List<Model.SpanRef> spanReferencesList = jaegerSpan.getReferencesList();
+ if (spanReferencesList.size() > 0) {
+ spanReferencesList.forEach(jaegerRef -> {
Ref ref = new Ref();
- ref.setTraceId(jaegerRef.getTraceId().toStringUtf8());
- String parentId = id(jaegerRef.getTraceId().toStringUtf8(), jaegerRef.getSpanId().toStringUtf8());
+ ref.setTraceId(format(jaegerRef.getTraceId()));
+ String parentId = id(format(jaegerRef.getTraceId()), format(jaegerRef.getSpanId()));
ref.setParentSegmentId(parentId);
ref.setType(RefType.CROSS_PROCESS);
ref.setParentSpanId(0);
@@ -255,6 +254,9 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
swSpan.getRefs().add(ref);
swSpan.setSegmentParentSpanId(parentId);
});
+ } else {
+ swSpan.setRoot(true);
+ swSpan.setSegmentParentSpanId("");
}
spanList.add(swSpan);
}
@@ -264,4 +266,9 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
private String id(String traceId, String spanId) {
return traceId + "_" + spanId;
}
+
+ private String format(ByteString bytes) {
+ Base64.Encoder encoder = Base64.getEncoder();
+ return encoder.encodeToString(bytes.toByteArray());
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
index 448e745..d7ae20c 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
@@ -22,6 +22,7 @@ import com.google.common.base.Strings;
import java.io.IOException;
import java.util.*;
import lombok.Setter;
+import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
@@ -158,7 +159,6 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
List<org.apache.skywalking.oap.server.core.query.entity.Span> spanList = new ArrayList<>();
- boolean isFirst = true;
for (SearchHit searchHit : response.getHits().getHits()) {
int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
@@ -201,10 +201,9 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
}
- if (isFirst) {
+ if (StringUtil.isEmpty(span.parentId())) {
swSpan.setRoot(true);
swSpan.setSegmentParentSpanId("");
- isFirst = false;
} else {
Ref ref = new Ref();
ref.setTraceId(span.traceId());