You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/28 19:01:57 UTC
[incubator-skywalking] branch zipkin-trace updated: Fix missing
fields in storage.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch zipkin-trace
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/zipkin-trace by this push:
new 48cf99a Fix missing fields in storage.
48cf99a is described below
commit 48cf99a4fc0c4d7cdee464637d02858aeb935ea2
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Mar 28 12:01:44 2019 -0700
Fix missing fields in storage.
---
.../receiver/zipkin/ZipkinReceiverProvider.java | 10 ++++-
.../server/receiver/zipkin/handler/SpanEncode.java | 45 ++++++++++++++++++++++
.../receiver/zipkin/handler/SpanProcessor.java | 6 ++-
.../zipkin/handler/SpanV1JettyHandler.java | 6 ++-
.../zipkin/handler/SpanV2JettyHandler.java | 6 ++-
.../server/receiver/zipkin/trace/SpanForward.java | 19 +++++++--
.../src/main/resources/application.yml | 34 ++++++++++------
.../server/storage/plugin/zipkin/ZipkinSpan.java | 6 +--
.../storage/plugin/zipkin/ZipkinSpanRecord.java | 10 ++---
.../plugin/zipkin/ZipkinSpanRecordDispatcher.java | 2 +-
.../StorageModuleElasticsearchProvider.java | 2 +-
11 files changed, 114 insertions(+), 32 deletions(-)
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
index d9e55ef..50e1e63 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
+import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
@@ -85,6 +86,13 @@ public class ZipkinReceiverProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
- return new String[] {TraceModule.NAME};
+ if (config.isNeedAnalysis()) {
+ return new String[] {TraceModule.NAME};
+ } else {
+ /**
+ * In pure trace status, we don't need the trace receiver.
+ */
+ return new String[] {CoreModule.NAME};
+ }
}
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanEncode.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanEncode.java
new file mode 100644
index 0000000..ee5a91c
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanEncode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.handler;
+
+/**
+ * @author wusheng
+ */
+public class SpanEncode {
+ public static final int PROTO3 = 1;
+ public static final int JSON_V2 = 2;
+ public static final int THRIFT = 3;
+ public static final int JSON_V1 = 4;
+
+ public static boolean isProto3(int encode) {
+ return PROTO3 == encode;
+ }
+
+ public static boolean isJsonV2(int encode) {
+ return JSON_V2 == encode;
+ }
+
+ public static boolean isThrift(int encode) {
+ return THRIFT == encode;
+ }
+
+ public static boolean isJsonV1(int encode) {
+ return JSON_V1 == encode;
+ }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
index a033f4c..30a713f 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
@@ -34,13 +34,15 @@ public class SpanProcessor {
private SourceReceiver receiver;
private ServiceInventoryCache serviceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;
+ private int encode;
public SpanProcessor(SourceReceiver receiver,
ServiceInventoryCache serviceInventoryCache,
- EndpointInventoryCache endpointInventoryCache) {
+ EndpointInventoryCache endpointInventoryCache, int encode) {
this.receiver = receiver;
this.serviceInventoryCache = serviceInventoryCache;
this.endpointInventoryCache = endpointInventoryCache;
+ this.encode = encode;
}
void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException {
@@ -59,7 +61,7 @@ public class SpanProcessor {
ZipkinSkyWalkingTransfer transfer = new ZipkinSkyWalkingTransfer();
transfer.doTransfer(config, spanList);
} else {
- SpanForward forward = new SpanForward(config, receiver, serviceInventoryCache, endpointInventoryCache);
+ SpanForward forward = new SpanForward(config, receiver, serviceInventoryCache, endpointInventoryCache, encode);
forward.send(spanList);
}
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
index 8f3634f..4ab8b8d 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
@@ -57,11 +57,13 @@ public class SpanV1JettyHandler extends JettyHandler {
try {
String type = request.getHeader("Content-Type");
- SpanBytesDecoder decoder = type != null && type.contains("/x-thrift")
+ int encode = type != null && type.contains("/x-thrift") ? SpanEncode.THRIFT : SpanEncode.JSON_V1;
+
+ SpanBytesDecoder decoder = SpanEncode.isThrift(encode)
? SpanBytesDecoder.THRIFT
: SpanBytesDecoder.JSON_V1;
- SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache);
+ SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache, encode);
processor.convert(config, decoder, request);
response.setStatus(202);
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
index 92cf3e7..a904c62 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
@@ -60,11 +60,13 @@ public class SpanV2JettyHandler extends JettyHandler {
try {
String type = request.getHeader("Content-Type");
- SpanBytesDecoder decoder = type != null && type.contains("/x-protobuf")
+ int encode = type != null && type.contains("/x-protobuf") ? SpanEncode.PROTO3 : SpanEncode.JSON_V2;
+
+ SpanBytesDecoder decoder = SpanEncode.isProto3(encode)
? SpanBytesDecoder.PROTO3
: SpanBytesDecoder.JSON_V2;
- SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache);
+ SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache, encode);
processor.convert(config, decoder, request);
response.setStatus(202);
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
index 2665169..0c28a7d 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
@@ -23,10 +23,12 @@ import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.source.*;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.library.util.*;
import org.apache.skywalking.oap.server.receiver.zipkin.*;
+import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanEncode;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpan;
import zipkin2.Span;
+import zipkin2.codec.SpanBytesEncoder;
/**
* @author wusheng
@@ -36,14 +38,16 @@ public class SpanForward {
private SourceReceiver receiver;
private ServiceInventoryCache serviceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;
+ private int encode;
public SpanForward(ZipkinReceiverConfig config, SourceReceiver receiver,
ServiceInventoryCache serviceInventoryCache,
- EndpointInventoryCache endpointInventoryCache) {
+ EndpointInventoryCache endpointInventoryCache, int encode) {
this.config = config;
this.receiver = receiver;
this.serviceInventoryCache = serviceInventoryCache;
this.endpointInventoryCache = endpointInventoryCache;
+ this.encode = encode;
}
public void send(List<Span> spanList) {
@@ -84,10 +88,17 @@ public class SpanForward {
if (!StringUtil.isEmpty(spanName)) {
zipkinSpan.setEndpointName(spanName);
}
+ long timestampAsLong = span.timestampAsLong();
+ zipkinSpan.setStartTime(timestampAsLong);
+ if (timestampAsLong != 0) {
+ long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(zipkinSpan.getStartTime());
+ zipkinSpan.setTimeBucket(timeBucket);
+ }
- zipkinSpan.setStartTime(span.timestampAsLong());
- zipkinSpan.setEndTime(span.timestampAsLong() + span.durationAsLong());
+ zipkinSpan.setEndTime(timestampAsLong + span.durationAsLong());
zipkinSpan.setIsError(BooleanUtils.booleanToValue(false));
+ zipkinSpan.setEncode(SpanEncode.PROTO3);
+ zipkinSpan.setDataBinary(SpanBytesEncoder.PROTO3.encode(span));
receiver.receive(zipkinSpan);
});
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 4ddffed..c421504 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -55,7 +55,24 @@ core:
dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day
monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month
storage:
- elasticsearch:
+# elasticsearch:
+# nameSpace: ${SW_NAMESPACE:""}
+# clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
+# user: ${SW_ES_USER:""}
+# password: ${SW_ES_PASSWORD:""}
+# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
+# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
+# # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
+# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
+# bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
+# flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
+# concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
+# h2:
+# driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
+# url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
+# user: ${SW_STORAGE_H2_USER:sa}
+# mysql:
+ zipkin-elasticsearch:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
user: ${SW_ES_USER:""}
@@ -67,11 +84,6 @@ storage:
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
-# h2:
-# driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
-# url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
-# user: ${SW_STORAGE_H2_USER:sa}
-# mysql:
receiver-sharing-server:
default:
receiver-register:
@@ -98,11 +110,11 @@ istio-telemetry:
default:
envoy-metric:
default:
-#receiver_zipkin:
-# default:
-# host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
-# port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
-# contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
+receiver_zipkin:
+ default:
+ host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
+ port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
+ contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
query:
graphql:
path: ${SW_QUERY_GRAPHQL_PATH:/graphql}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
index 0072d9f..3cf99f6 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
@@ -30,11 +30,11 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.*;
public class ZipkinSpan extends Source {
@Override public int scope() {
- return DefaultScopeDefine.SEGMENT;
+ return DefaultScopeDefine.ZIPKIN_SPAN;
}
@Override public String getEntityId() {
- return spanId;
+ return traceId + spanId;
}
@Setter @Getter private String traceId;
@@ -48,5 +48,5 @@ public class ZipkinSpan extends Source {
@Setter @Getter private int latency;
@Setter @Getter private int isError;
@Setter @Getter private byte[] dataBinary;
- @Setter @Getter private int version;
+ @Setter @Getter private int encode;
}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
index 3b97139..531443b 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@RecordType
@StorageEntity(name = ZipkinSpanRecord.INDEX_NAME, builder = ZipkinSpanRecord.Builder.class, sourceScopeId = DefaultScopeDefine.ZIPKIN_SPAN)
public class ZipkinSpanRecord extends Record {
- public static final String INDEX_NAME = "segment";
+ public static final String INDEX_NAME = "zipkin_span";
public static final String TRACE_ID = "trace_id";
public static final String SPAN_ID = "span_id";
public static final String SERVICE_ID = "service_id";
@@ -44,7 +44,7 @@ public class ZipkinSpanRecord extends Record {
public static final String LATENCY = "latency";
public static final String IS_ERROR = "is_error";
public static final String DATA_BINARY = "data_binary";
- public static final String VERSION = "version";
+ public static final String ENCODE = "encode";
@Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
@Setter @Getter @Column(columnName = SPAN_ID) @IDColumn private String spanId;
@@ -57,7 +57,7 @@ public class ZipkinSpanRecord extends Record {
@Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
@Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
@Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;
- @Setter @Getter @Column(columnName = VERSION) @IDColumn private int version;
+ @Setter @Getter @Column(columnName = ENCODE) @IDColumn private int encode;
@Override public String id() {
return traceId + "-" + spanId;
@@ -83,7 +83,7 @@ public class ZipkinSpanRecord extends Record {
} else {
map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
- map.put(VERSION, storageData.getVersion());
+ map.put(ENCODE, storageData.getEncode());
return map;
}
@@ -105,7 +105,7 @@ public class ZipkinSpanRecord extends Record {
} else {
record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY)));
}
- record.setVersion(((Number)dbMap.get(VERSION)).intValue());
+ record.setEncode(((Number)dbMap.get(ENCODE)).intValue());
return record;
}
}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
index 71aa9b2..eed4ca1 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
@@ -41,7 +41,7 @@ public class ZipkinSpanRecordDispatcher implements SourceDispatcher<ZipkinSpan>
segment.setIsError(source.getIsError());
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
- segment.setVersion(source.getVersion());
+ segment.setEncode(source.getEncode());
RecordProcess.INSTANCE.in(segment);
}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java
index 472d01a..cabd17c 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -49,7 +49,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
@Override
public String name() {
- return "elasticsearch";
+ return "zipkin-elasticsearch";
}
@Override