You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/04/02 00:09:40 UTC

[incubator-skywalking] branch jaeger-receiver updated: Finish Jaeger query tests.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch jaeger-receiver
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/jaeger-receiver by this push:
     new b61f445  Finish Jaeger query tests.
b61f445 is described below

commit b61f445a91b8db0d9e5a5635cecea8428d34b7f3
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Apr 1 17:09:28 2019 -0700

    Finish Jaeger query tests.
---
 .../server/receiver/jaeger/JaegerGRPCHandler.java  | 142 +++++++++++----------
 .../elasticsearch/JaegerTraceQueryEsDAO.java       |  29 +++--
 .../elasticsearch/ZipkinTraceQueryEsDAO.java       |   5 +-
 3 files changed, 98 insertions(+), 78 deletions(-)

diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
index 445fcbe..9cb5227 100644
--- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
@@ -19,13 +19,15 @@
 package org.apache.skywalking.aop.server.receiver.jaeger;
 
 import com.google.gson.JsonObject;
+import com.google.protobuf.ByteString;
 import io.grpc.stub.StreamObserver;
 import io.jaegertracing.api_v2.*;
 import java.time.Instant;
+import java.util.Base64;
 import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.source.*;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.library.util.*;
 import org.apache.skywalking.oap.server.receiver.sharing.server.CoreRegisterLinker;
 import org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpan;
 import org.slf4j.*;
@@ -49,81 +51,93 @@ public class JaegerGRPCHandler extends CollectorServiceGrpc.CollectorServiceImpl
         StreamObserver<Collector.PostSpansResponse> responseObserver) {
 
         request.getBatch().getSpansList().forEach(span -> {
-            if (logger.isDebugEnabled()) {
-                logger.debug(span.toString());
-            }
+            try {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(span.toString());
+                }
 
-            JaegerSpan jaegerSpan = new JaegerSpan();
-            jaegerSpan.setTraceId(span.getTraceId().toStringUtf8());
-            jaegerSpan.setSpanId(span.getSpanId().toStringUtf8());
-            Model.Process process = span.getProcess();
-            int serviceId = Const.NONE;
-            String serviceName = null;
-            if (process != null) {
-                serviceName = process.getServiceName();
-            }
-            if (StringUtil.isEmpty(serviceName)) {
-                serviceName = "UNKNOWN";
-            }
-            serviceId = CoreRegisterLinker.getServiceInventoryCache().getServiceId(serviceName);
-            if (serviceId != Const.NONE) {
-                jaegerSpan.setServiceId(serviceId);
-            } else {
-                JsonObject properties = new JsonObject();
+                JaegerSpan jaegerSpan = new JaegerSpan();
+                jaegerSpan.setTraceId(format(span.getTraceId()));
+                jaegerSpan.setSpanId(format(span.getSpanId()));
+                Model.Process process = span.getProcess();
+                int serviceId = Const.NONE;
+                String serviceName = null;
                 if (process != null) {
-                    process.getTagsList().forEach(keyValue -> {
-                        String key = keyValue.getKey();
-                        Model.ValueType valueVType = keyValue.getVType();
-                        switch (valueVType) {
-                            case STRING:
-                                properties.addProperty(key, keyValue.getVStr());
-                                break;
-                            case INT64:
-                                properties.addProperty(key, keyValue.getVInt64());
-                                break;
-                            case BOOL:
-                                properties.addProperty(key, keyValue.getVBool());
-                                break;
-                            case FLOAT64:
-                                properties.addProperty(key, keyValue.getVFloat64());
-                                break;
-                        }
-                    });
+                    serviceName = process.getServiceName();
+                }
+                if (StringUtil.isEmpty(serviceName)) {
+                    serviceName = "UNKNOWN";
+                }
+                serviceId = CoreRegisterLinker.getServiceInventoryCache().getServiceId(serviceName);
+                if (serviceId != Const.NONE) {
+                    jaegerSpan.setServiceId(serviceId);
+                } else {
+                    JsonObject properties = new JsonObject();
+                    if (process != null) {
+                        process.getTagsList().forEach(keyValue -> {
+                            String key = keyValue.getKey();
+                            Model.ValueType valueVType = keyValue.getVType();
+                            switch (valueVType) {
+                                case STRING:
+                                    properties.addProperty(key, keyValue.getVStr());
+                                    break;
+                                case INT64:
+                                    properties.addProperty(key, keyValue.getVInt64());
+                                    break;
+                                case BOOL:
+                                    properties.addProperty(key, keyValue.getVBool());
+                                    break;
+                                case FLOAT64:
+                                    properties.addProperty(key, keyValue.getVFloat64());
+                                    break;
+                            }
+                        });
+                    }
+                    CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, properties);
                 }
-                CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, properties);
-            }
 
-            long duration = span.getDuration().getNanos() / 1_000_000;
-            jaegerSpan.setStartTime(Instant.ofEpochSecond(span.getStartTime().getSeconds(), span.getStartTime().getNanos()).toEpochMilli());
-            jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration);
-            jaegerSpan.setLatency((int)duration);
-            jaegerSpan.setDataBinary(span.toByteArray());
+                long duration = span.getDuration().getNanos() / 1_000_000;
+                jaegerSpan.setStartTime(Instant.ofEpochSecond(span.getStartTime().getSeconds(), span.getStartTime().getNanos()).toEpochMilli());
+                long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(jaegerSpan.getStartTime());
+                jaegerSpan.setTimeBucket(timeBucket);
+                jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration);
+                jaegerSpan.setLatency((int)duration);
+                jaegerSpan.setDataBinary(span.toByteArray());
+                jaegerSpan.setEndpointName(span.getOperationName());
 
-            int finalServiceId = serviceId;
-            span.getTagsList().forEach(tag -> {
-                String key = tag.getKey();
-                if ("error".equals(key)) {
-                    boolean status = tag.getVBool();
-                    jaegerSpan.setIsError(BooleanUtils.booleanToValue(status));
-                } else if ("span.kind".equals(key)) {
-                    String kind = tag.getVStr();
-                    if ("server".equals(kind) || "consumer".equals(kind)) {
-                        String endpointName = span.getOperationName();
-                        jaegerSpan.setEndpointName(endpointName);
-                        int endpointId = CoreRegisterLinker.getEndpointInventoryCache().getEndpointId(finalServiceId, endpointName,
-                            DetectPoint.SERVER.ordinal());
-                        if (endpointId != Const.NONE) {
-                            CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(finalServiceId, endpointName, DetectPoint.SERVER);
+                int finalServiceId = serviceId;
+                span.getTagsList().forEach(tag -> {
+                    String key = tag.getKey();
+                    if ("error".equals(key)) {
+                        boolean status = tag.getVBool();
+                        jaegerSpan.setIsError(BooleanUtils.booleanToValue(status));
+                    } else if ("span.kind".equals(key)) {
+                        String kind = tag.getVStr();
+                        if ("server".equals(kind) || "consumer".equals(kind)) {
+                            String endpointName = span.getOperationName();
+                            jaegerSpan.setEndpointName(endpointName);
+                            int endpointId = CoreRegisterLinker.getEndpointInventoryCache().getEndpointId(finalServiceId, endpointName,
+                                DetectPoint.SERVER.ordinal());
+                            if (endpointId != Const.NONE) {
+                                CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(finalServiceId, endpointName, DetectPoint.SERVER);
+                            }
                         }
                     }
-                }
-            });
+                });
 
-            receiver.receive(jaegerSpan);
+                receiver.receive(jaegerSpan);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
         });
 
         responseObserver.onNext(Collector.PostSpansResponse.newBuilder().build());
         responseObserver.onCompleted();
     }
 
+    private String format(ByteString bytes) {
+        Base64.Encoder encoder = Base64.getEncoder();
+        return encoder.encodeToString(bytes.toByteArray());
+    }
+
 }
diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
index ddceb00..cf9a244 100644
--- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch;
 
 import com.google.common.base.Strings;
