You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/16 15:19:13 UTC
[incubator-skywalking] branch zipkin-receiver updated: @adriancole
Zipkin receiver is coming back again.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch zipkin-receiver
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/zipkin-receiver by this push:
new 1eea939 @adriancole Zipkin receiver is coming back again.
1eea939 is described below
commit 1eea9394b8d7155cd7db028f61691184af4ff3ec
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Nov 16 23:19:01 2018 +0800
@adriancole Zipkin receiver is coming back again.
---
.../server/receiver/trace/module/TraceModule.java | 4 +-
.../trace/provider/TraceModuleProvider.java | 17 +++--
.../parser/ISegmentParserListenerManager.java | 3 +-
...enerManager.java => ISegmentParserService.java} | 6 +-
...rManager.java => SegmentParserServiceImpl.java} | 17 +++--
.../receiver/zipkin/Receiver2AnalysisBridge.java | 7 +-
.../server/receiver/zipkin/RegisterServices.java | 78 ----------------------
.../receiver/zipkin/ZipkinReceiverConfig.java | 3 +-
.../receiver/zipkin/ZipkinReceiverProvider.java | 6 +-
.../receiver/zipkin/ZipkinTraceOSInfoBuilder.java} | 14 ++--
.../receiver/zipkin/handler/SpanProcessor.java | 12 ++--
.../receiver/zipkin/transform/SegmentBuilder.java | 65 ++++++++++--------
.../transform/Zipkin2SkyWalkingTransfer.java | 6 +-
13 files changed, 93 insertions(+), 145 deletions(-)
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
index 2a926a4..8b84e18 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java
@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.receiver.trace.module;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserListenerManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
/**
* @author peng-yongsheng
@@ -33,7 +33,7 @@ public class TraceModule extends ModuleDefine {
@Override public Class[] services() {
return new Class[] {
- ISegmentParserListenerManager.class
+ ISegmentParserService.class
};
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index edf2441..47bc6ec 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -20,12 +20,20 @@ package org.apache.skywalking.oap.server.receiver.trace.provider;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.server.*;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.grpc.TraceSegmentServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.rest.TraceSegmentServletHandler;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserServiceImpl;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.ServiceMappingSpanListener;
@@ -63,12 +71,11 @@ public class TraceModuleProvider extends ModuleProvider {
listenerManager.add(new ServiceMappingSpanListener.Factory());
listenerManager.add(new SegmentSpanListener.Factory());
- registerServiceImplementation(ISegmentParserListenerManager.class, listenerManager);
-
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).getService(JettyHandlerRegister.class);
try {
SegmentParse.Producer segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
+ this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducer));
grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer));
jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer));
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
index 90ca1c6..91b05ae 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
@@ -18,12 +18,11 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
-import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
/**
* @author wusheng
*/
-public interface ISegmentParserListenerManager extends Service {
+public interface ISegmentParserListenerManager {
void add(SpanListenerFactory spanListenerFactory);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserService.java
similarity index 81%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserService.java
index 90ca1c6..4579d48 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserService.java
@@ -18,12 +18,12 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.library.module.Service;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
/**
* @author wusheng
*/
-public interface ISegmentParserListenerManager extends Service {
- void add(SpanListenerFactory spanListenerFactory);
+public interface ISegmentParserService extends Service {
+ void send(UpstreamSegment segment);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java
similarity index 65%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java
index 90ca1c6..2d0ab6d 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java
@@ -18,12 +18,21 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
-import org.apache.skywalking.oap.server.library.module.Service;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
/**
* @author wusheng
*/
-public interface ISegmentParserListenerManager extends Service {
- void add(SpanListenerFactory spanListenerFactory);
+public class SegmentParserServiceImpl implements ISegmentParserService {
+ private final SegmentParse.Producer segmentProducer;
+
+ public SegmentParserServiceImpl(
+ SegmentParse.Producer segmentProducer) {
+ this.segmentProducer = segmentProducer;
+ }
+
+ @Override
+ public void send(UpstreamSegment segment) {
+ segmentProducer.send(segment, SegmentParse.Source.Agent);
+ }
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java
index c8eb7bb..1381da3 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.SegmentListener;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
@@ -26,9 +27,9 @@ import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalk
* Send the segments to Analysis module, like receiving segments from native SkyWalking agents.
*/
public class Receiver2AnalysisBridge implements SegmentListener {
- private ISegmentParseService segmentParseService;
+ private ISegmentParserService segmentParseService;
- public Receiver2AnalysisBridge(ISegmentParseService segmentParseService) {
+ public Receiver2AnalysisBridge(ISegmentParserService segmentParseService) {
this.segmentParseService = segmentParseService;
}
@@ -41,7 +42,7 @@ public class Receiver2AnalysisBridge implements SegmentListener {
@Override
public void notify(SkyWalkingTrace trace) {
- trace.toUpstreamSegment().forEach(upstream -> segmentParseService.parse(upstream.build(), ISegmentParseService.Source.Agent));
+ trace.toUpstreamSegment().forEach(upstream -> segmentParseService.send(upstream.build()));
}
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/RegisterServices.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/RegisterServices.java
deleted file mode 100644
index 5b0f40c..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/RegisterServices.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin;
-
-import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo;
-import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService;
-import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
-import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
-import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService;
-
-/**
- * @author wusheng
- */
-public class RegisterServices {
- private IApplicationIDService applicationIDService;
-
- private IInstanceIDService instanceIDService;
-
- private INetworkAddressIDService networkAddressIDService;
-
- private IServiceNameService serviceNameService;
-
- public RegisterServices(
- IApplicationIDService applicationIDService,
- IInstanceIDService instanceIDService,
- INetworkAddressIDService networkAddressIDService,
- IServiceNameService serviceNameService) {
- this.applicationIDService = applicationIDService;
- this.instanceIDService = instanceIDService;
- this.networkAddressIDService = networkAddressIDService;
- this.serviceNameService = serviceNameService;
- }
-
- public IApplicationIDService getApplicationIDService() {
- return applicationIDService;
- }
-
- public IInstanceIDService getInstanceIDService() {
- return instanceIDService;
- }
-
- public INetworkAddressIDService getNetworkAddressIDService() {
- return networkAddressIDService;
- }
-
- public IServiceNameService getServiceNameService() {
- return serviceNameService;
- }
-
- /**
- * @param applicationId
- * @param agentUUID in zipkin translation, always means application code. Because no UUID for each process.
- * @return
- */
- public int getOrCreateApplicationInstanceId(int applicationId, String agentUUID) {
- AgentOsInfo agentOsInfo = new AgentOsInfo();
- agentOsInfo.setHostname("N/A");
- agentOsInfo.setOsName("N/A");
- agentOsInfo.setProcessNo(-1);
- return getInstanceIDService().getOrCreateByAgentUUID(applicationId, agentUUID, System.currentTimeMillis(), agentOsInfo);
- }
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
index bfdd640..a5e14af 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
@@ -18,11 +18,12 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author wusheng
*/
-public class ZipkinReceiverConfig {
+public class ZipkinReceiverConfig extends ModuleConfig {
private String host;
private int port;
private String contextPath;
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
index 6cefe8e..82d3bb9 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
@@ -25,8 +25,10 @@ import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler;
+import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
/**
* @author wusheng
@@ -64,9 +66,7 @@ public class ZipkinReceiverProvider extends ModuleProvider {
jettyServer.addHandler(new SpanV1JettyHandler(config));
jettyServer.addHandler(new SpanV2JettyHandler(config));
-
-
- ISegmentParseService segmentParseService = getManager().find(TraceModule.NAME).getService(ISegmentParseService.class);
+ ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).getService(ISegmentParserService.class);
Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java
similarity index 65%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
copy to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java
index 90ca1c6..cf929f5 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java
@@ -16,14 +16,18 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
+package org.apache.skywalking.oap.server.receiver.zipkin;
-import org.apache.skywalking.oap.server.library.module.Service;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
+import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
/**
* @author wusheng
*/
-public interface ISegmentParserListenerManager extends Service {
- void add(SpanListenerFactory spanListenerFactory);
+public class ZipkinTraceOSInfoBuilder {
+
+ public static ServiceInstanceInventory.AgentOsInfo getOSInfoForZipkin(String instanceName) {
+ ServiceInstanceInventory.AgentOsInfo osInfo = new ServiceInstanceInventory.AgentOsInfo();
+ osInfo.setHostname(instanceName);
+ return osInfo;
+ }
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
index 0dfcaae..f5e97be 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
@@ -24,10 +24,9 @@ import java.io.InputStream;
import java.util.List;
import java.util.zip.GZIPInputStream;
import javax.servlet.http.HttpServletRequest;
-import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
-import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig;
-import org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.CacheFactory;
+import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder;
import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
@@ -49,9 +48,11 @@ public class SpanProcessor {
// In Zipkin, the local service name represents the application owner.
String applicationCode = span.localServiceName();
if (applicationCode != null) {
- int applicationId = registerServices.getApplicationIDService().getOrCreateForApplicationCode(applicationCode);
+ int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode);
if (applicationId != 0) {
- registerServices.getOrCreateApplicationInstanceId(applicationId, applicationCode);
+ CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode,
+ span.timestampAsLong(),
+ ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode));
}
}
@@ -71,4 +72,5 @@ public class SpanProcessor {
return requestInStream;
}
+
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java
index 37d6e50..31e73d4 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java
@@ -26,10 +26,17 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.skywalking.apm.network.language.agent.KeyWithStringValue;
+import org.apache.skywalking.apm.network.language.agent.LogMessage;
+import org.apache.skywalking.apm.network.language.agent.RefType;
import org.apache.skywalking.apm.network.language.agent.SpanObject;
+import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
+import org.apache.skywalking.apm.network.language.agent.TraceSegmentReference;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.oap.server.library.util.StringUtils;
-import org.apache.skywalking.oap.server.receiver.zipkin.RegisterServices;
+import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.eclipse.jetty.util.StringUtil;
import zipkin2.Endpoint;
@@ -79,7 +86,7 @@ public class SegmentBuilder {
// :P Hope anyone could provide better solution.
// Wu Sheng.
if (StringUtils.isNotEmpty(applicationCode)) {
- builder.context.addApp(applicationCode);
+ builder.context.addApp(applicationCode, rootSpan.timestampAsLong());
SpanObject.Builder rootSpanBuilder = builder.initSpan(null, null, rootSpan, true);
builder.context.currentSegment().addSpan(rootSpanBuilder);
@@ -93,13 +100,13 @@ public class SegmentBuilder {
builder.segments.forEach(segment -> {
TraceSegmentObject.Builder traceSegmentBuilder = segment.freeze();
segmentBuilders.add(traceSegmentBuilder);
- instanceHeartBeatService.heartBeat(traceSegmentBuilder.getApplicationInstanceId(), segment.getEndTime());
+ CoreRegisterLinker.getServiceInstanceInventoryRegister().heartbeat(traceSegmentBuilder.getApplicationInstanceId(), segment.getEndTime());
});
return new SkyWalkingTrace(builder.generateTraceOrSegmentId(), segmentBuilders);
}
private void scanSpansFromRoot(SpanObject.Builder parentSegmentSpan, Span parent,
- Map<String, List<Span>> childSpanMap) throws Exception {
+ Map<String, List<Span>> childSpanMap) throws Exception {
String parentId = parent.id();
// get child spans by parent span id
List<Span> spanList = childSpanMap.get(parentId);
@@ -117,12 +124,12 @@ public class SegmentBuilder {
try {
if (isNewApp) {
- context.addApp(localServiceName, registerServices);
+ context.addApp(localServiceName, childSpan.timestampAsLong());
}
SpanObject.Builder childSpanBuilder = initSpan(parentSegmentSpan, parent, childSpan, isNewApp);
context.currentSegment().addSpan(childSpanBuilder);
- scanSpansFromRoot(childSpanBuilder, childSpan, childSpanMap, registerServices);
+ scanSpansFromRoot(childSpanBuilder, childSpan, childSpanMap);
} finally {
if (isNewApp) {
@@ -133,7 +140,7 @@ public class SegmentBuilder {
}
private SpanObject.Builder initSpan(SpanObject.Builder parentSegmentSpan, Span parentSpan, Span span,
- boolean isSegmentRoot) {
+ boolean isSegmentRoot) {
SpanObject.Builder spanBuilder = SpanObject.newBuilder();
spanBuilder.setSpanId(context.currentIDs().nextSpanId());
if (isSegmentRoot) {
@@ -187,13 +194,13 @@ public class SegmentBuilder {
spanBuilder.setEndTime(startTime + duration);
span.tags().forEach((tagKey, tagValue) -> spanBuilder.addTags(
- KeyWithStringValue.newBuilder().setKey(tagKey).setValue(tagValue).build())
+ KeyWithStringValue.newBuilder().setKey(tagKey).setValue(tagValue).build())
);
span.annotations().forEach(annotation ->
- spanBuilder.addLogs(LogMessage.newBuilder().setTime(annotation.timestamp() / 1000).addData(
- KeyWithStringValue.newBuilder().setKey("zipkin.annotation").setValue(annotation.value()).build()
- ))
+ spanBuilder.addLogs(LogMessage.newBuilder().setTime(annotation.timestamp() / 1000).addData(
+ KeyWithStringValue.newBuilder().setKey("zipkin.annotation").setValue(annotation.value()).build()
+ ))
);
return spanBuilder;
@@ -217,7 +224,7 @@ public class SegmentBuilder {
}
private void buildRef(SpanObject.Builder spanBuilder, Span span, SpanObject.Builder parentSegmentSpan,
- Span parentSpan) {
+ Span parentSpan) {
Segment parentSegment = context.parentSegment();
if (parentSegment == null) {
return;
@@ -299,19 +306,19 @@ public class SegmentBuilder {
return StringUtils.isNotEmpty(applicationCode) && !applicationCode.equals(currentIDs().applicationCode);
}
- private Segment addApp(String applicationCode,
- RegisterServices registerServices) throws Exception {
- int applicationId = waitForExchange(() ->
- registerServices.getApplicationIDService().getOrCreateForApplicationCode(applicationCode),
- 10
+ private Segment addApp(String applicationCode, long registerTime) throws Exception {
+ int serviceId = waitForExchange(() ->
+ CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode),
+ 10
);
- int appInstanceId = waitForExchange(() ->
- registerServices.getOrCreateApplicationInstanceId(applicationId, applicationCode),
- 10
+ int serviceInstanceId = waitForExchange(() ->
+ CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(serviceId, applicationCode, applicationCode,
+ registerTime, ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode)),
+ 10
);
- Segment segment = new Segment(applicationCode, applicationId, appInstanceId);
+ Segment segment = new Segment(applicationCode, serviceId, serviceInstanceId);
segmentsStack.add(segment);
return segment;
}
@@ -366,12 +373,12 @@ public class SegmentBuilder {
private List<SpanObject.Builder> spans;
private long endTime = 0;
- private Segment(String applicationCode, int applicationId, int appInstanceId) {
- ids = new IDCollection(applicationCode, applicationId, appInstanceId);
+ private Segment(String applicationCode, int serviceId, int serviceInstanceId) {
+ ids = new IDCollection(applicationCode, serviceId, serviceInstanceId);
spans = new LinkedList<>();
segmentBuilder = TraceSegmentObject.newBuilder();
- segmentBuilder.setApplicationId(applicationId);
- segmentBuilder.setApplicationInstanceId(appInstanceId);
+ segmentBuilder.setApplicationId(serviceId);
+ segmentBuilder.setApplicationInstanceId(serviceInstanceId);
segmentBuilder.setTraceSegmentId(generateTraceOrSegmentId());
}
@@ -450,10 +457,10 @@ public class SegmentBuilder {
private UniqueId generateTraceOrSegmentId() {
return UniqueId.newBuilder()
- .addIdParts(ThreadLocalRandom.current().nextLong())
- .addIdParts(ThreadLocalRandom.current().nextLong())
- .addIdParts(ThreadLocalRandom.current().nextLong())
- .build();
+ .addIdParts(ThreadLocalRandom.current().nextLong())
+ .addIdParts(ThreadLocalRandom.current().nextLong())
+ .addIdParts(ThreadLocalRandom.current().nextLong())
+ .build();
}
private class ClientSideSpan {
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java
index 392fdf8..b41e50e 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java
@@ -20,10 +20,6 @@ package org.apache.skywalking.oap.server.receiver.zipkin.transform;
import java.util.LinkedList;
import java.util.List;
-import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
-import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
-import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace;
-import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.ZipkinTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
import zipkin2.Span;
@@ -49,7 +45,7 @@ public class Zipkin2SkyWalkingTransfer {
SkyWalkingTrace skyWalkingTrace = SegmentBuilder.build(traceSpans);
listeners.forEach(listener ->
- listener.notify(skyWalkingTrace)
+ listener.notify(skyWalkingTrace)
);
}