Posted to notifications@skywalking.apache.org by pe...@apache.org on 2019/03/30 17:36:44 UTC
[incubator-skywalking] branch master updated: Support Backend acts as pure Zipkin collector (#2424)
This is an automated email from the ASF dual-hosted git repository.
pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 601b472 Support Backend acts as pure Zipkin collector (#2424)
601b472 is described below
commit 601b472ef6ebe5c0d026a5afcc5e6856683c0328
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Mar 30 10:36:36 2019 -0700
Support Backend acts as pure Zipkin collector (#2424)
* Codebase for zipkin span persistence.
* Fix missing fields in storage.
* Miss the latency field.
* Finish some tests.
* Fix wrong latency.
* Finish doc and reset application.yml
* Make the description more clear.
---
docs/en/setup/backend/backend-receivers.md | 28 ++-
docs/en/setup/backend/backend-storage.md | 20 ++
.../oap/server/core/query/TraceQueryService.java | 20 +-
.../oap/server/core/query/entity/KeyValue.java | 8 +
.../oap/server/core/source/DefaultScopeDefine.java | 1 +
.../server/core/storage/query/ITraceQueryDAO.java | 8 +
.../zipkin-receiver-plugin/pom.xml | 5 +
.../server/receiver/zipkin/CoreRegisterLinker.java | 11 +-
.../receiver/zipkin/ZipkinReceiverConfig.java | 47 +----
.../receiver/zipkin/ZipkinReceiverProvider.java | 25 ++-
.../{ => analysis}/Receiver2AnalysisBridge.java | 8 +-
.../ZipkinSkyWalkingTransfer.java} | 43 +---
.../{ => analysis}/ZipkinTraceOSInfoBuilder.java | 2 +-
.../zipkin/{ => analysis}/cache/CacheFactory.java | 4 +-
.../zipkin/{ => analysis}/cache/ISpanCache.java | 2 +-
.../cache/caffeine/CaffeineSpanCache.java | 8 +-
.../{ => analysis}/data/SkyWalkingTrace.java | 2 +-
.../zipkin/{ => analysis}/data/ZipkinTrace.java | 2 +-
.../{ => analysis}/transform/SegmentBuilder.java | 7 +-
.../{ => analysis}/transform/SegmentListener.java | 4 +-
.../transform/Zipkin2SkyWalkingTransfer.java | 6 +-
.../SpanEncode.java} | 29 ++-
.../receiver/zipkin/handler/SpanProcessor.java | 46 +++--
.../zipkin/handler/SpanV1JettyHandler.java | 29 ++-
.../zipkin/handler/SpanV2JettyHandler.java | 25 ++-
.../server/receiver/zipkin/trace/SpanForward.java | 109 ++++++++++
.../transform/SpringSleuthSegmentBuilderTest.java | 4 +-
oap-server/server-starter/pom.xml | 5 +
oap-server/server-storage-plugin/pom.xml | 1 +
.../StorageModuleElasticsearchConfig.java | 4 +-
.../StorageModuleElasticsearchProvider.java | 4 +-
.../elasticsearch/query/TraceQueryEsDAO.java | 4 +
.../plugin/jdbc/h2/dao/H2TraceQueryDAO.java | 4 +
.../storage-zipkin-plugin}/pom.xml | 19 +-
.../server/storage/plugin/zipkin/ZipkinSpan.java | 52 +++++
.../storage/plugin/zipkin/ZipkinSpanRecord.java | 112 +++++++++++
.../plugin/zipkin/ZipkinSpanRecordDispatcher.java | 48 +++++
.../ZipkinStorageModuleElasticsearchProvider.java | 57 ++++++
.../elasticsearch/ZipkinTraceQueryEsDAO.java | 222 +++++++++++++++++++++
...alking.oap.server.library.module.ModuleProvider | 19 ++
40 files changed, 867 insertions(+), 187 deletions(-)
diff --git a/docs/en/setup/backend/backend-receivers.md b/docs/en/setup/backend/backend-receivers.md
index 237240c..a4302a9 100644
--- a/docs/en/setup/backend/backend-receivers.md
+++ b/docs/en/setup/backend/backend-receivers.md
@@ -11,8 +11,7 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **receiver-jvm**. gRPC services accept JVM metric data.
1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services.
1. **envoy-metric**. Envoy `metrics_service` supported by this receiver. OAL script support all GAUGE type metrics.
-1. **receiver_zipkin**. HTTP service accepts Span in Zipkin v1 and v2 formats. Notice, this receiver only
-works as expected in backend single node mode. Cluster mode is not supported. Welcome anyone to improve this.
+1. **receiver_zipkin**. See [details](#zipkin-receiver).
The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml
@@ -59,4 +58,27 @@ receiver-sharing-server:
```
Notice, if you add these settings, make sure they are not as same as core module,
-because gRPC/HTTP servers of core are still used for UI and OAP internal communications.
\ No newline at end of file
+because gRPC/HTTP servers of core are still used for UI and OAP internal communications.
+
+## Zipkin receiver
+The Zipkin receiver can work in two different modes.
+1. Tracing mode (default). In tracing mode, the SkyWalking OAP acts like a Zipkin collector:
+it fully supports the Zipkin v1/v2 formats through an HTTP service,
+and also provides persistence and query in the SkyWalking UI.
+It does not analyze metrics from the spans, though. In most cases, this mode is suggested when metrics come from a service mesh.
+Notice, in this mode, the Zipkin receiver requires the `zipkin-elasticsearch` storage implementation to be active.
+Read [this](backend-storage.md#elasticsearch-6-with-zipkin-trace-extension) to learn
+how to activate it.
+1. Analysis mode (not production ready). The receiver accepts the Zipkin v1/v2 formats through an HTTP service, transforms the trace to the SkyWalking
+native format, and analyzes it like a SkyWalking trace. This feature can't work in a production environment right now:
+because Zipkin tag/endpoint values are unpredictable, we can't make sure it fits production requirements.
+
+To activate `analysis mode`, set the `needAnalysis` config.
+```yaml
+receiver_zipkin:
+ default:
+ host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
+ port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
+ contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
+ needAnalysis: true
+```
\ No newline at end of file
diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md
index b9baa3e..f769690 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -49,6 +49,26 @@ storage:
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```
+### ElasticSearch 6 with Zipkin trace extension
+This implementation shares most of the `elasticsearch` implementation and only extends it to support Zipkin span storage.
+It takes all the same configs.
+```yaml
+storage:
+ zipkin-elasticsearch:
+ nameSpace: ${SW_NAMESPACE:""}
+ clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
+ user: ${SW_ES_USER:""}
+ password: ${SW_ES_PASSWORD:""}
+ indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
+ indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
+ # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
+ bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
+ bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
+ flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
+ concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
+```
+
+
### About Namespace
When namespace is set, names of all indexes in ElasticSearch will use it as prefix.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
index b735525..d0a9df7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
@@ -103,14 +103,18 @@ public class TraceQueryService implements Service {
Trace trace = new Trace();
List<SegmentRecord> segmentRecords = getTraceQueryDAO().queryByTraceId(traceId);
- for (SegmentRecord segment : segmentRecords) {
- if (nonNull(segment)) {
- if (segment.getVersion() == 2) {
- SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());
- trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
- } else {
- TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
- trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
+ if (segmentRecords.isEmpty()) {
+ trace.getSpans().addAll(getTraceQueryDAO().doFlexibleTraceQuery(traceId));
+ } else {
+ for (SegmentRecord segment : segmentRecords) {
+ if (nonNull(segment)) {
+ if (segment.getVersion() == 2) {
+ SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());
+ trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
+ } else {
+ TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
+ trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
+ }
}
}
}
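Reviewer note: the fallback introduced in the hunk above can be looked at in isolation. The following is a minimal sketch, not the SkyWalking code: `FallbackTraceQuery`, its nested `TraceDao` interface, and the use of plain strings for segments and spans are hypothetical stand-ins chosen so the example is self-contained. Only the control flow (query native segments first, call `doFlexibleTraceQuery` when none are found) mirrors the diff.

```java
import java.util.ArrayList;
import java.util.List;

public class FallbackTraceQuery {
    /** Hypothetical stand-in for ITraceQueryDAO; segments and spans are plain strings here. */
    public interface TraceDao {
        List<String> queryByTraceId(String traceId);

        List<String> doFlexibleTraceQuery(String traceId);
    }

    /** Prefers native segments; falls back to the flexible query only when none exist. */
    public static List<String> queryTrace(TraceDao dao, String traceId) {
        List<String> spans = new ArrayList<>();
        List<String> segments = dao.queryByTraceId(traceId);
        if (segments.isEmpty()) {
            // No native segment stored for this trace id: let the storage plugin answer.
            spans.addAll(dao.doFlexibleTraceQuery(traceId));
        } else {
            for (String segment : segments) {
                if (segment != null) {
                    // Placeholder for decoding a segment into spans.
                    spans.add("native:" + segment);
                }
            }
        }
        return spans;
    }
}
```

One consequence of this shape is that the flexible query is never consulted once any native segment exists for the trace id, so a trace is served from one source or the other, not both.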
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java
index 46e355e..b6cbd95 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java
@@ -28,4 +28,12 @@ import lombok.*;
public class KeyValue {
private String key;
private String value;
+
+ public KeyValue(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public KeyValue() {
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index 78b3565..aec1665 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -59,6 +59,7 @@ public class DefaultScopeDefine {
public static final int SERVICE_INSTANCE_CLR_GC = 20;
public static final int SERVICE_INSTANCE_CLR_THREAD = 21;
public static final int ENVOY_INSTANCE_METRIC = 22;
+ public static final int ZIPKIN_SPAN = 23;
/**
* Catalog of scope, the indicator processor could use this to group all generated indicators by oal tool.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
index fc98d03..6517d2f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
@@ -34,4 +34,12 @@ public interface ITraceQueryDAO extends Service {
int limit, int from, TraceState traceState, QueryOrder queryOrder) throws IOException;
List<SegmentRecord> queryByTraceId(String traceId) throws IOException;
+
+ /**
+ * This method gives more flexibility for non-native trace formats.
+ * @param traceId
+ * @return
+ * @throws IOException
+ */
+ List<Span> doFlexibleTraceQuery(String traceId) throws IOException;
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
index d2adbc1..a332f73 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
@@ -37,6 +37,11 @@
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
+ <artifactId>storage-zipkin-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-register-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java
index 2459199..57e4bc7 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java
@@ -19,14 +19,14 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class CoreRegisterLinker {
private static volatile ModuleManager MODULE_MANAGER;
private static volatile IServiceInventoryRegister SERVICE_INVENTORY_REGISTER;
private static volatile IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER;
+ private static volatile IEndpointInventoryRegister ENDPOINT_INVENTORY_REGISTER;
public static void setModuleManager(ModuleManager moduleManager) {
CoreRegisterLinker.MODULE_MANAGER = moduleManager;
@@ -45,4 +45,11 @@ public class CoreRegisterLinker {
}
return SERVICE_INSTANCE_INVENTORY_REGISTER;
}
+
+ public static IEndpointInventoryRegister getEndpointInventoryRegister() {
+ if (ENDPOINT_INVENTORY_REGISTER == null) {
+ ENDPOINT_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(IEndpointInventoryRegister.class);
+ }
+ return ENDPOINT_INVENTORY_REGISTER;
+ }
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
index a5e14af..cf9ff22 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
@@ -18,57 +18,20 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
+import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author wusheng
*/
+@Setter
+@Getter
public class ZipkinReceiverConfig extends ModuleConfig {
private String host;
private int port;
private String contextPath;
-
private int expireTime = 20;
-
private int maxCacheSize = 1_000_000;
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- public String getContextPath() {
- return contextPath;
- }
-
- public void setContextPath(String contextPath) {
- this.contextPath = contextPath;
- }
-
- public int getExpireTime() {
- return expireTime;
- }
-
- public void setExpireTime(int expireTime) {
- this.expireTime = expireTime;
- }
-
- public int getMaxCacheSize() {
- return maxCacheSize;
- }
-
- public void setMaxCacheSize(int maxCacheSize) {
- this.maxCacheSize = maxCacheSize;
- }
+ private boolean needAnalysis = false;
+ private boolean registerZipkinEndpoint = true;
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
index 2eb2079..50e1e63 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
+import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
@@ -27,9 +28,10 @@ import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler;
-import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
/**
* @author wusheng
@@ -65,12 +67,14 @@ public class ZipkinReceiverProvider extends ModuleProvider {
jettyServer = new JettyServer(config.getHost(), config.getPort(), config.getContextPath());
jettyServer.initialize();
- jettyServer.addHandler(new SpanV1JettyHandler(config));
- jettyServer.addHandler(new SpanV2JettyHandler(config));
+ jettyServer.addHandler(new SpanV1JettyHandler(config, getManager()));
+ jettyServer.addHandler(new SpanV2JettyHandler(config, getManager()));
- ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class);
- Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
- Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
+ if (config.isNeedAnalysis()) {
+ ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class);
+ Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
+ Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
+ }
}
@Override public void notifyAfterCompleted() throws ModuleStartException {
@@ -82,6 +86,13 @@ public class ZipkinReceiverProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
- return new String[] {TraceModule.NAME};
+ if (config.isNeedAnalysis()) {
+ return new String[] {TraceModule.NAME};
+ } else {
+ /**
+ * In pure trace status, we don't need the trace receiver.
+ */
+ return new String[] {CoreModule.NAME};
+ }
}
}
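Reviewer note: the mode-dependent module requirement in the hunk above reduces to a single switch on `needAnalysis`. The sketch below is illustrative only; `ReceiverModules` is a hypothetical class, and the two string constants are placeholders for `CoreModule.NAME` and `TraceModule.NAME`, whose actual values are not shown in this diff.

```java
public class ReceiverModules {
    // Placeholder values (assumptions); the real names come from CoreModule.NAME and TraceModule.NAME.
    public static final String CORE = "core";
    public static final String TRACE_RECEIVER = "receiver-trace";

    /** Analysis mode needs the trace receiver; pure tracing mode only needs core. */
    public static String[] requiredModules(boolean needAnalysis) {
        return needAnalysis ? new String[] {TRACE_RECEIVER} : new String[] {CORE};
    }
}
```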
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java
similarity index 82%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java
index 1381da3..53051e5 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java
@@ -16,12 +16,12 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
-import org.apache.skywalking.oap.server.receiver.zipkin.transform.SegmentListener;
-import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.SegmentListener;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
/**
* Send the segments to Analysis module, like receiving segments from native SkyWalking agents.
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java
similarity index 50%
copy from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
copy to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java
index 48774f9..c4ec894 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java
@@ -16,34 +16,15 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.handler;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
import java.util.List;
-import java.util.zip.GZIPInputStream;
-import javax.servlet.http.HttpServletRequest;
-import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder;
-import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory;
+import org.apache.skywalking.oap.server.receiver.zipkin.*;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.CacheFactory;
import zipkin2.Span;
-import zipkin2.codec.SpanBytesDecoder;
-
-public class SpanProcessor {
- void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException {
- InputStream inputStream = getInputStream(request);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] buffer = new byte[2048];
- int readCntOnce;
-
- while ((readCntOnce = inputStream.read(buffer)) >= 0) {
- out.write(buffer, 0, readCntOnce);
- }
-
- List<Span> spanList = decoder.decodeList(out.toByteArray());
+public class ZipkinSkyWalkingTransfer {
+ public void doTransfer(ZipkinReceiverConfig config, List<Span> spanList) {
spanList.forEach(span -> {
// In Zipkin, the local service name represents the application owner.
String applicationCode = span.localServiceName();
@@ -59,18 +40,4 @@ public class SpanProcessor {
CacheFactory.INSTANCE.get(config).addSpan(span);
});
}
-
- private InputStream getInputStream(HttpServletRequest request) throws IOException {
- InputStream requestInStream;
-
- String headEncoding = request.getHeader("accept-encoding");
- if (headEncoding != null && (headEncoding.indexOf("gzip") != -1)) {
- requestInStream = new GZIPInputStream(request.getInputStream());
- } else {
- requestInStream = request.getInputStream();
- }
-
- return requestInStream;
- }
-
}
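Reviewer note: the request-body handling removed from this file (buffered read plus optional gzip unwrapping) can be sketched on its own. This is a minimal illustration under stated assumptions, not the project's implementation: `BodyReader` and its `gzip` helper are hypothetical names, and the sketch takes the encoding as a plain string argument instead of an `HttpServletRequest`. It keys on the `Content-Encoding` value, which is the HTTP header that describes how a message body is encoded, whereas the removed lines above read `accept-encoding`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class BodyReader {
    /** Reads the whole body, unwrapping gzip when the given Content-Encoding value says so. */
    public static byte[] readBody(InputStream raw, String contentEncoding) throws IOException {
        InputStream in = raw;
        if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) {
            in = new GZIPInputStream(raw);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[2048];
        int read;
        // read() returns -1 at end of stream, which ends the loop.
        while ((read = in.read(buffer)) >= 0) {
            out.write(buffer, 0, read);
        }
        return out.toByteArray();
    }

    /** Demo helper: gzip a byte array in memory. */
    public static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }
}
```

The resulting byte array is what a decoder such as `SpanBytesDecoder.decodeList` in the removed code would then receive.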
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java
similarity index 94%
copy from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java
copy to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java
index 451ba53..7d286f8 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
import com.google.gson.JsonObject;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java
similarity index 89%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java
index c4d5b06..8893001 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java
@@ -16,10 +16,10 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.cache;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine.CaffeineSpanCache;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine.CaffeineSpanCache;
/**
* @author wusheng
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java
similarity index 92%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java
index b122bcf..0f9f3e4 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.cache;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;
import zipkin2.Span;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java
similarity index 91%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java
index c42e710..d8dc260 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -28,9 +28,9 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.cache.ISpanCache;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
-import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.ISpanCache;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Span;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java
similarity index 96%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java
index 38c59ec..ab8712c 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.data;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data;
import java.util.LinkedList;
import java.util.List;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java
similarity index 95%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java
index d12beb8..e54a613 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.data;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data;
import java.util.LinkedList;
import java.util.List;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java
similarity index 98%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java
index 3f687a6..e5af454 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.transform;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import com.google.common.base.Strings;
import java.util.*;
@@ -25,8 +25,9 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.language.agent.v2.*;
-import org.apache.skywalking.oap.server.receiver.zipkin.*;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
import org.eclipse.jetty.util.StringUtil;
import zipkin2.*;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java
similarity index 84%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java
index 9a0b7c7..5c37c9a 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java
@@ -16,9 +16,9 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.transform;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
public interface SegmentListener {
void notify(SkyWalkingTrace trace);
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java
similarity index 86%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java
index b41e50e..25a59ab 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java
@@ -16,12 +16,12 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.transform;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import java.util.LinkedList;
import java.util.List;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
import zipkin2.Span;
/**
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanEncode.java
similarity index 58%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanEncode.java
index 451ba53..ee5a91c 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanEncode.java
@@ -16,19 +16,30 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin;
-
-import com.google.gson.JsonObject;
-import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
+package org.apache.skywalking.oap.server.receiver.zipkin.handler;
/**
* @author wusheng
*/
-public class ZipkinTraceOSInfoBuilder {
+public class SpanEncode {
+ public static final int PROTO3 = 1;
+ public static final int JSON_V2 = 2;
+ public static final int THRIFT = 3;
+ public static final int JSON_V1 = 4;
+
+ public static boolean isProto3(int encode) {
+ return PROTO3 == encode;
+ }
+
+ public static boolean isJsonV2(int encode) {
+ return JSON_V2 == encode;
+ }
+
+ public static boolean isThrift(int encode) {
+ return THRIFT == encode;
+ }
- public static JsonObject getOSInfoForZipkin(String instanceName) {
- JsonObject properties = new JsonObject();
- properties.addProperty(ServiceInstanceInventory.PropertyUtil.HOST_NAME, instanceName);
- return properties;
+ public static boolean isJsonV1(int encode) {
+ return JSON_V1 == encode;
}
}
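Note on the SpanEncode constants above: the two Jetty handlers in this diff derive one of these integer codes from the request's Content-Type header and then choose the matching SpanBytesDecoder. The selection can be sketched on its own as follows. The class name EncodeSelection and its two methods are hypothetical and are not part of the commit; only the constant values and the "/x-thrift" and "/x-protobuf" substring checks are taken from the diff.

```java
// Hypothetical illustration only; this class does not exist in the commit.
final class EncodeSelection {
    // Same integer values as SpanEncode in the diff.
    static final int PROTO3 = 1;
    static final int JSON_V2 = 2;
    static final int THRIFT = 3;
    static final int JSON_V1 = 4;

    // Mirrors the v1 handler: Thrift when the header contains "/x-thrift",
    // otherwise JSON v1 (including when the header is absent).
    static int forV1(String contentType) {
        return contentType != null && contentType.contains("/x-thrift") ? THRIFT : JSON_V1;
    }

    // Mirrors the v2 handler: protobuf when the header contains "/x-protobuf",
    // otherwise JSON v2 (including when the header is absent).
    static int forV2(String contentType) {
        return contentType != null && contentType.contains("/x-protobuf") ? PROTO3 : JSON_V2;
    }
}
```

A missing or unrecognized Content-Type falls back to the JSON variant of each API version, which matches the ternary expressions in both handlers.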
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
index 48774f9..30a713f 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
@@ -18,20 +18,33 @@
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
import java.util.List;
import java.util.zip.GZIPInputStream;
import javax.servlet.http.HttpServletRequest;
-import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder;
-import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.ZipkinSkyWalkingTransfer;
+import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
public class SpanProcessor {
+ private SourceReceiver receiver;
+ private ServiceInventoryCache serviceInventoryCache;
+ private EndpointInventoryCache endpointInventoryCache;
+ private int encode;
+
+ public SpanProcessor(SourceReceiver receiver,
+ ServiceInventoryCache serviceInventoryCache,
+ EndpointInventoryCache endpointInventoryCache, int encode) {
+ this.receiver = receiver;
+ this.serviceInventoryCache = serviceInventoryCache;
+ this.endpointInventoryCache = endpointInventoryCache;
+ this.encode = encode;
+ }
+
void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException {
InputStream inputStream = getInputStream(request);
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -44,20 +57,13 @@ public class SpanProcessor {
List<Span> spanList = decoder.decodeList(out.toByteArray());
- spanList.forEach(span -> {
- // In Zipkin, the local service name represents the application owner.
- String applicationCode = span.localServiceName();
- if (applicationCode != null) {
- int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode, null);
- if (applicationId != 0) {
- CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode,
- span.timestampAsLong(),
- ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode));
- }
- }
-
- CacheFactory.INSTANCE.get(config).addSpan(span);
- });
+ if (config.isNeedAnalysis()) {
+ ZipkinSkyWalkingTransfer transfer = new ZipkinSkyWalkingTransfer();
+ transfer.doTransfer(config, spanList);
+ } else {
+ SpanForward forward = new SpanForward(config, receiver, serviceInventoryCache, endpointInventoryCache, encode);
+ forward.send(spanList);
+ }
}
private InputStream getInputStream(HttpServletRequest request) throws IOException {
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
index 77f38e4..4ab8b8d 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
@@ -18,20 +18,29 @@
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
import zipkin2.codec.SpanBytesDecoder;
public class SpanV1JettyHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
+ private SourceReceiver sourceReceiver;
+ private ServiceInventoryCache serviceInventoryCache;
+ private EndpointInventoryCache endpointInventoryCache;
- public SpanV1JettyHandler(ZipkinReceiverConfig config) {
+ public SpanV1JettyHandler(ZipkinReceiverConfig config,
+ ModuleManager manager) {
+ sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
+ serviceInventoryCache = manager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
+ endpointInventoryCache = manager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
this.config = config;
}
@@ -48,11 +57,13 @@ public class SpanV1JettyHandler extends JettyHandler {
try {
String type = request.getHeader("Content-Type");
- SpanBytesDecoder decoder = type != null && type.contains("/x-thrift")
- ? SpanBytesDecoder.THRIFT
- : SpanBytesDecoder.JSON_V1;
+ int encode = type != null && type.contains("/x-thrift") ? SpanEncode.THRIFT : SpanEncode.JSON_V1;
- SpanProcessor processor = new SpanProcessor();
+ SpanBytesDecoder decoder = SpanEncode.isThrift(encode)
+ ? SpanBytesDecoder.THRIFT
+ : SpanBytesDecoder.JSON_V1;
+
+ SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache, encode);
processor.convert(config, decoder, request);
response.setStatus(202);
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
index 7c8705a..a904c62 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
@@ -18,12 +18,14 @@
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
import zipkin2.codec.SpanBytesDecoder;
/**
@@ -33,8 +35,15 @@ public class SpanV2JettyHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
+ private SourceReceiver sourceReceiver;
+ private ServiceInventoryCache serviceInventoryCache;
+ private EndpointInventoryCache endpointInventoryCache;
- public SpanV2JettyHandler(ZipkinReceiverConfig config) {
+ public SpanV2JettyHandler(ZipkinReceiverConfig config,
+ ModuleManager manager) {
+ sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
+ serviceInventoryCache = manager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
+ endpointInventoryCache = manager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
this.config = config;
}
@@ -51,11 +60,13 @@ public class SpanV2JettyHandler extends JettyHandler {
try {
String type = request.getHeader("Content-Type");
- SpanBytesDecoder decoder = type != null && type.contains("/x-protobuf")
+ int encode = type != null && type.contains("/x-protobuf") ? SpanEncode.PROTO3 : SpanEncode.JSON_V2;
+
+ SpanBytesDecoder decoder = SpanEncode.isProto3(encode)
? SpanBytesDecoder.PROTO3
: SpanBytesDecoder.JSON_V2;
- SpanProcessor processor = new SpanProcessor();
+ SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache, encode);
processor.convert(config, decoder, request);
response.setStatus(202);
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
new file mode 100644
index 0000000..3d38868
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.trace;
+
+import java.util.List;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.library.util.*;
+import org.apache.skywalking.oap.server.receiver.zipkin.*;
+import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanEncode;
+import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpan;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesEncoder;
+
+/**
+ * @author wusheng
+ */
+public class SpanForward {
+ private ZipkinReceiverConfig config;
+ private SourceReceiver receiver;
+ private ServiceInventoryCache serviceInventoryCache;
+ private EndpointInventoryCache endpointInventoryCache;
+ private int encode;
+
+ public SpanForward(ZipkinReceiverConfig config, SourceReceiver receiver,
+ ServiceInventoryCache serviceInventoryCache,
+ EndpointInventoryCache endpointInventoryCache, int encode) {
+ this.config = config;
+ this.receiver = receiver;
+ this.serviceInventoryCache = serviceInventoryCache;
+ this.endpointInventoryCache = endpointInventoryCache;
+ this.encode = encode;
+ }
+
+ public void send(List<Span> spanList) {
+ spanList.forEach(span -> {
+ ZipkinSpan zipkinSpan = new ZipkinSpan();
+ zipkinSpan.setTraceId(span.traceId());
+ zipkinSpan.setSpanId(span.id());
+ String serviceName = span.localServiceName();
+ int serviceId = Const.NONE;
+ if (!StringUtil.isEmpty(serviceName)) {
+ serviceId = serviceInventoryCache.getServiceId(serviceName);
+ if (serviceId != Const.NONE) {
+ zipkinSpan.setServiceId(serviceId);
+ } else {
+ /**
+ * Register the service only, without waiting for the result.
+ * This span is stored without a service id.
+ */
+ CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, null);
+ }
+ }
+
+ String spanName = span.name();
+ Span.Kind kind = span.kind();
+ switch (kind) {
+ case SERVER:
+ case CONSUMER:
+ if (!StringUtil.isEmpty(spanName) && serviceId != Const.NONE) {
+ int endpointId = endpointInventoryCache.getEndpointId(serviceId, spanName,
+ DetectPoint.SERVER.ordinal());
+ if (endpointId != Const.NONE) {
+ zipkinSpan.setEndpointId(endpointId);
+ } else if (config.isRegisterZipkinEndpoint()) {
+ CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(serviceId, spanName, DetectPoint.SERVER);
+ }
+ }
+ }
+ if (!StringUtil.isEmpty(spanName)) {
+ zipkinSpan.setEndpointName(spanName);
+ }
+ long startTime = span.timestampAsLong() / 1000;
+ zipkinSpan.setStartTime(startTime);
+ if (startTime != 0) {
+ long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(zipkinSpan.getStartTime());
+ zipkinSpan.setTimeBucket(timeBucket);
+ }
+
+ long latency = span.durationAsLong() / 1000;
+
+ zipkinSpan.setEndTime(startTime + latency);
+ zipkinSpan.setIsError(BooleanUtils.booleanToValue(false));
+ zipkinSpan.setEncode(SpanEncode.PROTO3);
+ zipkinSpan.setLatency((int)latency);
+ zipkinSpan.setDataBinary(SpanBytesEncoder.PROTO3.encode(span));
+
+ receiver.receive(zipkinSpan);
+ });
+ }
+}
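Note on SpanForward.send above: Zipkin reports span timestamps and durations in microseconds, which is why both values are divided by 1000 before being stored as millisecond start time, end time, and latency. Separately, zipkin2.Span.kind() may return null, and a Java switch on a null enum throws NullPointerException, so a guard before the switch may be worth considering. The sketch below shows both points in isolation. SpanTiming, Kind, toMillis, endTimeMillis, and isServerSide are hypothetical names for illustration and are not part of the commit.

```java
// Hypothetical illustration only; this class does not exist in the commit.
final class SpanTiming {
    // Stand-in for zipkin2.Span.Kind so the sketch has no external dependency.
    enum Kind { CLIENT, SERVER, PRODUCER, CONSUMER }

    // Microseconds to milliseconds, using the same integer division as the diff.
    static long toMillis(long micros) {
        return micros / 1000;
    }

    // End time in milliseconds: converted start time plus converted duration.
    static long endTimeMillis(long timestampMicros, long durationMicros) {
        return toMillis(timestampMicros) + toMillis(durationMicros);
    }

    // Null-safe equivalent of the SERVER/CONSUMER switch cases:
    // a span without a kind is simply not treated as server-side.
    static boolean isServerSide(Kind kind) {
        return kind == Kind.SERVER || kind == Kind.CONSUMER;
    }
}
```

Because the division truncates, sub-millisecond durations are stored as a latency of 0.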
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java
similarity index 98%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java
index 9a53885..0c576c7 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.transform;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import com.google.gson.JsonObject;
import java.io.UnsupportedEncodingException;
@@ -26,7 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.*;
import org.apache.skywalking.oap.server.core.register.NodeType;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.*;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.*;
import org.junit.*;
import org.powermock.reflect.Whitebox;
import zipkin2.Span;
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index 37c5330..cfd77d3 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -126,6 +126,11 @@
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>storage-zipkin-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- storage module -->
<!-- queryBuild module -->
diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/pom.xml
index 4cb4fcb..99291e1 100644
--- a/oap-server/server-storage-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/pom.xml
@@ -30,6 +30,7 @@
<modules>
<module>storage-jdbc-hikaricp-plugin</module>
<module>storage-elasticsearch-plugin</module>
+ <module>storage-zipkin-plugin</module>
</modules>
</project>
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 7d461aa..6608a58 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -59,7 +59,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
this.password = password;
}
- int getIndexShardsNumber() {
+ public int getIndexShardsNumber() {
return indexShardsNumber;
}
@@ -67,7 +67,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
this.indexShardsNumber = indexShardsNumber;
}
- int getIndexReplicasNumber() {
+ public int getIndexReplicasNumber() {
return indexReplicasNumber;
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 7d1a713..ed3bc4b 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -70,8 +70,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
- private final StorageModuleElasticsearchConfig config;
- private ElasticSearchClient elasticSearchClient;
+ protected final StorageModuleElasticsearchConfig config;
+ protected ElasticSearchClient elasticSearchClient;
public StorageModuleElasticsearchProvider() {
super();
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
index ac5645a..4528189 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
@@ -147,4 +147,8 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
}
return segmentRecords;
}
+
+ @Override public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
+ return Collections.emptyList();
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
index ed8bf73..ec231e5 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
@@ -166,6 +166,10 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
return segmentRecords;
}
+ @Override public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
+ return Collections.emptyList();
+ }
+
protected JDBCHikariCPClient getClient() {
return h2Client;
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
similarity index 67%
copy from oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
copy to oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
index d2adbc1..61d313b 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
@@ -17,34 +17,25 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>server-receiver-plugin</artifactId>
+ <artifactId>server-storage-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>zipkin-receiver-plugin</artifactId>
- <packaging>jar</packaging>
+ <artifactId>storage-zipkin-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
- <artifactId>skywalking-trace-receiver-plugin</artifactId>
+ <artifactId>storage-elasticsearch-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>skywalking-register-receiver-plugin</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
- </dependency>
- <dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
</dependency>
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
new file mode 100644
index 0000000..3cf99f6
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.core.source.*;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.*;
+
+/**
+ * @author peng-yongsheng
+ */
+@ScopeDeclaration(id = ZIPKIN_SPAN, name = "ZipkinSpan")
+public class ZipkinSpan extends Source {
+
+ @Override public int scope() {
+ return DefaultScopeDefine.ZIPKIN_SPAN;
+ }
+
+ @Override public String getEntityId() {
+ return traceId + spanId;
+ }
+
+ @Setter @Getter private String traceId;
+ @Setter @Getter private String spanId;
+ @Setter @Getter private int serviceId;
+ @Setter @Getter private int serviceInstanceId;
+ @Setter @Getter private String endpointName;
+ @Setter @Getter private int endpointId;
+ @Setter @Getter private long startTime;
+ @Setter @Getter private long endTime;
+ @Setter @Getter private int latency;
+ @Setter @Getter private int isError;
+ @Setter @Getter private byte[] dataBinary;
+ @Setter @Getter private int encode;
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
new file mode 100644
index 0000000..531443b
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+@RecordType
+@StorageEntity(name = ZipkinSpanRecord.INDEX_NAME, builder = ZipkinSpanRecord.Builder.class, sourceScopeId = DefaultScopeDefine.ZIPKIN_SPAN)
+public class ZipkinSpanRecord extends Record {
+ public static final String INDEX_NAME = "zipkin_span";
+ public static final String TRACE_ID = "trace_id";
+ public static final String SPAN_ID = "span_id";
+ public static final String SERVICE_ID = "service_id";
+ public static final String SERVICE_INSTANCE_ID = "service_instance_id";
+ public static final String ENDPOINT_NAME = "endpoint_name";
+ public static final String ENDPOINT_ID = "endpoint_id";
+ public static final String START_TIME = "start_time";
+ public static final String END_TIME = "end_time";
+ public static final String LATENCY = "latency";
+ public static final String IS_ERROR = "is_error";
+ public static final String DATA_BINARY = "data_binary";
+ public static final String ENCODE = "encode";
+
+ @Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
+ @Setter @Getter @Column(columnName = SPAN_ID) @IDColumn private String spanId;
+ @Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId;
+ @Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) @IDColumn private int serviceInstanceId;
+ @Setter @Getter @Column(columnName = ENDPOINT_NAME, matchQuery = true) @IDColumn private String endpointName;
+ @Setter @Getter @Column(columnName = ENDPOINT_ID) @IDColumn private int endpointId;
+ @Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime;
+ @Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime;
+ @Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
+ @Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
+ @Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;
+ @Setter @Getter @Column(columnName = ENCODE) @IDColumn private int encode;
+
+ @Override public String id() {
+ return traceId + "-" + spanId;
+ }
+
+ public static class Builder implements StorageBuilder<ZipkinSpanRecord> {
+
+ @Override public Map<String, Object> data2Map(ZipkinSpanRecord storageData) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(TRACE_ID, storageData.getTraceId());
+ map.put(SPAN_ID, storageData.getSpanId());
+ map.put(SERVICE_ID, storageData.getServiceId());
+ map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
+ map.put(ENDPOINT_NAME, storageData.getEndpointName());
+ map.put(ENDPOINT_ID, storageData.getEndpointId());
+ map.put(START_TIME, storageData.getStartTime());
+ map.put(END_TIME, storageData.getEndTime());
+ map.put(LATENCY, storageData.getLatency());
+ map.put(IS_ERROR, storageData.getIsError());
+ map.put(TIME_BUCKET, storageData.getTimeBucket());
+ if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
+ map.put(DATA_BINARY, Const.EMPTY_STRING);
+ } else {
+ map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
+ }
+ map.put(ENCODE, storageData.getEncode());
+ return map;
+ }
+
+ @Override public ZipkinSpanRecord map2Data(Map<String, Object> dbMap) {
+ ZipkinSpanRecord record = new ZipkinSpanRecord();
+ record.setTraceId((String)dbMap.get(TRACE_ID));
+ record.setSpanId((String)dbMap.get(SPAN_ID));
+ record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
+ record.setServiceInstanceId(((Number)dbMap.get(SERVICE_INSTANCE_ID)).intValue());
+ record.setEndpointName((String)dbMap.get(ENDPOINT_NAME));
+ record.setEndpointId(((Number)dbMap.get(ENDPOINT_ID)).intValue());
+ record.setStartTime(((Number)dbMap.get(START_TIME)).longValue());
+ record.setEndTime(((Number)dbMap.get(END_TIME)).longValue());
+ record.setLatency(((Number)dbMap.get(LATENCY)).intValue());
+ record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue());
+ record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+ if (StringUtil.isEmpty((String)dbMap.get(DATA_BINARY))) {
+ record.setDataBinary(new byte[] {});
+ } else {
+ record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY)));
+ }
+ record.setEncode(((Number)dbMap.get(ENCODE)).intValue());
+ return record;
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
new file mode 100644
index 0000000..eed4ca1
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
+
+/**
+ * Dispatcher that converts spans received in Zipkin native mode into {@link ZipkinSpanRecord}s for persistence.
+ *
+ * @author wusheng
+ */
+public class ZipkinSpanRecordDispatcher implements SourceDispatcher<ZipkinSpan> {
+ @Override public void dispatch(ZipkinSpan source) {
+ ZipkinSpanRecord segment = new ZipkinSpanRecord();
+ segment.setTraceId(source.getTraceId());
+ segment.setSpanId(source.getSpanId());
+ segment.setServiceId(source.getServiceId());
+ segment.setServiceInstanceId(source.getServiceInstanceId());
+ segment.setEndpointName(source.getEndpointName());
+ segment.setEndpointId(source.getEndpointId());
+ segment.setStartTime(source.getStartTime());
+ segment.setEndTime(source.getEndTime());
+ segment.setLatency(source.getLatency());
+ segment.setIsError(source.getIsError());
+ segment.setDataBinary(source.getDataBinary());
+ segment.setTimeBucket(source.getTimeBucket());
+ segment.setEncode(source.getEncode());
+
+ RecordProcess.INSTANCE.in(segment);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
new file mode 100644
index 0000000..e529fda
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(ZipkinStorageModuleElasticsearchProvider.class);
+ private ZipkinTraceQueryEsDAO traceQueryEsDAO;
+
+ @Override
+ public String name() {
+ return "zipkin-elasticsearch";
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException {
+ super.prepare();
+ traceQueryEsDAO = new ZipkinTraceQueryEsDAO(elasticSearchClient);
+ this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO);
+ }
+
+ @Override public void notifyAfterCompleted() {
+ super.notifyAfterCompleted();
+ traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class));
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[] {CoreModule.NAME};
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
new file mode 100644
index 0000000..448e745
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
+
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.util.*;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.*;
+import org.elasticsearch.search.aggregations.bucket.terms.*;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
+import org.elasticsearch.search.aggregations.metrics.min.Min;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesDecoder;
+
+import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.*;
+
+public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
+ @Setter
+ private ServiceInventoryCache serviceInventoryCache;
+
+ public ZipkinTraceQueryEsDAO(
+ ElasticSearchClient client) {
+ super(client);
+ }
+
+ @Override
+ public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
+ String endpointName, int serviceId, int serviceInstanceId, int endpointId, String traceId, int limit, int from,
+ TraceState traceState, QueryOrder queryOrder) throws IOException {
+
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+
+ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+ sourceBuilder.query(boolQueryBuilder);
+ List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
+
+ if (startSecondTB != 0 && endSecondTB != 0) {
+ mustQueryList.add(QueryBuilders.rangeQuery(TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
+ }
+
+ if (minDuration != 0 || maxDuration != 0) {
+ RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(LATENCY);
+ if (minDuration != 0) {
+ rangeQueryBuilder.gte(minDuration);
+ }
+ if (maxDuration != 0) {
+ rangeQueryBuilder.lte(maxDuration);
+ }
+ boolQueryBuilder.must().add(rangeQueryBuilder);
+ }
+ if (!Strings.isNullOrEmpty(endpointName)) {
+ mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
+ }
+ if (serviceId != 0) {
+ boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
+ }
+ if (serviceInstanceId != 0) {
+ boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
+ }
+ if (endpointId != 0) {
+ boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
+ }
+ if (!Strings.isNullOrEmpty(traceId)) {
+ boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
+ }
+ switch (traceState) {
+ case ERROR:
+ mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.TRUE));
+ break;
+ case SUCCESS:
+ mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
+ break;
+ }
+
+ TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID).field(TRACE_ID).size(limit)
+ .subAggregation(
+ AggregationBuilders.max(LATENCY).field(LATENCY)
+ )
+ .subAggregation(
+ AggregationBuilders.min(START_TIME).field(START_TIME)
+ );
+ switch (queryOrder) {
+ case BY_START_TIME:
+ builder.order(BucketOrder.aggregation(START_TIME, false));
+ break;
+ case BY_DURATION:
+ builder.order(BucketOrder.aggregation(LATENCY, false));
+ break;
+ }
+ sourceBuilder.aggregation(builder);
+
+ SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);
+
+ TraceBrief traceBrief = new TraceBrief();
+
+ Terms terms = response.getAggregations().get(TRACE_ID);
+
+ for (Terms.Bucket termsBucket : terms.getBuckets()) {
+ BasicTrace basicTrace = new BasicTrace();
+
+ basicTrace.setSegmentId(termsBucket.getKeyAsString());
+ Min startTime = termsBucket.getAggregations().get(START_TIME);
+ Max latency = termsBucket.getAggregations().get(LATENCY);
+ basicTrace.setStart(String.valueOf((long)startTime.getValue()));
+ basicTrace.getEndpointNames().add("");
+ basicTrace.setDuration((int)latency.getValue());
+ basicTrace.setError(false);
+ basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
+ traceBrief.getTraces().add(basicTrace);
+ }
+
+ return traceBrief;
+ }
+
+ @Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override public List<org.apache.skywalking.oap.server.core.query.entity.Span> doFlexibleTraceQuery(
+ String traceId) throws IOException {
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId));
+ sourceBuilder.sort(START_TIME, SortOrder.ASC);
+ sourceBuilder.size(1000);
+
+ SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);
+
+ List<org.apache.skywalking.oap.server.core.query.entity.Span> spanList = new ArrayList<>();
+
+ boolean isFirst = true;
+ for (SearchHit searchHit : response.getHits().getHits()) {
+ int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
+ String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(DATA_BINARY);
+ Span span = SpanBytesDecoder.PROTO3.decodeOne(Base64.getDecoder().decode(dataBinaryBase64));
+
+ org.apache.skywalking.oap.server.core.query.entity.Span swSpan = new org.apache.skywalking.oap.server.core.query.entity.Span();
+
+ swSpan.setTraceId(span.traceId());
+ swSpan.setEndpointName(span.name());
+ swSpan.setStartTime(span.timestampAsLong() / 1000); // Zipkin reports epoch microseconds; convert to milliseconds.
+ swSpan.setEndTime(swSpan.getStartTime() + span.durationAsLong() / 1000);
+ span.tags().forEach((key, value) -> {
+ swSpan.getTags().add(new KeyValue(key, value));
+ });
+ span.annotations().forEach(annotation -> {
+ LogEntity entity = new LogEntity();
+ entity.setTime(annotation.timestamp() / 1000);
+ entity.getData().add(new KeyValue("annotation", annotation.value()));
+ swSpan.getLogs().add(entity);
+ });
+ if (serviceId != Const.NONE) {
+ swSpan.setServiceCode(serviceInventoryCache.get(serviceId).getName());
+ }
+ swSpan.setSpanId(0);
+ swSpan.setParentSpanId(-1);
+ swSpan.setSegmentSpanId(span.id());
+ swSpan.setSegmentId(span.id());
+ swSpan.setType("Local");
+ Span.Kind kind = span.kind(); // nullable in zipkin2; switching on null would throw NullPointerException
+ if (kind != null) {
+ switch (kind) {
+ case CLIENT:
+ case PRODUCER:
+ swSpan.setType("Entry");
+ break;
+ case SERVER:
+ case CONSUMER:
+ swSpan.setType("Exit");
+ break;
+ }
+ }
+
+ if (isFirst) {
+ swSpan.setRoot(true);
+ swSpan.setSegmentParentSpanId("");
+ isFirst = false;
+ } else {
+ Ref ref = new Ref();
+ ref.setTraceId(span.traceId());
+ ref.setParentSegmentId(span.parentId());
+ ref.setType(RefType.CROSS_PROCESS);
+ ref.setParentSpanId(0);
+
+ swSpan.getRefs().add(ref);
+ swSpan.setSegmentParentSpanId(span.parentId());
+ }
+ spanList.add(swSpan);
+ }
+ return spanList;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000..de8e186
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch.ZipkinStorageModuleElasticsearchProvider
\ No newline at end of file