+import com.google.protobuf.ByteString;
 import io.jaegertracing.api_v2.Model;
 import java.io.IOException;
 import java.time.Instant;
@@ -159,7 +160,6 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
 
         List<Span> spanList = new ArrayList<>();
 
-        boolean isFirst = true;
         for (SearchHit searchHit : response.getHits().getHits()) {
             int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
             long startTime = ((Number)searchHit.getSourceAsMap().get(START_TIME)).longValue();
@@ -170,7 +170,7 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
 
             Span swSpan = new Span();
 
-            swSpan.setTraceId(jaegerSpan.getTraceId().toStringUtf8());
+            swSpan.setTraceId(format(jaegerSpan.getTraceId()));
             swSpan.setEndpointName(jaegerSpan.getOperationName());
             swSpan.setStartTime(startTime);
             swSpan.setEndTime(endTime);
@@ -232,22 +232,21 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
 
             if (serviceId != Const.NONE) {
                 swSpan.setServiceCode(serviceInventoryCache.get(serviceId).getName());
+            } else {
+                swSpan.setServiceCode("UNKNOWN");
             }
             swSpan.setSpanId(0);
             swSpan.setParentSpanId(-1);
-            String spanId = id(jaegerSpan.getTraceId().toStringUtf8(), jaegerSpan.getSpanId().toStringUtf8());
+            String spanId = id(format(jaegerSpan.getTraceId()), format(jaegerSpan.getSpanId()));
             swSpan.setSegmentSpanId(spanId);
             swSpan.setSegmentId(spanId);
 
-            if (isFirst) {
-                swSpan.setRoot(true);
-                swSpan.setSegmentParentSpanId("");
-                isFirst = false;
-            } else {
-                jaegerSpan.getReferencesList().forEach(jaegerRef -> {
+            List<Model.SpanRef> spanReferencesList = jaegerSpan.getReferencesList();
+            if (spanReferencesList.size() > 0) {
+                spanReferencesList.forEach(jaegerRef -> {
                     Ref ref = new Ref();
-                    ref.setTraceId(jaegerRef.getTraceId().toStringUtf8());
-                    String parentId = id(jaegerRef.getTraceId().toStringUtf8(), jaegerRef.getSpanId().toStringUtf8());
+                    ref.setTraceId(format(jaegerRef.getTraceId()));
+                    String parentId = id(format(jaegerRef.getTraceId()), format(jaegerRef.getSpanId()));
                     ref.setParentSegmentId(parentId);
                     ref.setType(RefType.CROSS_PROCESS);
                     ref.setParentSpanId(0);
@@ -255,6 +254,9 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
                     swSpan.getRefs().add(ref);
                     swSpan.setSegmentParentSpanId(parentId);
                 });
+            } else {
+                swSpan.setRoot(true);
+                swSpan.setSegmentParentSpanId("");
             }
             spanList.add(swSpan);
         }
@@ -264,4 +266,9 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
     private String id(String traceId, String spanId) {
         return traceId + "_" + spanId;
     }
+
+    private String format(ByteString bytes) {
+        Base64.Encoder encoder = Base64.getEncoder();
+        return encoder.encodeToString(bytes.toByteArray());
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
index 448e745..d7ae20c 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
@@ -22,6 +22,7 @@ import com.google.common.base.Strings;
 import java.io.IOException;
 import java.util.*;
 import lombok.Setter;
+import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
@@ -158,7 +159,6 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
 
         List<org.apache.skywalking.oap.server.core.query.entity.Span> spanList = new ArrayList<>();
 
-        boolean isFirst = true;
         for (SearchHit searchHit : response.getHits().getHits()) {
             int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
             String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
@@ -201,10 +201,9 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
 
             }
 
-            if (isFirst) {
+            if (StringUtil.isEmpty(span.parentId())) {
                 swSpan.setRoot(true);
                 swSpan.setSegmentParentSpanId("");
-                isFirst = false;
             } else {
                 Ref ref = new Ref();
                 ref.setTraceId(span.traceId());