You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/28 18:21:00 UTC
[incubator-skywalking] 01/01: Codebase for zipkin span persistence.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch zipkin-trace
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
commit 6a0729d175fee086b5d36602f087178052e53f60
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Mar 28 11:20:45 2019 -0700
Codebase for zipkin span persistence.
---
.../oap/server/core/source/DefaultScopeDefine.java | 1 +
.../zipkin-receiver-plugin/pom.xml | 5 +
.../server/receiver/zipkin/CoreRegisterLinker.java | 11 +-
.../receiver/zipkin/ZipkinReceiverConfig.java | 47 +--------
.../receiver/zipkin/ZipkinReceiverProvider.java | 15 +--
.../{ => analysis}/Receiver2AnalysisBridge.java | 8 +-
.../ZipkinSkyWalkingTransfer.java} | 43 +-------
.../{ => analysis}/ZipkinTraceOSInfoBuilder.java | 2 +-
.../zipkin/{ => analysis}/cache/CacheFactory.java | 4 +-
.../zipkin/{ => analysis}/cache/ISpanCache.java | 2 +-
.../cache/caffeine/CaffeineSpanCache.java | 8 +-
.../{ => analysis}/data/SkyWalkingTrace.java | 2 +-
.../zipkin/{ => analysis}/data/ZipkinTrace.java | 2 +-
.../{ => analysis}/transform/SegmentBuilder.java | 7 +-
.../{ => analysis}/transform/SegmentListener.java | 4 +-
.../transform/Zipkin2SkyWalkingTransfer.java | 6 +-
.../receiver/zipkin/handler/SpanProcessor.java | 44 ++++----
.../zipkin/handler/SpanV1JettyHandler.java | 25 +++--
.../zipkin/handler/SpanV2JettyHandler.java | 21 ++--
.../server/receiver/zipkin/trace/SpanForward.java | 95 +++++++++++++++++
.../transform/SpringSleuthSegmentBuilderTest.java | 4 +-
oap-server/server-starter/pom.xml | 5 +
oap-server/server-storage-plugin/pom.xml | 1 +
.../StorageModuleElasticsearchConfig.java | 4 +-
.../{ => storage-zipkin-plugin}/pom.xml | 20 ++--
.../server/storage/plugin/zipkin/ZipkinSpan.java | 52 ++++++++++
.../storage/plugin/zipkin/ZipkinSpanRecord.java | 112 ++++++++++++++++++++
.../plugin/zipkin/ZipkinSpanRecordDispatcher.java | 48 +++++++++
.../StorageModuleElasticsearchProvider.java | 114 +++++++++++++++++++++
...alking.oap.server.library.module.ModuleProvider | 19 ++++
30 files changed, 575 insertions(+), 156 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index 78b3565..aec1665 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -59,6 +59,7 @@ public class DefaultScopeDefine {
public static final int SERVICE_INSTANCE_CLR_GC = 20;
public static final int SERVICE_INSTANCE_CLR_THREAD = 21;
public static final int ENVOY_INSTANCE_METRIC = 22;
+ public static final int ZIPKIN_SPAN = 23;
/**
* Catalog of scope, the indicator processor could use this to group all generated indicators by oal tool.
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
index d2adbc1..a332f73 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
@@ -37,6 +37,11 @@
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
+ <artifactId>storage-zipkin-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-register-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java
index 2459199..57e4bc7 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java
@@ -19,14 +19,14 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class CoreRegisterLinker {
private static volatile ModuleManager MODULE_MANAGER;
private static volatile IServiceInventoryRegister SERVICE_INVENTORY_REGISTER;
private static volatile IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER;
+ private static volatile IEndpointInventoryRegister ENDPOINT_INVENTORY_REGISTER;
public static void setModuleManager(ModuleManager moduleManager) {
CoreRegisterLinker.MODULE_MANAGER = moduleManager;
@@ -45,4 +45,11 @@ public class CoreRegisterLinker {
}
return SERVICE_INSTANCE_INVENTORY_REGISTER;
}
+
+ public static IEndpointInventoryRegister getEndpointInventoryRegister() {
+ if (ENDPOINT_INVENTORY_REGISTER == null) {
+ ENDPOINT_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(IEndpointInventoryRegister.class);
+ }
+ return ENDPOINT_INVENTORY_REGISTER;
+ }
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
index a5e14af..cf9ff22 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
@@ -18,57 +18,20 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
+import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author wusheng
*/
+@Setter
+@Getter
public class ZipkinReceiverConfig extends ModuleConfig {
private String host;
private int port;
private String contextPath;
-
private int expireTime = 20;
-
private int maxCacheSize = 1_000_000;
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- public String getContextPath() {
- return contextPath;
- }
-
- public void setContextPath(String contextPath) {
- this.contextPath = contextPath;
- }
-
- public int getExpireTime() {
- return expireTime;
- }
-
- public void setExpireTime(int expireTime) {
- this.expireTime = expireTime;
- }
-
- public int getMaxCacheSize() {
- return maxCacheSize;
- }
-
- public void setMaxCacheSize(int maxCacheSize) {
- this.maxCacheSize = maxCacheSize;
- }
+ private boolean needAnalysis = false;
+ private boolean registerZipkinEndpoint = true;
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
index 2eb2079..d9e55ef 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
@@ -27,9 +27,10 @@ import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler;
-import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
/**
* @author wusheng
@@ -65,12 +66,14 @@ public class ZipkinReceiverProvider extends ModuleProvider {
jettyServer = new JettyServer(config.getHost(), config.getPort(), config.getContextPath());
jettyServer.initialize();
- jettyServer.addHandler(new SpanV1JettyHandler(config));
- jettyServer.addHandler(new SpanV2JettyHandler(config));
+ jettyServer.addHandler(new SpanV1JettyHandler(config, getManager()));
+ jettyServer.addHandler(new SpanV2JettyHandler(config, getManager()));
- ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class);
- Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
- Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
+ if (config.isNeedAnalysis()) {
+ ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class);
+ Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
+ Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
+ }
}
@Override public void notifyAfterCompleted() throws ModuleStartException {
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java
similarity index 82%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java
index 1381da3..53051e5 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java
@@ -16,12 +16,12 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
-import org.apache.skywalking.oap.server.receiver.zipkin.transform.SegmentListener;
-import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.SegmentListener;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
/**
* Send the segments to Analysis module, like receiving segments from native SkyWalking agents.
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java
similarity index 50%
copy from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
copy to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java
index 48774f9..c4ec894 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java
@@ -16,34 +16,15 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.handler;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
import java.util.List;
-import java.util.zip.GZIPInputStream;
-import javax.servlet.http.HttpServletRequest;
-import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder;
-import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory;
+import org.apache.skywalking.oap.server.receiver.zipkin.*;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.CacheFactory;
import zipkin2.Span;
-import zipkin2.codec.SpanBytesDecoder;
-
-public class SpanProcessor {
- void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException {
- InputStream inputStream = getInputStream(request);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] buffer = new byte[2048];
- int readCntOnce;
-
- while ((readCntOnce = inputStream.read(buffer)) >= 0) {
- out.write(buffer, 0, readCntOnce);
- }
-
- List<Span> spanList = decoder.decodeList(out.toByteArray());
+public class ZipkinSkyWalkingTransfer {
+ public void doTransfer(ZipkinReceiverConfig config, List<Span> spanList) {
spanList.forEach(span -> {
// In Zipkin, the local service name represents the application owner.
String applicationCode = span.localServiceName();
@@ -59,18 +40,4 @@ public class SpanProcessor {
CacheFactory.INSTANCE.get(config).addSpan(span);
});
}
-
- private InputStream getInputStream(HttpServletRequest request) throws IOException {
- InputStream requestInStream;
-
- String headEncoding = request.getHeader("accept-encoding");
- if (headEncoding != null && (headEncoding.indexOf("gzip") != -1)) {
- requestInStream = new GZIPInputStream(request.getInputStream());
- } else {
- requestInStream = request.getInputStream();
- }
-
- return requestInStream;
- }
-
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java
similarity index 94%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java
index 451ba53..7d286f8 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
import com.google.gson.JsonObject;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java
similarity index 89%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java
index c4d5b06..8893001 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java
@@ -16,10 +16,10 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.cache;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine.CaffeineSpanCache;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine.CaffeineSpanCache;
/**
* @author wusheng
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java
similarity index 92%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java
index b122bcf..0f9f3e4 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.cache;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;
import zipkin2.Span;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java
similarity index 91%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java
index c42e710..d8dc260 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -28,9 +28,9 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.cache.ISpanCache;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
-import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.ISpanCache;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Span;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java
similarity index 96%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java
index 38c59ec..ab8712c 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.data;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data;
import java.util.LinkedList;
import java.util.List;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java
similarity index 95%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java
index d12beb8..e54a613 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.data;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data;
import java.util.LinkedList;
import java.util.List;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java
similarity index 98%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java
index 3f687a6..e5af454 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.transform;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import com.google.common.base.Strings;
import java.util.*;
@@ -25,8 +25,9 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.language.agent.v2.*;
-import org.apache.skywalking.oap.server.receiver.zipkin.*;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
import org.eclipse.jetty.util.StringUtil;
import zipkin2.*;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java
similarity index 84%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java
index 9a0b7c7..5c37c9a 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java
@@ -16,9 +16,9 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.transform;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
public interface SegmentListener {
void notify(SkyWalkingTrace trace);
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java
similarity index 86%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java
index b41e50e..25a59ab 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java
@@ -16,12 +16,12 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.transform;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import java.util.LinkedList;
import java.util.List;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
import zipkin2.Span;
/**
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
index 48774f9..a033f4c 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
@@ -18,20 +18,31 @@
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
import java.util.List;
import java.util.zip.GZIPInputStream;
import javax.servlet.http.HttpServletRequest;
-import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder;
-import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.ZipkinSkyWalkingTransfer;
+import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
public class SpanProcessor {
+ private SourceReceiver receiver;
+ private ServiceInventoryCache serviceInventoryCache;
+ private EndpointInventoryCache endpointInventoryCache;
+
+ public SpanProcessor(SourceReceiver receiver,
+ ServiceInventoryCache serviceInventoryCache,
+ EndpointInventoryCache endpointInventoryCache) {
+ this.receiver = receiver;
+ this.serviceInventoryCache = serviceInventoryCache;
+ this.endpointInventoryCache = endpointInventoryCache;
+ }
+
void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException {
InputStream inputStream = getInputStream(request);
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -44,20 +55,13 @@ public class SpanProcessor {
List<Span> spanList = decoder.decodeList(out.toByteArray());
- spanList.forEach(span -> {
- // In Zipkin, the local service name represents the application owner.
- String applicationCode = span.localServiceName();
- if (applicationCode != null) {
- int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode, null);
- if (applicationId != 0) {
- CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode,
- span.timestampAsLong(),
- ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode));
- }
- }
-
- CacheFactory.INSTANCE.get(config).addSpan(span);
- });
+ if (config.isNeedAnalysis()) {
+ ZipkinSkyWalkingTransfer transfer = new ZipkinSkyWalkingTransfer();
+ transfer.doTransfer(config, spanList);
+ } else {
+ SpanForward forward = new SpanForward(config, receiver, serviceInventoryCache, endpointInventoryCache);
+ forward.send(spanList);
+ }
}
private InputStream getInputStream(HttpServletRequest request) throws IOException {
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
index 77f38e4..8f3634f 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
@@ -18,20 +18,29 @@
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
import zipkin2.codec.SpanBytesDecoder;
public class SpanV1JettyHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
+ private SourceReceiver sourceReceiver;
+ private ServiceInventoryCache serviceInventoryCache;
+ private EndpointInventoryCache endpointInventoryCache;
- public SpanV1JettyHandler(ZipkinReceiverConfig config) {
+ public SpanV1JettyHandler(ZipkinReceiverConfig config,
+ ModuleManager manager) {
+ sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
+ serviceInventoryCache = manager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
+ endpointInventoryCache = manager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
this.config = config;
}
@@ -49,10 +58,10 @@ public class SpanV1JettyHandler extends JettyHandler {
String type = request.getHeader("Content-Type");
SpanBytesDecoder decoder = type != null && type.contains("/x-thrift")
- ? SpanBytesDecoder.THRIFT
- : SpanBytesDecoder.JSON_V1;
+ ? SpanBytesDecoder.THRIFT
+ : SpanBytesDecoder.JSON_V1;
- SpanProcessor processor = new SpanProcessor();
+ SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache);
processor.convert(config, decoder, request);
response.setStatus(202);
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
index 7c8705a..92cf3e7 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
@@ -18,12 +18,14 @@
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
import zipkin2.codec.SpanBytesDecoder;
/**
@@ -33,8 +35,15 @@ public class SpanV2JettyHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
+ private SourceReceiver sourceReceiver;
+ private ServiceInventoryCache serviceInventoryCache;
+ private EndpointInventoryCache endpointInventoryCache;
- public SpanV2JettyHandler(ZipkinReceiverConfig config) {
+ public SpanV2JettyHandler(ZipkinReceiverConfig config,
+ ModuleManager manager) {
+ sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
+ serviceInventoryCache = manager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
+ endpointInventoryCache = manager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
this.config = config;
}
@@ -55,7 +64,7 @@ public class SpanV2JettyHandler extends JettyHandler {
? SpanBytesDecoder.PROTO3
: SpanBytesDecoder.JSON_V2;
- SpanProcessor processor = new SpanProcessor();
+ SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache);
processor.convert(config, decoder, request);
response.setStatus(202);
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
new file mode 100644
index 0000000..2665169
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.trace;
+
+import java.util.List;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.receiver.zipkin.*;
+import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpan;
+import zipkin2.Span;
+
+/**
+ * @author wusheng
+ */
+public class SpanForward {
+ private ZipkinReceiverConfig config;
+ private SourceReceiver receiver;
+ private ServiceInventoryCache serviceInventoryCache;
+ private EndpointInventoryCache endpointInventoryCache;
+
+ public SpanForward(ZipkinReceiverConfig config, SourceReceiver receiver,
+ ServiceInventoryCache serviceInventoryCache,
+ EndpointInventoryCache endpointInventoryCache) {
+ this.config = config;
+ this.receiver = receiver;
+ this.serviceInventoryCache = serviceInventoryCache;
+ this.endpointInventoryCache = endpointInventoryCache;
+ }
+
+ public void send(List<Span> spanList) {
+ spanList.forEach(span -> {
+ ZipkinSpan zipkinSpan = new ZipkinSpan();
+ zipkinSpan.setTraceId(span.traceId());
+ zipkinSpan.setSpanId(span.id());
+ String serviceName = span.localServiceName();
+ int serviceId = Const.NONE;
+ if (!StringUtil.isEmpty(serviceName)) {
+ serviceId = serviceInventoryCache.getServiceId(serviceName);
+ if (serviceId != Const.NONE) {
+ zipkinSpan.setServiceId(serviceId);
+ } else {
+ /**
+ * Only register, but don't wait.
+ * For this span, service id will be missed.
+ */
+ CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, null);
+ }
+ }
+
+ String spanName = span.name();
+ Span.Kind kind = span.kind();
+ switch (kind) {
+ case SERVER:
+ case CONSUMER:
+ if (!StringUtil.isEmpty(spanName) && serviceId != Const.NONE) {
+ int endpointId = endpointInventoryCache.getEndpointId(serviceId, spanName,
+ DetectPoint.SERVER.ordinal());
+ if (endpointId != Const.NONE) {
+ zipkinSpan.setEndpointId(endpointId);
+ } else if (config.isRegisterZipkinEndpoint()) {
+ CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(serviceId, spanName, DetectPoint.SERVER);
+ }
+ }
+ }
+ if (!StringUtil.isEmpty(spanName)) {
+ zipkinSpan.setEndpointName(spanName);
+ }
+
+ zipkinSpan.setStartTime(span.timestampAsLong());
+ zipkinSpan.setEndTime(span.timestampAsLong() + span.durationAsLong());
+ zipkinSpan.setIsError(BooleanUtils.booleanToValue(false));
+
+ receiver.receive(zipkinSpan);
+ });
+ }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java
similarity index 98%
rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java
index 9a53885..0c576c7 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin.transform;
+package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import com.google.gson.JsonObject;
import java.io.UnsupportedEncodingException;
@@ -26,7 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.*;
import org.apache.skywalking.oap.server.core.register.NodeType;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
-import org.apache.skywalking.oap.server.receiver.zipkin.data.*;
+import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.*;
import org.junit.*;
import org.powermock.reflect.Whitebox;
import zipkin2.Span;
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index 37c5330..cfd77d3 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -126,6 +126,11 @@
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>storage-zipkin-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- storage module -->
<!-- queryBuild module -->
diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/pom.xml
index 4cb4fcb..99291e1 100644
--- a/oap-server/server-storage-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/pom.xml
@@ -30,6 +30,7 @@
<modules>
<module>storage-jdbc-hikaricp-plugin</module>
<module>storage-elasticsearch-plugin</module>
+ <module>storage-zipkin-plugin</module>
</modules>
</project>
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 7d461aa..6608a58 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -59,7 +59,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
this.password = password;
}
- int getIndexShardsNumber() {
+ public int getIndexShardsNumber() {
return indexShardsNumber;
}
@@ -67,7 +67,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
this.indexShardsNumber = indexShardsNumber;
}
- int getIndexReplicasNumber() {
+ public int getIndexReplicasNumber() {
return indexReplicasNumber;
}
diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
similarity index 63%
copy from oap-server/server-storage-plugin/pom.xml
copy to oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
index 4cb4fcb..658b47a 100644
--- a/oap-server/server-storage-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
@@ -17,19 +17,23 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>oap-server</artifactId>
+ <artifactId>server-storage-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>server-storage-plugin</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>storage-jdbc-hikaricp-plugin</module>
- <module>storage-elasticsearch-plugin</module>
- </modules>
+ <artifactId>storage-zipkin-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>storage-elasticsearch-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
new file mode 100644
index 0000000..0072d9f
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.core.source.*;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.*;
+
+/**
+ * @author peng-yongsheng
+ */
+@ScopeDeclaration(id = ZIPKIN_SPAN, name = "ZipkinSpan")
+public class ZipkinSpan extends Source {
+
+ @Override public int scope() {
+ return DefaultScopeDefine.SEGMENT;
+ }
+
+ @Override public String getEntityId() {
+ return spanId;
+ }
+
+ @Setter @Getter private String traceId;
+ @Setter @Getter private String spanId;
+ @Setter @Getter private int serviceId;
+ @Setter @Getter private int serviceInstanceId;
+ @Setter @Getter private String endpointName;
+ @Setter @Getter private int endpointId;
+ @Setter @Getter private long startTime;
+ @Setter @Getter private long endTime;
+ @Setter @Getter private int latency;
+ @Setter @Getter private int isError;
+ @Setter @Getter private byte[] dataBinary;
+ @Setter @Getter private int version;
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
new file mode 100644
index 0000000..3b97139
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+@RecordType
+@StorageEntity(name = ZipkinSpanRecord.INDEX_NAME, builder = ZipkinSpanRecord.Builder.class, sourceScopeId = DefaultScopeDefine.ZIPKIN_SPAN)
+public class ZipkinSpanRecord extends Record {
+ public static final String INDEX_NAME = "segment";
+ public static final String TRACE_ID = "trace_id";
+ public static final String SPAN_ID = "span_id";
+ public static final String SERVICE_ID = "service_id";
+ public static final String SERVICE_INSTANCE_ID = "service_instance_id";
+ public static final String ENDPOINT_NAME = "endpoint_name";
+ public static final String ENDPOINT_ID = "endpoint_id";
+ public static final String START_TIME = "start_time";
+ public static final String END_TIME = "end_time";
+ public static final String LATENCY = "latency";
+ public static final String IS_ERROR = "is_error";
+ public static final String DATA_BINARY = "data_binary";
+ public static final String VERSION = "version";
+
+ @Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
+ @Setter @Getter @Column(columnName = SPAN_ID) @IDColumn private String spanId;
+ @Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId;
+ @Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) @IDColumn private int serviceInstanceId;
+ @Setter @Getter @Column(columnName = ENDPOINT_NAME, matchQuery = true) @IDColumn private String endpointName;
+ @Setter @Getter @Column(columnName = ENDPOINT_ID) @IDColumn private int endpointId;
+ @Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime;
+ @Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime;
+ @Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
+ @Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
+ @Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;
+ @Setter @Getter @Column(columnName = VERSION) @IDColumn private int version;
+
+ @Override public String id() {
+ return traceId + "-" + spanId;
+ }
+
+ public static class Builder implements StorageBuilder<ZipkinSpanRecord> {
+
+ @Override public Map<String, Object> data2Map(ZipkinSpanRecord storageData) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(TRACE_ID, storageData.getTraceId());
+ map.put(SPAN_ID, storageData.getSpanId());
+ map.put(SERVICE_ID, storageData.getServiceId());
+ map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
+ map.put(ENDPOINT_NAME, storageData.getEndpointName());
+ map.put(ENDPOINT_ID, storageData.getEndpointId());
+ map.put(START_TIME, storageData.getStartTime());
+ map.put(END_TIME, storageData.getEndTime());
+ map.put(LATENCY, storageData.getLatency());
+ map.put(IS_ERROR, storageData.getIsError());
+ map.put(TIME_BUCKET, storageData.getTimeBucket());
+ if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
+ map.put(DATA_BINARY, Const.EMPTY_STRING);
+ } else {
+ map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
+ }
+ map.put(VERSION, storageData.getVersion());
+ return map;
+ }
+
+ @Override public ZipkinSpanRecord map2Data(Map<String, Object> dbMap) {
+ ZipkinSpanRecord record = new ZipkinSpanRecord();
+ record.setTraceId((String)dbMap.get(TRACE_ID));
+ record.setSpanId((String)dbMap.get(SPAN_ID));
+ record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
+ record.setServiceInstanceId(((Number)dbMap.get(SERVICE_INSTANCE_ID)).intValue());
+ record.setEndpointName((String)dbMap.get(ENDPOINT_NAME));
+ record.setEndpointId(((Number)dbMap.get(ENDPOINT_ID)).intValue());
+ record.setStartTime(((Number)dbMap.get(START_TIME)).longValue());
+ record.setEndTime(((Number)dbMap.get(END_TIME)).longValue());
+ record.setLatency(((Number)dbMap.get(LATENCY)).intValue());
+ record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue());
+ record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+ if (StringUtil.isEmpty((String)dbMap.get(DATA_BINARY))) {
+ record.setDataBinary(new byte[] {});
+ } else {
+ record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY)));
+ }
+ record.setVersion(((Number)dbMap.get(VERSION)).intValue());
+ return record;
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
new file mode 100644
index 0000000..71aa9b2
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
+
+/**
+ * Dispatch for Zipkin native mode spans.
+ *
+ * @author wusheng
+ */
+public class ZipkinSpanRecordDispatcher implements SourceDispatcher<ZipkinSpan> {
+ @Override public void dispatch(ZipkinSpan source) {
+ ZipkinSpanRecord segment = new ZipkinSpanRecord();
+ segment.setTraceId(source.getTraceId());
+ segment.setSpanId(source.getSpanId());
+ segment.setServiceId(source.getServiceId());
+ segment.setServiceInstanceId(source.getServiceInstanceId());
+ segment.setEndpointName(source.getEndpointName());
+ segment.setEndpointId(source.getEndpointId());
+ segment.setStartTime(source.getStartTime());
+ segment.setEndTime(source.getEndTime());
+ segment.setLatency(source.getLatency());
+ segment.setIsError(source.getIsError());
+ segment.setDataBinary(source.getDataBinary());
+ segment.setTimeBucket(source.getTimeBucket());
+ segment.setVersion(source.getVersion());
+
+ RecordProcess.INSTANCE.in(segment);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java
new file mode 100644
index 0000000..472d01a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
+
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class StorageModuleElasticsearchProvider extends ModuleProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
+
+ private final StorageModuleElasticsearchConfig config;
+ private ElasticSearchClient elasticSearchClient;
+
+ public StorageModuleElasticsearchProvider() {
+ super();
+ this.config = new StorageModuleElasticsearchConfig();
+ }
+
+ @Override
+ public String name() {
+ return "elasticsearch";
+ }
+
+ @Override
+ public Class<? extends ModuleDefine> module() {
+ return StorageModule.class;
+ }
+
+ @Override
+ public ModuleConfig createConfigBeanIfAbsent() {
+ return config;
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException {
+ if (!StringUtil.isEmpty(config.getNameSpace())) {
+ config.setNameSpace(config.getNameSpace().toLowerCase());
+ }
+ elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getNameSpace(), config.getUser(), config.getPassword());
+
+ this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
+ this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
+ this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
+
+ this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient));
+ this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient));
+
+ this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(IMetricQueryDAO.class, new MetricQueryEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
+ }
+
+ @Override
+ public void start() throws ModuleStartException {
+ try {
+ elasticSearchClient.connect();
+
+ StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
+ installer.install(elasticSearchClient);
+
+ RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient);
+ lockInstaller.install();
+ } catch (StorageException e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void notifyAfterCompleted() {
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[] {CoreModule.NAME};
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000..5f028bc
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch.StorageModuleElasticsearchProvider
\ No newline at end of file