You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/14 14:39:30 UTC
[incubator-skywalking] 01/01: Change some codes to make zipkin
receiver works, not done yet.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch zipkin-receiver
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
commit db16cd7798523b43d63e9232db55624068a83b8c
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Nov 14 22:39:11 2018 +0800
Change some codes to make zipkin receiver works, not done yet.
---
oap-server/pom.xml | 14 +
.../server/receiver/trace/module/TraceModule.java | 8 +-
.../trace/provider/TraceModuleProvider.java | 4 +-
.../parser/ISegmentParserListenerManager.java} | 18 +-
.../parser/SegmentParserListenerManager.java | 6 +-
.../zipkin-receiver-plugin/pom.xml | 27 +-
.../server/receiver/zipkin/CoreRegisterLinker.java | 57 +++
.../receiver/zipkin/Receiver2AnalysisBridge.java | 47 ++
.../server/receiver/zipkin/RegisterServices.java | 78 ++++
.../receiver/zipkin/ZipkinReceiverConfig.java | 73 ++++
.../receiver/zipkin/ZipkinReceiverModule.java} | 15 +-
.../receiver/zipkin/ZipkinReceiverProvider.java | 81 ++++
.../receiver/zipkin/cache/CacheFactory.java} | 31 +-
.../server/receiver/zipkin/cache/ISpanCache.java} | 17 +-
.../zipkin/cache/caffeine/CaffeineSpanCache.java | 98 +++++
.../receiver/zipkin/data/SkyWalkingTrace.java | 57 +++
.../server/receiver/zipkin/data/ZipkinTrace.java | 62 +++
.../receiver/zipkin/handler/SpanProcessor.java | 74 ++++
.../zipkin/handler/SpanV1JettyHandler.java | 66 +++
.../zipkin/handler/SpanV2JettyHandler.java | 68 +++
.../receiver/zipkin/transform/SegmentBuilder.java | 476 +++++++++++++++++++++
.../zipkin/transform/SegmentListener.java} | 18 +-
.../transform/Zipkin2SkyWalkingTransfer.java | 57 +++
...ywalking.apm.collector.core.module.ModuleDefine | 37 ++
...alking.apm.collector.core.module.ModuleProvider | 37 ++
25 files changed, 1464 insertions(+), 62 deletions(-)
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 81d5b89..c82ba6c 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -60,6 +60,8 @@
<joda-time.version>2.9.9</joda-time.version>
<kubernetes.version>2.0.0</kubernetes.version>
<hikaricp.version>3.1.0</hikaricp.version>
+ <zipkin.version>2.9.1</zipkin.version>
+ <caffeine.version>2.6.2</caffeine.version>
</properties>
<dependencies>
@@ -276,6 +278,18 @@
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
</dependency>
+ <!-- for zipkin receiver -->
+ <dependency>
+ <groupId>io.zipkin.zipkin2</groupId>
+ <artifactId>zipkin</artifactId>
+ <version>${zipkin.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>${caffeine.version}</version>
+ </dependency>
+ <!-- -->
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
index 6b0558f..2a926a4 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
@@ -19,17 +19,21 @@
package org.apache.skywalking.oap.server.receiver.trace.module;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserListenerManager;
/**
* @author peng-yongsheng
*/
public class TraceModule extends ModuleDefine {
+ public static final String NAME = "receiver-trace";
@Override public String name() {
- return "receiver-trace";
+ return NAME;
}
@Override public Class[] services() {
- return new Class[0];
+ return new Class[] {
+ ISegmentParserListenerManager.class
+ };
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index 0a2a739..edf2441 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -57,12 +57,14 @@ public class TraceModuleProvider extends ModuleProvider {
@Override public void prepare() {
}
- @Override public void start() throws ModuleStartException {
+ @Override public void start() throws ModuleStartException, ServiceNotProvidedException {
SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
listenerManager.add(new SegmentSpanListener.Factory());
+ registerServiceImplementation(ISegmentParserListenerManager.class, listenerManager);
+
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).getService(JettyHandlerRegister.class);
try {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
similarity index 68%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
index 6b0558f..90ca1c6 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
@@ -16,20 +16,14 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.module;
+package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
/**
- * @author peng-yongsheng
+ * @author wusheng
*/
-public class TraceModule extends ModuleDefine {
-
- @Override public String name() {
- return "receiver-trace";
- }
-
- @Override public Class[] services() {
- return new Class[0];
- }
+public interface ISegmentParserListenerManager extends Service {
+ void add(SpanListenerFactory spanListenerFactory);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java
index a2dd303..36a80a4 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java
@@ -18,13 +18,14 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
-import java.util.*;
+import java.util.LinkedList;
+import java.util.List;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
/**
* @author peng-yongsheng
*/
-public class SegmentParserListenerManager {
+public class SegmentParserListenerManager implements ISegmentParserListenerManager {
private List<SpanListenerFactory> spanListenerFactories;
@@ -32,6 +33,7 @@ public class SegmentParserListenerManager {
this.spanListenerFactories = new LinkedList<>();
}
+ @Override
public void add(SpanListenerFactory spanListenerFactory) {
spanListenerFactories.add(spanListenerFactory);
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
index 58699c1..f171ab0 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
@@ -17,7 +17,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-receiver-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
@@ -28,4 +29,28 @@
<artifactId>zipkin-receiver-plugin</artifactId>
<packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>skywalking-trace-receiver-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>skywalking-register-receiver-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.zipkin.zipkin2</groupId>
+ <artifactId>zipkin</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java
new file mode 100644
index 0000000..b974fe1
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+public class CoreRegisterLinker {
+ private static volatile ModuleManager MODULE_MANAGER;
+ private static volatile IServiceInventoryRegister SERVICE_INVENTORY_REGISTER;
+ private static volatile IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER;
+ private static volatile IEndpointInventoryRegister ENDPOINT_INVENTORY_REGISTER;
+
+ public static void setModuleManager(ModuleManager moduleManager) {
+ CoreRegisterLinker.MODULE_MANAGER = moduleManager;
+ }
+
+ public static IServiceInventoryRegister getServiceInventoryRegister() {
+ if (SERVICE_INVENTORY_REGISTER == null) {
+ SERVICE_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
+ }
+ return SERVICE_INVENTORY_REGISTER;
+ }
+
+ public static IServiceInstanceInventoryRegister getServiceInstanceInventoryRegister() {
+ if (SERVICE_INSTANCE_INVENTORY_REGISTER == null) {
+ SERVICE_INSTANCE_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
+ }
+ return SERVICE_INSTANCE_INVENTORY_REGISTER;
+ }
+
+ public static IEndpointInventoryRegister getEndpointInventoryRegister() {
+ if (ENDPOINT_INVENTORY_REGISTER == null) {
+ ENDPOINT_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).getService(IEndpointInventoryRegister.class);
+ }
+ return ENDPOINT_INVENTORY_REGISTER;
+ }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java
new file mode 100644
index 0000000..c8eb7bb
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin;
+
+import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.transform.SegmentListener;
+import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
+
+/**
+ * Sends the segments to the analysis module, as if they were received from native SkyWalking agents.
+ */
+public class Receiver2AnalysisBridge implements SegmentListener {
+ private ISegmentParseService segmentParseService;
+
+ public Receiver2AnalysisBridge(ISegmentParseService segmentParseService) {
+ this.segmentParseService = segmentParseService;
+ }
+
+ /**
+ * Add this bridge as listener to Zipkin span transfer.
+ */
+ public void build() {
+ Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this);
+ }
+
+ @Override
+ public void notify(SkyWalkingTrace trace) {
+ trace.toUpstreamSegment().forEach(upstream -> segmentParseService.parse(upstream.build(), ISegmentParseService.Source.Agent));
+
+ }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/RegisterServices.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/RegisterServices.java
new file mode 100644
index 0000000..5b0f40c
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/RegisterServices.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin;
+
+import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo;
+import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService;
+import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
+import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
+import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService;
+
+/**
+ * @author wusheng
+ */
+public class RegisterServices {
+ private IApplicationIDService applicationIDService;
+
+ private IInstanceIDService instanceIDService;
+
+ private INetworkAddressIDService networkAddressIDService;
+
+ private IServiceNameService serviceNameService;
+
+ public RegisterServices(
+ IApplicationIDService applicationIDService,
+ IInstanceIDService instanceIDService,
+ INetworkAddressIDService networkAddressIDService,
+ IServiceNameService serviceNameService) {
+ this.applicationIDService = applicationIDService;
+ this.instanceIDService = instanceIDService;
+ this.networkAddressIDService = networkAddressIDService;
+ this.serviceNameService = serviceNameService;
+ }
+
+ public IApplicationIDService getApplicationIDService() {
+ return applicationIDService;
+ }
+
+ public IInstanceIDService getInstanceIDService() {
+ return instanceIDService;
+ }
+
+ public INetworkAddressIDService getNetworkAddressIDService() {
+ return networkAddressIDService;
+ }
+
+ public IServiceNameService getServiceNameService() {
+ return serviceNameService;
+ }
+
+ /**
+ * @param applicationId
+ * @param agentUUID in the Zipkin translation, this is always the application code, because there is no UUID for each process.
+ * @return
+ */
+ public int getOrCreateApplicationInstanceId(int applicationId, String agentUUID) {
+ AgentOsInfo agentOsInfo = new AgentOsInfo();
+ agentOsInfo.setHostname("N/A");
+ agentOsInfo.setOsName("N/A");
+ agentOsInfo.setProcessNo(-1);
+ return getInstanceIDService().getOrCreateByAgentUUID(applicationId, agentUUID, System.currentTimeMillis(), agentOsInfo);
+ }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
new file mode 100644
index 0000000..bfdd640
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin;
+
+
+/**
+ * @author wusheng
+ */
+public class ZipkinReceiverConfig {
+ private String host;
+ private int port;
+ private String contextPath;
+
+ private int expireTime = 20;
+
+ private int maxCacheSize = 1_000_000;
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getContextPath() {
+ return contextPath;
+ }
+
+ public void setContextPath(String contextPath) {
+ this.contextPath = contextPath;
+ }
+
+ public int getExpireTime() {
+ return expireTime;
+ }
+
+ public void setExpireTime(int expireTime) {
+ this.expireTime = expireTime;
+ }
+
+ public int getMaxCacheSize() {
+ return maxCacheSize;
+ }
+
+ public void setMaxCacheSize(int maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverModule.java
similarity index 63%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
copy to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverModule.java
index 6b0558f..c51f0a4 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverModule.java
@@ -16,17 +16,24 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.module;
+package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
- * @author peng-yongsheng
+ * The Zipkin receiver module provides HTTP and protoc services for any SDK or agent that follows the Zipkin format.
+ *
+ * At this moment, the Zipkin format is not compatible with SkyWalking, especially the HEADERs.
+ * Please don't consider this a Zipkin-SkyWalking integration;
+ * it is provided to add analysis, aggregation and visualization capabilities to a Zipkin backend.
+ *
+ * @author wusheng
*/
-public class TraceModule extends ModuleDefine {
+public class ZipkinReceiverModule extends ModuleDefine {
+ public static final String NAME = "receiver_zipkin";
@Override public String name() {
- return "receiver-trace";
+ return NAME;
}
@Override public Class[] services() {
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
new file mode 100644
index 0000000..6cefe8e
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin;
+
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
+import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
+import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler;
+import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler;
+
+/**
+ * @author wusheng
+ */
+public class ZipkinReceiverProvider extends ModuleProvider {
+ public static final String NAME = "default";
+ private ZipkinReceiverConfig config;
+
+ public ZipkinReceiverProvider() {
+ config = new ZipkinReceiverConfig();
+ }
+
+ @Override public String name() {
+ return NAME;
+ }
+
+ @Override public Class<? extends ModuleDefine> module() {
+ return ZipkinReceiverModule.class;
+ }
+
+ @Override public ModuleConfig createConfigBeanIfAbsent() {
+ return config;
+ }
+
+ @Override public void prepare() throws ServiceNotProvidedException {
+
+ }
+
+ @Override public void start() throws ServiceNotProvidedException, ModuleStartException {
+ CoreRegisterLinker.setModuleManager(getManager());
+
+ JettyServer jettyServer = new JettyServer(config.getHost(), config.getPort(), config.getContextPath());
+ jettyServer.initialize();
+
+ jettyServer.addHandler(new SpanV1JettyHandler(config));
+ jettyServer.addHandler(new SpanV2JettyHandler(config));
+
+
+
+ ISegmentParseService segmentParseService = getManager().find(TraceModule.NAME).getService(ISegmentParseService.class);
+ Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
+ Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
+ }
+
+ @Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
+
+ }
+
+ @Override public String[] requiredModules() {
+ return new String[] {TraceModule.NAME};
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java
similarity index 53%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java
copy to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java
index a2dd303..c4d5b06 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java
@@ -16,27 +16,30 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
+package org.apache.skywalking.oap.server.receiver.zipkin.cache;
-import java.util.*;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine.CaffeineSpanCache;
/**
- * @author peng-yongsheng
+ * @author wusheng
*/
-public class SegmentParserListenerManager {
+public class CacheFactory {
+ public static final CacheFactory INSTANCE = new CacheFactory();
- private List<SpanListenerFactory> spanListenerFactories;
+ private ISpanCache implementor;
- public SegmentParserListenerManager() {
- this.spanListenerFactories = new LinkedList<>();
+ private CacheFactory() {
}
- public void add(SpanListenerFactory spanListenerFactory) {
- spanListenerFactories.add(spanListenerFactory);
- }
-
- List<SpanListenerFactory> getSpanListenerFactories() {
- return spanListenerFactories;
+ public ISpanCache get(ZipkinReceiverConfig config) {
+ if (implementor == null) {
+ synchronized (INSTANCE) {
+ if (implementor == null) {
+ implementor = new CaffeineSpanCache(config);
+ }
+ }
+ }
+ return implementor;
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java
similarity index 69%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
copy to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java
index 6b0558f..b122bcf 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java
@@ -16,20 +16,13 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.module;
+package org.apache.skywalking.oap.server.receiver.zipkin.cache;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import zipkin2.Span;
/**
- * @author peng-yongsheng
+ * @author wusheng
*/
-public class TraceModule extends ModuleDefine {
-
- @Override public String name() {
- return "receiver-trace";
- }
-
- @Override public Class[] services() {
- return new Class[0];
- }
+public interface ISpanCache {
+ void addSpan(Span span);
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java
new file mode 100644
index 0000000..c42e710
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.zipkin.cache.ISpanCache;
+import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import zipkin2.Span;
+
+/**
+ * Span cache backed by Caffeine. Incoming Zipkin spans are grouped by trace id; when Caffeine removes
+ * a trace entry, the collected trace is handed to {@link Zipkin2SkyWalkingTransfer}.
+ *
+ * NOTICE: from my test, Caffeine only checks expiration as a side effect of read/write operations.
+ * In order to finish traces in time, a timer periodically writes a meaningless trigger trace, which
+ * forces that check.
+ *
+ * @author wusheng
+ */
+public class CaffeineSpanCache implements ISpanCache, RemovalListener<String, ZipkinTrace> {
+    private static final Logger logger = LoggerFactory.getLogger(CaffeineSpanCache.class);
+    private final Cache<String, ZipkinTrace> inProcessSpanCache;
+
+    /**
+     * Builds the cache and starts the timer that keeps expiration checks running.
+     *
+     * @param config supplies the expire time (seconds) and the maximum cache size
+     */
+    public CaffeineSpanCache(ZipkinReceiverConfig config) {
+        inProcessSpanCache = Caffeine.newBuilder()
+            .expireAfterWrite(config.getExpireTime(), TimeUnit.SECONDS)
+            .maximumSize(config.getMaxCacheSize())
+            .removalListener(this)
+            .build();
+        // Daemon thread: this timer is never shut down, so it must not keep the JVM alive on its own.
+        Executors.newSingleThreadScheduledExecutor(task -> {
+            Thread thread = new Thread(task, "ZipkinSpanCache-ExpireTrigger");
+            thread.setDaemon(true);
+            return thread;
+        }).scheduleAtFixedRate(() -> {
+            try {
+                inProcessSpanCache.put("ACTIVE", new ZipkinTrace.TriggerTrace());
+            } catch (RuntimeException e) {
+                // scheduleAtFixedRate suppresses all later runs once a run throws; keep the timer alive.
+                logger.error(e.getMessage(), e);
+            }
+        }, 2, 3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Called by Caffeine when an entry leaves the cache; the removal cause is not inspected.
+     * Trigger traces and missing values are ignored, every other trace is transferred.
+     *
+     * @param key   the cache key, i.e. the trace id
+     * @param trace the removed trace, may be null
+     * @param cause why the entry was removed
+     */
+    @Override
+    public void onRemoval(@Nullable String key, @Nullable ZipkinTrace trace, @Nonnull RemovalCause cause) {
+        if (trace == null || trace instanceof ZipkinTrace.TriggerTrace) {
+            return;
+        }
+        try {
+            Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            logger.warn("Zipkin trace:" + trace);
+        }
+    }
+
+    /**
+     * Appends the span to the trace it belongs to, creating the trace entry on first sight.
+     *
+     * @param span the received Zipkin span
+     */
+    @Override
+    public void addSpan(Span span) {
+        // Cache.get(key, mappingFunction) creates the entry atomically, so no extra lock is needed.
+        ZipkinTrace trace = inProcessSpanCache.get(span.traceId(), traceId -> new ZipkinTrace());
+        trace.addSpan(span);
+    }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java
new file mode 100644
index 0000000..9eca022
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.data;
+
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
+
+/**
+ * Each SkyWalkingTrace consists of segments in each application, original from {@link ZipkinTrace}s.
+ * It pairs one global trace id with the segment builders that carry that id.
+ */
+public class SkyWalkingTrace {
+    private final UniqueId globalTraceId;
+    private final List<TraceSegmentObject.Builder> segmentList;
+
+    public SkyWalkingTrace(UniqueId globalTraceId, List<TraceSegmentObject.Builder> segmentList) {
+        this.globalTraceId = globalTraceId;
+        this.segmentList = segmentList;
+    }
+
+    /**
+     * Wraps every segment into its own upstream segment builder.
+     *
+     * @return one builder per segment, each carrying the global trace id and the serialized segment
+     */
+    public List<UpstreamSegment.Builder> toUpstreamSegment() {
+        List<UpstreamSegment.Builder> upstreamBuilders = new LinkedList<>();
+        for (TraceSegmentObject.Builder segment : segmentList) {
+            UpstreamSegment.Builder upstream = UpstreamSegment.newBuilder();
+            upstream.addGlobalTraceIds(globalTraceId);
+            upstream.setSegment(segment.build().toByteString());
+            upstreamBuilders.add(upstream);
+        }
+        return upstreamBuilders;
+    }
+
+    public UniqueId getGlobalTraceId() {
+        return globalTraceId;
+    }
+
+    public List<TraceSegmentObject.Builder> getSegmentList() {
+        return segmentList;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java
new file mode 100644
index 0000000..d12beb8
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.data;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+import zipkin2.Span;
+
+/**
+ * Collects the Zipkin spans received for one trace. Every access to the span list takes the same
+ * lock, so spans may be added while another thread reads or prints the trace.
+ *
+ * @author wusheng
+ */
+public class ZipkinTrace {
+    private final List<Span> spans;
+    private final ReentrantLock spanWriteLock;
+
+    public ZipkinTrace() {
+        spans = new LinkedList<>();
+        spanWriteLock = new ReentrantLock();
+    }
+
+    /**
+     * Appends a span to this trace.
+     *
+     * @param span the span to append
+     */
+    public void addSpan(Span span) {
+        spanWriteLock.lock();
+        try {
+            spans.add(span);
+        } finally {
+            spanWriteLock.unlock();
+        }
+    }
+
+    /**
+     * @return a snapshot of the spans collected so far; later additions are not reflected in it
+     */
+    public List<Span> getSpans() {
+        spanWriteLock.lock();
+        try {
+            // Copy under the lock: iterating the live list while addSpan runs would fail.
+            return new LinkedList<>(spans);
+        } finally {
+            spanWriteLock.unlock();
+        }
+    }
+
+    @Override
+    public String toString() {
+        spanWriteLock.lock();
+        try {
+            return "ZipkinTrace{" +
+                "spans=" + spans +
+                '}';
+        } finally {
+            spanWriteLock.unlock();
+        }
+    }
+
+    /**
+     * Marker subtype that carries no additional state.
+     */
+    public static class TriggerTrace extends ZipkinTrace {
+
+    }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
new file mode 100644
index 0000000..0dfcaae
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.handler;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
+import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig;
+import org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.CacheFactory;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesDecoder;
+
+/**
+ * Reads the body of a span upload request, decodes it into Zipkin spans and puts every span into the
+ * span cache.
+ */
+public class SpanProcessor {
+    /**
+     * Decodes the request body and caches the contained spans.
+     *
+     * @param config  receiver configuration, used to look up the span cache
+     * @param decoder decoder matching the encoding of the request body
+     * @param request the upload request
+     * @throws IOException if the request body cannot be read
+     */
+    void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        // try-with-resources: a GZIPInputStream holds an Inflater that is only released on close().
+        try (InputStream inputStream = getInputStream(request)) {
+            byte[] buffer = new byte[2048];
+            int readCntOnce;
+
+            while ((readCntOnce = inputStream.read(buffer)) >= 0) {
+                out.write(buffer, 0, readCntOnce);
+            }
+        }
+
+        List<Span> spanList = decoder.decodeList(out.toByteArray());
+
+        // NOTE(review): `registerServices` is not declared anywhere in this class - it has to be
+        // supplied before this compiles.
+        spanList.forEach(span -> {
+            // In Zipkin, the local service name represents the application owner.
+            String applicationCode = span.localServiceName();
+            if (applicationCode != null) {
+                int applicationId = registerServices.getApplicationIDService().getOrCreateForApplicationCode(applicationCode);
+                if (applicationId != 0) {
+                    registerServices.getOrCreateApplicationInstanceId(applicationId, applicationCode);
+                }
+            }
+
+            CacheFactory.INSTANCE.get(config).addSpan(span);
+        });
+    }
+
+    /**
+     * Opens the request body, transparently unzipping it when it is gzip compressed.
+     *
+     * Compression is detected from the body itself (the two gzip magic bytes 0x1f 0x8b) instead of
+     * from the "accept-encoding" header: that header only states what the client accepts in the
+     * response and says nothing about how the request body is encoded.
+     *
+     * @param request the upload request
+     * @return a stream delivering the uncompressed body
+     * @throws IOException if the body cannot be opened or inspected
+     */
+    private InputStream getInputStream(HttpServletRequest request) throws IOException {
+        // Buffered, because mark/reset is needed to look at the first two bytes without consuming them.
+        InputStream body = new java.io.BufferedInputStream(request.getInputStream());
+        body.mark(2);
+        int firstByte = body.read();
+        int secondByte = body.read();
+        body.reset();
+
+        boolean gzipped = firstByte == 0x1f && secondByte == 0x8b;
+        return gzipped ? new GZIPInputStream(body) : body;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
new file mode 100644
index 0000000..77f38e4
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.handler;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import zipkin2.codec.SpanBytesDecoder;
+
+/**
+ * Jetty handler accepting Zipkin v1 span uploads on "/api/v1/spans".
+ */
+public class SpanV1JettyHandler extends JettyHandler {
+    // Must name this class; it used to log under SpanV2JettyHandler.
+    private static final Logger logger = LoggerFactory.getLogger(SpanV1JettyHandler.class);
+
+    private final ZipkinReceiverConfig config;
+
+    public SpanV1JettyHandler(ZipkinReceiverConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public String pathSpec() {
+        return "/api/v1/spans";
+    }
+
+    /**
+     * Decodes the posted spans as Thrift when the Content-Type contains "/x-thrift", otherwise as
+     * v1 JSON, and passes them to a {@link SpanProcessor}. Answers 202 on success and 500 when any
+     * exception is raised.
+     */
+    @Override
+    protected void doPost(HttpServletRequest request, HttpServletResponse response) {
+        response.setContentType("application/json");
+        response.setCharacterEncoding("utf-8");
+
+        try {
+            String type = request.getHeader("Content-Type");
+
+            SpanBytesDecoder decoder = type != null && type.contains("/x-thrift")
+                ? SpanBytesDecoder.THRIFT
+                : SpanBytesDecoder.JSON_V1;
+
+            SpanProcessor processor = new SpanProcessor();
+            processor.convert(config, decoder, request);
+
+            response.setStatus(202);
+        } catch (Exception e) {
+            response.setStatus(500);
+
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
new file mode 100644
index 0000000..7c8705a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.handler;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import zipkin2.codec.SpanBytesDecoder;
+
+/**
+ * Jetty handler accepting Zipkin v2 span uploads on "/api/v2/spans".
+ *
+ * @author wusheng
+ */
+public class SpanV2JettyHandler extends JettyHandler {
+    private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
+
+    private ZipkinReceiverConfig config;
+
+    public SpanV2JettyHandler(ZipkinReceiverConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public String pathSpec() {
+        return "/api/v2/spans";
+    }
+
+    /**
+     * Picks the protobuf decoder when the Content-Type contains "/x-protobuf" and the v2 JSON decoder
+     * otherwise, then lets a {@link SpanProcessor} handle the body. Answers 202 on success and 500
+     * when any exception is raised.
+     */
+    @Override
+    protected void doPost(HttpServletRequest request, HttpServletResponse response) {
+        response.setContentType("application/json");
+        response.setCharacterEncoding("utf-8");
+
+        try {
+            String contentType = request.getHeader("Content-Type");
+
+            SpanBytesDecoder decoder;
+            if (contentType != null && contentType.contains("/x-protobuf")) {
+                decoder = SpanBytesDecoder.PROTO3;
+            } else {
+                decoder = SpanBytesDecoder.JSON_V2;
+            }
+
+            new SpanProcessor().convert(config, decoder, request);
+
+            response.setStatus(202);
+        } catch (Exception e) {
+            response.setStatus(500);
+
+            logger.error(e.getMessage(), e);
+        }
+    }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java
new file mode 100644
index 0000000..37d6e50
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.transform;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.skywalking.apm.network.language.agent.SpanObject;
+import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
+import org.apache.skywalking.oap.server.library.util.StringUtils;
+import org.apache.skywalking.oap.server.receiver.zipkin.RegisterServices;
+import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
+import org.eclipse.jetty.util.StringUtil;
+import zipkin2.Endpoint;
+import zipkin2.Span;
+
+/**
+ * @author wusheng
+ */
+public class SegmentBuilder {
+    // Stack of the segments currently being built while walking the span tree.
+    private Context context = new Context();
+    // Segments that have been completed.
+    private LinkedList<Segment> segments = new LinkedList<>();
+    // Spans recorded for CLIENT and PRODUCER kinds, keyed by Zipkin span id.
+    private Map<String, ClientSideSpan> clientPartSpan = new HashMap<>();
+
+    /**
+     * Instances are only created inside this class.
+     */
+    private SegmentBuilder() {
+    }
+
+    /**
+     * Converts the spans of one Zipkin trace into a {@link SkyWalkingTrace}.
+     *
+     * The span without parent id is taken as root. When there is no such span, or it has no local
+     * service name, no segment is built and the returned trace has an empty segment list.
+     *
+     * @param traceSpans all spans collected for the trace
+     * @return the trace with a newly generated global trace id
+     * @throws Exception if building a segment fails
+     */
+    public static SkyWalkingTrace build(List<Span> traceSpans) throws Exception {
+        SegmentBuilder builder = new SegmentBuilder();
+        // This map groups the spans by their parent id, in order to assist to build tree.
+        // key: parentId (null for a span without parent)
+        // value: the spans having that parent
+        Map<String, List<Span>> childSpanMap = new HashMap<>();
+        Span rootSpan = null;
+        for (Span span : traceSpans) {
+            if (span.parentId() == null) {
+                // With several parent-less spans the last one wins.
+                rootSpan = span;
+            }
+            childSpanMap.computeIfAbsent(span.parentId(), parentId -> new LinkedList<>()).add(span);
+        }
+
+        if (rootSpan != null) {
+            String applicationCode = rootSpan.localServiceName();
+            // If root span doesn't include applicationCode, a.k.a local service name,
+            // Segment can't be built
+            // Ignore the whole trace.
+            // :P Hope anyone could provide better solution.
+            // Wu Sheng.
+            if (StringUtils.isNotEmpty(applicationCode)) {
+                builder.context.addApp(applicationCode);
+
+                SpanObject.Builder rootSpanBuilder = builder.initSpan(null, null, rootSpan, true);
+                builder.context.currentSegment().addSpan(rootSpanBuilder);
+                builder.scanSpansFromRoot(rootSpanBuilder, rootSpan, childSpanMap);
+
+                builder.segments.add(builder.context.removeApp());
+            }
+        }
+
+        // NOTE(review): `instanceHeartBeatService` is not declared in this class - it has to be
+        // supplied before this compiles.
+        List<TraceSegmentObject.Builder> segmentBuilders = new LinkedList<>();
+        for (Segment segment : builder.segments) {
+            TraceSegmentObject.Builder traceSegmentBuilder = segment.freeze();
+            segmentBuilders.add(traceSegmentBuilder);
+            instanceHeartBeatService.heartBeat(traceSegmentBuilder.getApplicationInstanceId(), segment.getEndTime());
+        }
+        return new SkyWalkingTrace(builder.generateTraceOrSegmentId(), segmentBuilders);
+    }
+
+    /**
+     * Walks the children of {@code parent} depth-first and adds a SkyWalking span for each of them.
+     * A child whose local service name differs from the current application opens a new segment,
+     * which is closed again once the child's subtree has been processed.
+     *
+     * @param parentSegmentSpan the SkyWalking span already built for {@code parent}
+     * @param parent            the Zipkin span whose children are processed
+     * @param childSpanMap      all spans of the trace grouped by parent id
+     * @throws Exception if opening a segment or building a span fails
+     */
+    private void scanSpansFromRoot(SpanObject.Builder parentSegmentSpan, Span parent,
+        Map<String, List<Span>> childSpanMap) throws Exception {
+        String parentId = parent.id();
+        // get child spans by parent span id
+        List<Span> spanList = childSpanMap.get(parentId);
+        if (spanList == null) {
+            return;
+        }
+        for (Span childSpan : spanList) {
+            String localServiceName = childSpan.localServiceName();
+            boolean isNewApp = StringUtil.isNotBlank(localServiceName) && context.isAppChanged(localServiceName);
+
+            // NOTE(review): `registerServices` is not declared in this method or class, and the
+            // recursive call below passes four arguments to this three-parameter method.
+            if (isNewApp) {
+                // Pushed outside the try block: if this fails nothing was pushed, so the finally
+                // block must not pop the enclosing segment.
+                context.addApp(localServiceName, registerServices);
+            }
+            try {
+                SpanObject.Builder childSpanBuilder = initSpan(parentSegmentSpan, parent, childSpan, isNewApp);
+
+                context.currentSegment().addSpan(childSpanBuilder);
+                scanSpansFromRoot(childSpanBuilder, childSpan, childSpanMap, registerServices);
+
+            } finally {
+                if (isNewApp) {
+                    segments.add(context.removeApp());
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the SkyWalking span for one Zipkin span inside the current segment.
+     *
+     * @param parentSegmentSpan the SkyWalking span built for the parent, null for the trace root
+     * @param parentSpan        the Zipkin parent span, null for the trace root
+     * @param span              the Zipkin span to convert
+     * @param isSegmentRoot     true when the span is the first span of a new segment
+     * @return the span builder; it has not been added to the segment yet
+     */
+    private SpanObject.Builder initSpan(SpanObject.Builder parentSegmentSpan, Span parentSpan, Span span,
+        boolean isSegmentRoot) {
+        SpanObject.Builder spanBuilder = SpanObject.newBuilder();
+        spanBuilder.setSpanId(context.currentIDs().nextSpanId());
+        if (isSegmentRoot) {
+            // spanId = -1, means no parent span
+            // spanId is considered unique, and from a positive sequence in each segment.
+            spanBuilder.setParentSpanId(-1);
+        } else if (parentSegmentSpan != null) {
+            spanBuilder.setParentSpanId(parentSegmentSpan.getSpanId());
+        }
+
+        // The span name is optional in Zipkin; leave the operation name unset rather than fail on null.
+        String operationName = span.name();
+        if (operationName != null) {
+            spanBuilder.setOperationName(operationName);
+        }
+
+        Span.Kind kind = span.kind();
+        if (kind == null) {
+            // The kind is optional in Zipkin and a switch on null throws; treat such spans as local.
+            spanBuilder.setSpanType(SpanType.Local);
+        } else {
+            switch (kind) {
+                case CLIENT:
+                case PRODUCER:
+                    spanBuilder.setSpanType(SpanType.Exit);
+                    String peer = endpoint2Peer(span.remoteEndpoint());
+                    if (peer != null) {
+                        spanBuilder.setPeer(peer);
+                    }
+                    // Remembered so that a server span sharing this span id can refer to it.
+                    clientPartSpan.put(span.id(), new ClientSideSpan(span, spanBuilder));
+                    break;
+                case SERVER:
+                case CONSUMER:
+                    spanBuilder.setSpanType(SpanType.Entry);
+                    this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan);
+                    break;
+                default:
+                    spanBuilder.setSpanType(SpanType.Local);
+            }
+        }
+
+        // microseconds in Zipkin -> milliseconds in SkyWalking
+        // Timestamp and duration may both be missing in a report; a missing value counts as 0
+        // instead of causing a null pointer exception.
+        Long timestampObj = span.timestamp();
+        long startTime = (timestampObj == null) ? 0 : timestampObj.longValue() / 1000;
+        Long durationObj = span.duration();
+        long duration = (durationObj == null) ? 0 : durationObj.longValue() / 1000;
+        spanBuilder.setStartTime(startTime);
+        spanBuilder.setEndTime(startTime + duration);
+
+        span.tags().forEach((tagKey, tagValue) -> spanBuilder.addTags(
+            KeyWithStringValue.newBuilder().setKey(tagKey).setValue(tagValue).build())
+        );
+
+        span.annotations().forEach(annotation ->
+            spanBuilder.addLogs(LogMessage.newBuilder().setTime(annotation.timestamp() / 1000).addData(
+                KeyWithStringValue.newBuilder().setKey("zipkin.annotation").setValue(annotation.value()).build()
+            ))
+        );
+
+        return spanBuilder;
+    }
+
+    /**
+     * Formats an endpoint as "ip" or "ip:port", preferring the IPv4 address over the IPv6 one.
+     *
+     * @param endpoint the endpoint, may be null
+     * @return the peer text, or null when the endpoint is null or carries no IP address
+     */
+    private String endpoint2Peer(Endpoint endpoint) {
+        if (endpoint == null) {
+            return null;
+        }
+        String ip = null;
+        if (StringUtils.isNotEmpty(endpoint.ipv4())) {
+            ip = endpoint.ipv4();
+        } else if (StringUtils.isNotEmpty(endpoint.ipv6())) {
+            ip = endpoint.ipv6();
+        }
+        if (StringUtils.isEmpty(ip)) {
+            return null;
+        }
+        // The port is optional; read it as an object so a missing port counts as 0 instead of
+        // failing while unboxing.
+        Integer portObj = endpoint.port();
+        int port = (portObj == null) ? 0 : portObj.intValue();
+        return port == 0 ? ip : ip + ":" + port;
+    }
+
+    /**
+     * Adds a cross-process segment reference to an entry span, pointing at the span of the parent
+     * segment that called it, and sets the peer of that parent span to the resolved address.
+     * Nothing is added when there is no parent segment or no IP address can be determined.
+     *
+     * @param spanBuilder       the entry span receiving the reference
+     * @param span              the Zipkin span behind {@code spanBuilder}
+     * @param parentSegmentSpan the SkyWalking span built for the parent
+     * @param parentSpan        the Zipkin parent span
+     */
+    private void buildRef(SpanObject.Builder spanBuilder, Span span, SpanObject.Builder parentSegmentSpan,
+        Span parentSpan) {
+        Segment parentSegment = context.parentSegment();
+        if (parentSegment == null) {
+            return;
+        }
+        Segment rootSegment = context.rootSegment();
+        if (rootSegment == null) {
+            return;
+        }
+
+        if (Boolean.TRUE.equals(span.shared())) {
+            // using same span id in client and server for RPC
+            // SkyWalking will build both sides of span
+            ClientSideSpan clientSideSpan = clientPartSpan.get(span.id());
+            // The client half may not have been recorded; then keep the parent passed in.
+            if (clientSideSpan != null) {
+                parentSegmentSpan = clientSideSpan.getBuilder();
+                parentSpan = clientSideSpan.getSpan();
+            }
+        }
+
+        String ip = null;
+        int port = 0;
+        Endpoint clientEndpoint = parentSpan.remoteEndpoint();
+        if (clientEndpoint != null) {
+            if (StringUtils.isNotEmpty(clientEndpoint.ipv4())) {
+                ip = clientEndpoint.ipv4();
+            } else if (StringUtils.isNotEmpty(clientEndpoint.ipv6())) {
+                ip = clientEndpoint.ipv6();
+            }
+            // The port is optional; a missing port counts as 0 instead of failing while unboxing.
+            Integer clientPort = clientEndpoint.port();
+            port = (clientPort == null) ? 0 : clientPort.intValue();
+        }
+        // An address reported by the server side replaces the one seen by the client;
+        // the port is still the one taken from the client endpoint.
+        Endpoint serverEndpoint = span.localEndpoint();
+        if (serverEndpoint != null) {
+            if (StringUtils.isNotEmpty(serverEndpoint.ipv4())) {
+                ip = serverEndpoint.ipv4();
+            } else if (StringUtils.isNotEmpty(serverEndpoint.ipv6())) {
+                ip = serverEndpoint.ipv6();
+            }
+        }
+
+        if (StringUtil.isBlank(ip)) {
+            //The IP is the most important for building the ref at both sides.
+            return;
+        }
+
+        TraceSegmentReference.Builder refBuilder = TraceSegmentReference.newBuilder();
+
+        // entry ref info
+        refBuilder.setEntryApplicationInstanceId(rootSegment.builder().getApplicationInstanceId());
+        int serviceId = rootSegment.getEntryServiceId();
+        if (serviceId == 0) {
+            refBuilder.setEntryServiceName(rootSegment.getEntryServiceName());
+        } else {
+            refBuilder.setEntryServiceId(serviceId);
+        }
+
+        // parent ref info
+        refBuilder.setNetworkAddress(port == 0 ? ip : ip + ":" + port);
+        parentSegmentSpan.setPeer(refBuilder.getNetworkAddress());
+        refBuilder.setParentApplicationInstanceId(parentSegment.builder().getApplicationInstanceId());
+        refBuilder.setParentSpanId(parentSegmentSpan.getSpanId());
+        refBuilder.setParentTraceSegmentId(parentSegment.builder().getTraceSegmentId());
+        int parentServiceId = parentSegment.getEntryServiceId();
+        if (parentServiceId == 0) {
+            refBuilder.setParentServiceName(parentSegment.getEntryServiceName());
+        } else {
+            refBuilder.setParentServiceId(parentServiceId);
+        }
+        refBuilder.setRefType(RefType.CrossProcess);
+
+        spanBuilder.addRefs(refBuilder);
+    }
+
+    /**
+     * Context holds the values in build process: a stack of the segments that are currently open,
+     * with the innermost segment at the end.
+     */
+    private class Context {
+        private LinkedList<Segment> segmentsStack = new LinkedList<>();
+
+        /**
+         * @return true when the code is not empty and differs from the application of the current segment
+         */
+        private boolean isAppChanged(String applicationCode) {
+            if (!StringUtils.isNotEmpty(applicationCode)) {
+                return false;
+            }
+            return !applicationCode.equals(currentIDs().applicationCode);
+        }
+
+        /**
+         * Resolves the application and instance ids for the code and opens a new segment for it.
+         *
+         * @throws Exception if an id lookup fails or does not deliver an id within the retries
+         */
+        private Segment addApp(String applicationCode,
+            RegisterServices registerServices) throws Exception {
+            int applicationId = waitForExchange(
+                () -> registerServices.getApplicationIDService().getOrCreateForApplicationCode(applicationCode),
+                10
+            );
+
+            int appInstanceId = waitForExchange(
+                () -> registerServices.getOrCreateApplicationInstanceId(applicationId, applicationCode),
+                10
+            );
+
+            Segment opened = new Segment(applicationCode, applicationId, appInstanceId);
+            segmentsStack.addLast(opened);
+            return opened;
+        }
+
+        private IDCollection currentIDs() {
+            return currentSegment().ids;
+        }
+
+        private Segment currentSegment() {
+            return segmentsStack.getLast();
+        }
+
+        /**
+         * @return the segment below the current one, or null when fewer than two segments are open
+         */
+        private Segment parentSegment() {
+            int depth = segmentsStack.size();
+            return depth < 2 ? null : segmentsStack.get(depth - 2);
+        }
+
+        /**
+         * @return the outermost segment, or null when fewer than two segments are open
+         */
+        private Segment rootSegment() {
+            return segmentsStack.size() < 2 ? null : segmentsStack.getFirst();
+        }
+
+        private Segment removeApp() {
+            return segmentsStack.removeLast();
+        }
+
+        /**
+         * Calls the supplier until it delivers a non-zero id, sleeping one second after every zero.
+         *
+         * @param callable the id lookup
+         * @param retry    the maximum number of calls
+         * @return the first non-zero id
+         * @throws Exception whatever the lookup throws, or a TimeoutException once the calls are used up
+         */
+        private int waitForExchange(Callable<Integer> callable, int retry) throws Exception {
+            int attempt = 0;
+            while (attempt < retry) {
+                Integer id = callable.call();
+                if (id != 0) {
+                    return id;
+                }
+                Thread.sleep(1000L);
+                attempt++;
+            }
+            throw new TimeoutException("ID exchange costs more than expected.");
+        }
+    }
+
+ private class Segment {
+ private TraceSegmentObject.Builder segmentBuilder;
+ private IDCollection ids;
+ private int entryServiceId = 0;
+ private String entryServiceName = null;
+ private List<SpanObject.Builder> spans;
+ private long endTime = 0;
+
+ private Segment(String applicationCode, int applicationId, int appInstanceId) {
+ ids = new IDCollection(applicationCode, applicationId, appInstanceId);
+ spans = new LinkedList<>();
+ segmentBuilder = TraceSegmentObject.newBuilder();
+ segmentBuilder.setApplicationId(applicationId);
+ segmentBuilder.setApplicationInstanceId(appInstanceId);
+ segmentBuilder.setTraceSegmentId(generateTraceOrSegmentId());
+ }
+
+ private TraceSegmentObject.Builder builder() {
+ return segmentBuilder;
+ }
+
+ private void addSpan(SpanObject.Builder spanBuilder) {
+ String operationName = spanBuilder.getOperationName();
+ if (entryServiceId == 0 && StringUtils.isNotEmpty(operationName)) {
+ if (SpanType.Entry == spanBuilder.getSpanType()) {
+ if (StringUtils.isNotEmpty(operationName)) {
+ entryServiceName = operationName;
+ } else {
+ entryServiceId = spanBuilder.getOperationNameId();
+ }
+ }
+ }
+
+ // init by root span
+ if (spanBuilder.getSpanId() == 1 && entryServiceId == 0) {
+ if (StringUtils.isNotEmpty(operationName)) {
+ entryServiceName = operationName;
+ } else {
+ entryServiceId = spanBuilder.getOperationNameId();
+ }
+ }
+
+ spans.add(spanBuilder);
+ if (spanBuilder.getEndTime() > endTime) {
+ endTime = spanBuilder.getEndTime();
+ }
+ }
+
+ public int getEntryServiceId() {
+ return entryServiceId;
+ }
+
+ public String getEntryServiceName() {
+ return entryServiceName;
+ }
+
+ private IDCollection ids() {
+ return ids;
+ }
+
+ public TraceSegmentObject.Builder freeze() {
+ for (SpanObject.Builder span : spans) {
+ segmentBuilder.addSpans(span);
+ }
+ return segmentBuilder;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+ }
+
+ private class IDCollection {
+ private String applicationCode;
+ private int appId;
+ private int instanceId;
+ private int spanIdSeq;
+
+ private IDCollection(String applicationCode, int appId, int instanceId) {
+ this.applicationCode = applicationCode;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ this.spanIdSeq = 0;
+ }
+
+ private int nextSpanId() {
+ return spanIdSeq++;
+ }
+ }
+
+    /**
+     * Creates an id made of three random long parts; used for both trace ids and segment ids.
+     */
+    private UniqueId generateTraceOrSegmentId() {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        UniqueId.Builder idBuilder = UniqueId.newBuilder();
+        for (int part = 0; part < 3; part++) {
+            idBuilder.addIdParts(random.nextLong());
+        }
+        return idBuilder.build();
+    }
+
+    /**
+     * Pairs a Zipkin span with the SkyWalking span builder created for it.
+     */
+    private class ClientSideSpan {
+        private final Span span;
+        private final SpanObject.Builder builder;
+
+        public ClientSideSpan(Span span, SpanObject.Builder builder) {
+            this.builder = builder;
+            this.span = span;
+        }
+
+        /**
+         * @return the Zipkin span
+         */
+        public Span getSpan() {
+            return span;
+        }
+
+        /**
+         * @return the SkyWalking span builder created for the Zipkin span
+         */
+        public SpanObject.Builder getBuilder() {
+            return builder;
+        }
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java
similarity index 68%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
copy to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java
index 6b0558f..9a0b7c7 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java
@@ -16,20 +16,10 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.module;
+package org.apache.skywalking.oap.server.receiver.zipkin.transform;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
-/**
- * @author peng-yongsheng
- */
-public class TraceModule extends ModuleDefine {
-
- @Override public String name() {
- return "receiver-trace";
- }
-
- @Override public Class[] services() {
- return new Class[0];
- }
+/**
+ * Callback that receives a {@link SkyWalkingTrace}.
+ *
+ * The interface has a single abstract method, so it is marked as a functional
+ * interface and can be implemented with a lambda or method reference.
+ */
+@FunctionalInterface
+public interface SegmentListener {
+    /**
+     * Called with a trace for the listener to handle.
+     *
+     * @param trace the trace being delivered
+     */
+    void notify(SkyWalkingTrace trace);
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java
new file mode 100644
index 0000000..392fdf8
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.zipkin.transform;
+
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
+import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
+import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace;
+import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.ZipkinTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
+import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
+import zipkin2.Span;
+
+/**
+ * Singleton that builds a {@link SkyWalkingTrace} from the spans of a
+ * {@link ZipkinTrace} and passes the result to every registered
+ * {@link SegmentListener}.
+ *
+ * NOTE(review): listeners are kept in a plain {@link LinkedList}; registering a
+ * listener while {@link #transfer(ZipkinTrace)} runs on another thread is not
+ * guarded here. Confirm all listeners are added before traces arrive.
+ *
+ * @author wusheng
+ */
+public class Zipkin2SkyWalkingTransfer {
+    /** The shared instance; final so that no caller can replace the singleton. */
+    public static final Zipkin2SkyWalkingTransfer INSTANCE = new Zipkin2SkyWalkingTransfer();
+
+    /** Listeners to notify, kept in registration order. */
+    private final List<SegmentListener> listeners = new LinkedList<>();
+
+    private Zipkin2SkyWalkingTransfer() {
+    }
+
+    /**
+     * Registers a listener to be notified by later {@link #transfer(ZipkinTrace)} calls.
+     *
+     * @param listener the listener to add
+     */
+    public void addListener(SegmentListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Builds a SkyWalking trace from the spans of {@code trace} and notifies every
+     * registered listener with it. A trace without spans is ignored.
+     *
+     * @param trace the Zipkin trace whose spans are converted
+     * @throws Exception if {@code SegmentBuilder.build} fails
+     */
+    public void transfer(ZipkinTrace trace) throws Exception {
+        List<Span> traceSpans = trace.getSpans();
+
+        // Nothing to convert: skip the build and do not notify anyone.
+        if (traceSpans.isEmpty()) {
+            return;
+        }
+
+        SkyWalkingTrace skyWalkingTrace = SegmentBuilder.build(traceSpans);
+        for (SegmentListener listener : listeners) {
+            listener.notify(skyWalkingTrace);
+        }
+    }
+}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleDefine b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleDefine
new file mode 100644
index 0000000..0f1527e
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleDefine
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleProvider b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleProvider
new file mode 100644
index 0000000..c3b5897
--- /dev/null
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleProvider
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverProvider