You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/07/14 01:45:18 UTC
[skywalking] 01/01: Zipkin tracer context implementation draft.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch zipkin
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 9b65221ba05318613613202b5611a4af307f9fa8
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Jul 14 09:44:13 2020 +0800
Zipkin tracer context implementation draft.
---
.../apm/agent/core/context/ContextCarrier.java | 140 +++++++---------
.../apm/agent/core/context/ContextManager.java | 13 +-
.../apm/agent/core/context/ContextSnapshot.java | 75 +++++++--
.../apm/agent/core/context/CorrelationContext.java | 4 +-
.../apm/agent/core/context/PrimaryContext.java | 96 +++++++++++
...xtSnapshot.java => PrimaryContextSnapshot.java} | 30 +---
.../apm/agent/core/context/SW8CarrierItem.java | 8 +-
...CarrierItem.java => SW8PrimaryCarrierItem.java} | 15 +-
.../apm/agent/core/context/TracingContext.java | 29 ++--
.../agent/core/context/trace/TraceSegmentRef.java | 22 +--
.../core/context/ContextCarrierV3HeaderTest.java | 79 +++++----
.../apm/agent/core/context/ContextManagerTest.java | 22 +--
.../core/context/IgnoredTracerContextTest.java | 6 +-
.../EventBusImplDeliverToHandlerInterceptor.java | 2 +-
.../vertx3/HandlerRegistrationInterceptor.java | 2 +-
apm-sniffer/optional-reporter-plugins/pom.xml | 121 ++++++++++++++
.../zipkin-reporter-plugin/pom.xml | 73 ++++++++
.../reporter/zipkin/GRPCBlockingService.java} | 24 +--
.../reporter/zipkin/ZipkinContextManager.java | 46 ++++++
.../apm/plugin/reporter/zipkin/ZipkinSpan.java | 184 +++++++++++++++++++++
.../reporter/zipkin/ZipkinTraceReporter.java | 67 ++++++++
.../reporter/zipkin/ZipkinTracerContext.java | 159 ++++++++++++++++++
...ache.skywalking.apm.agent.core.boot.BootService | 30 ++++
23 files changed, 1031 insertions(+), 216 deletions(-)
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java
index e288c1b..2c7005e 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java
@@ -19,116 +19,98 @@
package org.apache.skywalking.apm.agent.core.context;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.apm.agent.core.base64.Base64;
-import org.apache.skywalking.apm.agent.core.conf.Constants;
-import org.apache.skywalking.apm.util.StringUtil;
/**
* {@link ContextCarrier} is a data carrier of {@link TracingContext}. It holds the snapshot (current state) of {@link
* TracingContext}.
* <p>
*/
-@Setter
-@Getter
public class ContextCarrier implements Serializable {
- private String traceId;
- private String traceSegmentId;
- private int spanId = -1;
- private String parentService = Constants.EMPTY_STRING;
- private String parentServiceInstance = Constants.EMPTY_STRING;
- private String parentEndpoint;
- private String addressUsedAtClient;
-
+ @Getter
+ private PrimaryContext primaryContext = new PrimaryContext();
+ @Getter
private CorrelationContext correlationContext = new CorrelationContext();
+ @Getter
private ExtensionContext extensionContext = new ExtensionContext();
- public CarrierItem items() {
- SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null);
- SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(correlationContext, sw8ExtensionCarrierItem);
- SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem);
- return new CarrierItemHead(sw8CarrierItem);
- }
-
/**
- * Serialize this {@link ContextCarrier} to a {@link String}, with '|' split.
+ * Additional keys for reading propagated context.
+ *
+ * <p>These context should never be used on the core and plugin codes.</p>
*
- * @return the serialization string.
+ * Only highly customized core extension, such as new tracer or new tracer context should use this to re-use agent
+ * propagation mechanism.
*/
- String serialize(HeaderVersion version) {
- if (this.isValid(version)) {
- return StringUtil.join(
- '-',
- "1",
- Base64.encode(this.getTraceId()),
- Base64.encode(this.getTraceSegmentId()),
- this.getSpanId() + "",
- Base64.encode(this.getParentService()),
- Base64.encode(this.getParentServiceInstance()),
- Base64.encode(this.getParentEndpoint()),
- Base64.encode(this.getAddressUsedAtClient())
- );
- }
- return "";
- }
-
+ private String[] customKeys;
/**
- * Initialize fields with the given text.
+ * Additional key:value(s) for propagation.
+ *
+ * <p>These context should never be used on the core and plugin codes.</p>
*
- * @param text carries {@link #traceSegmentId} and {@link #spanId}, with '|' split.
+ * Only highly customized core extension, such as new tracer or new tracer context should use this to re-use agent
+ * propagation mechanism.
*/
- ContextCarrier deserialize(String text, HeaderVersion version) {
- if (text == null) {
- return this;
- }
- if (HeaderVersion.v3.equals(version)) {
- String[] parts = text.split("-", 8);
- if (parts.length == 8) {
- try {
- // parts[0] is sample flag, always trace if header exists.
- this.traceId = Base64.decode2UTFString(parts[1]);
- this.traceSegmentId = Base64.decode2UTFString(parts[2]);
- this.spanId = Integer.parseInt(parts[3]);
- this.parentService = Base64.decode2UTFString(parts[4]);
- this.parentServiceInstance = Base64.decode2UTFString(parts[5]);
- this.parentEndpoint = Base64.decode2UTFString(parts[6]);
- this.addressUsedAtClient = Base64.decode2UTFString(parts[7]);
- } catch (IllegalArgumentException ignored) {
+ private Map<String, String> customContext;
+ /**
+ * @return items required to propagate
+ */
+ public CarrierItem items() {
+ CarrierItem customItemsHead = null;
+ if (customContext != null) {
+ for (final Map.Entry<String, String> keyValuePair : customContext.entrySet()) {
+ customItemsHead = new CarrierItem(keyValuePair.getKey(), keyValuePair.getValue(), customItemsHead);
+ }
+ } else {
+ if (customKeys != null) {
+ for (final String customKey : customKeys) {
+ customItemsHead = new CarrierItem(customKey, "", customItemsHead);
}
}
}
- return this;
+ SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(
+ extensionContext, customItemsHead);
+ SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(
+ correlationContext, sw8ExtensionCarrierItem);
+ SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(primaryContext, sw8CorrelationCarrierItem);
+ return new CarrierItemHead(sw8CarrierItem);
}
- public boolean isValid() {
- return isValid(HeaderVersion.v3);
+ /**
+ * Add custom key:value pair to propagate. Only work before the injection.
+ */
+ public void addCustomContext(String key, String value) {
+ if (customContext == null) {
+ customContext = new HashMap<>();
+ }
+ customContext.put(key, value);
}
/**
- * Make sure this {@link ContextCarrier} has been initialized.
- *
- * @return true for unbroken {@link ContextCarrier} or no-initialized. Otherwise, false;
+ * Read propagated context. The key should be set through {@link #setCustomKeys(String...)} before read.
*/
- boolean isValid(HeaderVersion version) {
- if (HeaderVersion.v3 == version) {
- return StringUtil.isNotEmpty(traceId)
- && StringUtil.isNotEmpty(traceSegmentId)
- && getSpanId() > -1
- && StringUtil.isNotEmpty(parentService)
- && StringUtil.isNotEmpty(parentServiceInstance)
- && StringUtil.isNotEmpty(parentEndpoint)
- && StringUtil.isNotEmpty(addressUsedAtClient);
+ public String readCustomContext(String key) {
+ if (customContext == null) {
+ return null;
+ } else {
+ return customContext.get(key);
}
- return false;
}
- public CorrelationContext getCorrelationContext() {
- return correlationContext;
+ /**
+ * @return true if SkyWalking primary context is valid.
+ */
+ public boolean isValid() {
+ return primaryContext.isValid();
}
- public enum HeaderVersion {
- v3
+ /**
+ * Add custom key(s) to read from propagated context(usually headers or metadata of RPC).
+ */
+ public void setCustomKeys(final String... customKeys) {
+ this.customKeys = customKeys;
}
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java
index 46f7dc0..3117092 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java
@@ -83,9 +83,12 @@ public class ContextManager implements BootService {
AbstractSpan span;
AbstractTracerContext context;
operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
- if (carrier != null && carrier.isValid()) {
- SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
- samplingService.forceSampled();
+ if (carrier != null) {
+ if (carrier.isValid()) {
+ // If primary context exists, force sampling activated.
+ SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
+ samplingService.forceSampled();
+ }
context = getOrCreate(operationName, true);
span = context.createEntrySpan(operationName);
context.extract(carrier);
@@ -127,9 +130,7 @@ public class ContextManager implements BootService {
if (carrier == null) {
throw new IllegalArgumentException("ContextCarrier can't be null.");
}
- if (carrier.isValid()) {
- get().extract(carrier);
- }
+ get().extract(carrier);
}
public static ContextSnapshot capture() {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java
index 9db53a1..8031b4e 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java
@@ -18,6 +18,8 @@
package org.apache.skywalking.apm.agent.core.context;
+import java.util.HashMap;
+import java.util.Map;
import lombok.Getter;
import org.apache.skywalking.apm.agent.core.context.ids.DistributedTraceId;
@@ -25,32 +27,48 @@ import org.apache.skywalking.apm.agent.core.context.ids.DistributedTraceId;
* The <code>ContextSnapshot</code> is a snapshot for current context. The snapshot carries the info for building
* reference between two segments in two thread, but have a causal relationship.
*/
-@Getter
public class ContextSnapshot {
- private DistributedTraceId traceId;
- private String traceSegmentId;
- private int spanId;
- private String parentEndpoint;
-
+ @Getter
+ private PrimaryContextSnapshot primaryContextSnapshot;
+ @Getter
private CorrelationContext correlationContext;
+ @Getter
private ExtensionContext extensionContext;
- ContextSnapshot(String traceSegmentId,
- int spanId,
- DistributedTraceId primaryTraceId,
- String parentEndpoint,
- CorrelationContext correlationContext,
- ExtensionContext extensionContext) {
- this.traceSegmentId = traceSegmentId;
- this.spanId = spanId;
- this.traceId = primaryTraceId;
- this.parentEndpoint = parentEndpoint;
+ /**
+ * Additional key:value(s) for propagation.
+ *
+ * <p>These context should never be used on the core and plugin codes.</p>
+ *
+ * Only highly customized core extension, such as new tracer or new tracer context should use this to re-use agent
+ * propagation mechanism.
+ */
+ private Map<String, Object> customContext;
+
+ /**
+ * Create standard ContextSnapshot for SkyWalking default core.
+ */
+ public ContextSnapshot(String traceSegmentId,
+ int spanId,
+ DistributedTraceId primaryTraceId,
+ String parentEndpoint,
+ CorrelationContext correlationContext,
+ ExtensionContext extensionContext) {
+ this.primaryContextSnapshot =
+ new PrimaryContextSnapshot(primaryTraceId, traceSegmentId, spanId, parentEndpoint);
this.correlationContext = correlationContext.clone();
this.extensionContext = extensionContext.clone();
}
+ /**
+ * Create an empty ContextSnapshot shell, for extension only.
+ */
+ public ContextSnapshot(CorrelationContext correlationContext) {
+ this.correlationContext = correlationContext.clone();
+ }
+
public boolean isFromCurrent() {
- return traceSegmentId != null && traceSegmentId.equals(ContextManager.capture().getTraceSegmentId());
+ return primaryContextSnapshot.isFromCurrent();
}
public CorrelationContext getCorrelationContext() {
@@ -58,6 +76,27 @@ public class ContextSnapshot {
}
public boolean isValid() {
- return traceSegmentId != null && spanId > -1 && traceId != null;
+ return primaryContextSnapshot.isValid();
+ }
+
+ /**
+ * Add custom key:value pair to propagate. Only work in the capture stage.
+ */
+ public void addCustomContext(String key, Object value) {
+ if (customContext == null) {
+ customContext = new HashMap<>();
+ }
+ customContext.put(key, value);
+ }
+
+ /**
+ * Read cached propagated context.
+ */
+ public Object readCustomContext(String key) {
+ if (customContext == null) {
+ return null;
+ } else {
+ return customContext.get(key);
+ }
}
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java
index 4f860b2..e28072d 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java
@@ -118,14 +118,14 @@ public class CorrelationContext {
/**
* Prepare for the cross-process propagation. Inject the {@link #data} into {@link ContextCarrier#getCorrelationContext()}
*/
- void inject(ContextCarrier carrier) {
+ public void inject(ContextCarrier carrier) {
carrier.getCorrelationContext().data.putAll(this.data);
}
/**
* Extra the {@link ContextCarrier#getCorrelationContext()} into this context.
*/
- void extract(ContextCarrier carrier) {
+ public void extract(ContextCarrier carrier) {
final Map<String, String> carrierCorrelationContext = carrier.getCorrelationContext().data;
for (Map.Entry<String, String> entry : carrierCorrelationContext.entrySet()) {
// Only data with limited count of elements can be added
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContext.java
new file mode 100644
index 0000000..d34aabb
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContext.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.context;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.apm.agent.core.base64.Base64;
+import org.apache.skywalking.apm.agent.core.conf.Constants;
+import org.apache.skywalking.apm.util.StringUtil;
+
+@Setter
+@Getter
+public class PrimaryContext {
+ private String traceId;
+ private String traceSegmentId;
+ private int spanId = -1;
+ private String parentService = Constants.EMPTY_STRING;
+ private String parentServiceInstance = Constants.EMPTY_STRING;
+ private String parentEndpoint;
+ private String addressUsedAtClient;
+
+ /**
+ * Serialize this {@link ContextCarrier} to a {@link String}, with '|' split.
+ *
+ * @return the serialization string.
+ */
+ String serialize() {
+ if (this.isValid()) {
+ return StringUtil.join(
+ '-',
+ "1",
+ Base64.encode(this.getTraceId()),
+ Base64.encode(this.getTraceSegmentId()),
+ this.getSpanId() + "",
+ Base64.encode(this.getParentService()),
+ Base64.encode(this.getParentServiceInstance()),
+ Base64.encode(this.getParentEndpoint()),
+ Base64.encode(this.getAddressUsedAtClient())
+ );
+ }
+ return "";
+ }
+
+ /**
+ * Initialize fields with the given text.
+ *
+ * @param text carries {@link #traceSegmentId} and {@link #spanId}, with '|' split.
+ */
+ PrimaryContext deserialize(String text) {
+ if (text == null) {
+ return this;
+ }
+ String[] parts = text.split("-", 8);
+ if (parts.length == 8) {
+ try {
+ // parts[0] is sample flag, always trace if header exists.
+ this.traceId = Base64.decode2UTFString(parts[1]);
+ this.traceSegmentId = Base64.decode2UTFString(parts[2]);
+ this.spanId = Integer.parseInt(parts[3]);
+ this.parentService = Base64.decode2UTFString(parts[4]);
+ this.parentServiceInstance = Base64.decode2UTFString(parts[5]);
+ this.parentEndpoint = Base64.decode2UTFString(parts[6]);
+ this.addressUsedAtClient = Base64.decode2UTFString(parts[7]);
+ } catch (IllegalArgumentException ignored) {
+
+ }
+ }
+ return this;
+ }
+
+ public boolean isValid() {
+ return StringUtil.isNotEmpty(traceId)
+ && StringUtil.isNotEmpty(traceSegmentId)
+ && getSpanId() > -1
+ && StringUtil.isNotEmpty(parentService)
+ && StringUtil.isNotEmpty(parentServiceInstance)
+ && StringUtil.isNotEmpty(parentEndpoint)
+ && StringUtil.isNotEmpty(addressUsedAtClient);
+ }
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContextSnapshot.java
similarity index 58%
copy from apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java
copy to apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContextSnapshot.java
index 9db53a1..eee4291 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContextSnapshot.java
@@ -21,40 +21,26 @@ package org.apache.skywalking.apm.agent.core.context;
import lombok.Getter;
import org.apache.skywalking.apm.agent.core.context.ids.DistributedTraceId;
-/**
- * The <code>ContextSnapshot</code> is a snapshot for current context. The snapshot carries the info for building
- * reference between two segments in two thread, but have a causal relationship.
- */
@Getter
-public class ContextSnapshot {
+public class PrimaryContextSnapshot {
private DistributedTraceId traceId;
private String traceSegmentId;
private int spanId;
private String parentEndpoint;
- private CorrelationContext correlationContext;
- private ExtensionContext extensionContext;
-
- ContextSnapshot(String traceSegmentId,
- int spanId,
- DistributedTraceId primaryTraceId,
- String parentEndpoint,
- CorrelationContext correlationContext,
- ExtensionContext extensionContext) {
+ public PrimaryContextSnapshot(final DistributedTraceId traceId,
+ final String traceSegmentId,
+ final int spanId,
+ final String parentEndpoint) {
+ this.traceId = traceId;
this.traceSegmentId = traceSegmentId;
this.spanId = spanId;
- this.traceId = primaryTraceId;
this.parentEndpoint = parentEndpoint;
- this.correlationContext = correlationContext.clone();
- this.extensionContext = extensionContext.clone();
}
public boolean isFromCurrent() {
- return traceSegmentId != null && traceSegmentId.equals(ContextManager.capture().getTraceSegmentId());
- }
-
- public CorrelationContext getCorrelationContext() {
- return correlationContext;
+ return traceSegmentId != null
+ && traceSegmentId.equals(ContextManager.capture().getPrimaryContextSnapshot().getTraceSegmentId());
}
public boolean isValid() {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java
index 07f79f2..139b5dd 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java
@@ -20,15 +20,15 @@ package org.apache.skywalking.apm.agent.core.context;
public class SW8CarrierItem extends CarrierItem {
public static final String HEADER_NAME = "sw8";
- private ContextCarrier carrier;
+ private PrimaryContext carrier;
- public SW8CarrierItem(ContextCarrier carrier, CarrierItem next) {
- super(HEADER_NAME, carrier.serialize(ContextCarrier.HeaderVersion.v3), next);
+ public SW8CarrierItem(PrimaryContext carrier, CarrierItem next) {
+ super(HEADER_NAME, carrier.serialize(), next);
this.carrier = carrier;
}
@Override
public void setHeadValue(String headValue) {
- carrier.deserialize(headValue, ContextCarrier.HeaderVersion.v3);
+ carrier.deserialize(headValue);
}
}
\ No newline at end of file
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8PrimaryCarrierItem.java
similarity index 73%
copy from apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java
copy to apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8PrimaryCarrierItem.java
index 07f79f2..afe10ab 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8PrimaryCarrierItem.java
@@ -18,17 +18,18 @@
package org.apache.skywalking.apm.agent.core.context;
-public class SW8CarrierItem extends CarrierItem {
+public class SW8PrimaryCarrierItem extends CarrierItem {
public static final String HEADER_NAME = "sw8";
- private ContextCarrier carrier;
+ private final PrimaryContext primaryContext;
- public SW8CarrierItem(ContextCarrier carrier, CarrierItem next) {
- super(HEADER_NAME, carrier.serialize(ContextCarrier.HeaderVersion.v3), next);
- this.carrier = carrier;
+ public SW8PrimaryCarrierItem(PrimaryContext primaryContext, CarrierItem next) {
+ super(HEADER_NAME, primaryContext.serialize(), next);
+ this.primaryContext = primaryContext;
}
@Override
public void setHeadValue(String headValue) {
- carrier.deserialize(headValue, ContextCarrier.HeaderVersion.v3);
+ this.primaryContext.deserialize(headValue);
}
-}
\ No newline at end of file
+
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
index d803084..639e257 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
@@ -170,13 +170,14 @@ public class TracingContext implements AbstractTracerContext {
throw new IllegalStateException("Exit span doesn't include meaningful peer information.");
}
- carrier.setTraceId(getReadablePrimaryTraceId());
- carrier.setTraceSegmentId(this.segment.getTraceSegmentId());
- carrier.setSpanId(exitSpan.getSpanId());
- carrier.setParentService(Config.Agent.SERVICE_NAME);
- carrier.setParentServiceInstance(Config.Agent.INSTANCE_NAME);
- carrier.setParentEndpoint(first().getOperationName());
- carrier.setAddressUsedAtClient(peer);
+ final PrimaryContext primaryContext = carrier.getPrimaryContext();
+ primaryContext.setTraceId(getReadablePrimaryTraceId());
+ primaryContext.setTraceSegmentId(this.segment.getTraceSegmentId());
+ primaryContext.setSpanId(exitSpan.getSpanId());
+ primaryContext.setParentService(Config.Agent.SERVICE_NAME);
+ primaryContext.setParentServiceInstance(Config.Agent.INSTANCE_NAME);
+ primaryContext.setParentEndpoint(first().getOperationName());
+ primaryContext.setAddressUsedAtClient(peer);
this.correlationContext.inject(carrier);
this.extensionContext.inject(carrier);
@@ -189,12 +190,14 @@ public class TracingContext implements AbstractTracerContext {
*/
@Override
public void extract(ContextCarrier carrier) {
- TraceSegmentRef ref = new TraceSegmentRef(carrier);
- this.segment.ref(ref);
- this.segment.relatedGlobalTraces(new PropagatedTraceId(carrier.getTraceId()));
AbstractSpan span = this.activeSpan();
- if (span instanceof EntrySpan) {
- span.ref(ref);
+ if (carrier.isValid()) {
+ TraceSegmentRef ref = new TraceSegmentRef(carrier);
+ this.segment.ref(ref);
+ this.segment.relatedGlobalTraces(new PropagatedTraceId(carrier.getPrimaryContext().getTraceId()));
+ if (span instanceof EntrySpan) {
+ span.ref(ref);
+ }
}
this.correlationContext.extract(carrier);
@@ -232,7 +235,7 @@ public class TracingContext implements AbstractTracerContext {
TraceSegmentRef segmentRef = new TraceSegmentRef(snapshot);
this.segment.ref(segmentRef);
this.activeSpan().ref(segmentRef);
- this.segment.relatedGlobalTraces(snapshot.getTraceId());
+ this.segment.relatedGlobalTraces(snapshot.getPrimaryContextSnapshot().getTraceId());
this.correlationContext.continued(snapshot);
this.extensionContext.continued(snapshot);
this.extensionContext.handle(this.activeSpan());
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java
index 688dca1..517396a 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java
@@ -48,23 +48,23 @@ public class TraceSegmentRef {
*/
public TraceSegmentRef(ContextCarrier carrier) {
this.type = SegmentRefType.CROSS_PROCESS;
- this.traceId = carrier.getTraceId();
- this.traceSegmentId = carrier.getTraceSegmentId();
- this.spanId = carrier.getSpanId();
- this.parentService = carrier.getParentService();
- this.parentServiceInstance = carrier.getParentServiceInstance();
- this.parentEndpoint = carrier.getParentEndpoint();
- this.addressUsedAtClient = carrier.getAddressUsedAtClient();
+ this.traceId = carrier.getPrimaryContext().getTraceId();
+ this.traceSegmentId = carrier.getPrimaryContext().getTraceSegmentId();
+ this.spanId = carrier.getPrimaryContext().getSpanId();
+ this.parentService = carrier.getPrimaryContext().getParentService();
+ this.parentServiceInstance = carrier.getPrimaryContext().getParentServiceInstance();
+ this.parentEndpoint = carrier.getPrimaryContext().getParentEndpoint();
+ this.addressUsedAtClient = carrier.getPrimaryContext().getAddressUsedAtClient();
}
public TraceSegmentRef(ContextSnapshot snapshot) {
this.type = SegmentRefType.CROSS_THREAD;
- this.traceId = snapshot.getTraceId().getId();
- this.traceSegmentId = snapshot.getTraceSegmentId();
- this.spanId = snapshot.getSpanId();
+ this.traceId = snapshot.getPrimaryContextSnapshot().getTraceId().getId();
+ this.traceSegmentId = snapshot.getPrimaryContextSnapshot().getTraceSegmentId();
+ this.spanId = snapshot.getPrimaryContextSnapshot().getSpanId();
this.parentService = Config.Agent.SERVICE_NAME;
this.parentServiceInstance = Config.Agent.INSTANCE_NAME;
- this.parentEndpoint = snapshot.getParentEndpoint();
+ this.parentEndpoint = snapshot.getPrimaryContextSnapshot().getParentEndpoint();
}
public SegmentReference transform() {
diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV3HeaderTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV3HeaderTest.java
index f45e093..dc7aa56 100644
--- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV3HeaderTest.java
+++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV3HeaderTest.java
@@ -52,14 +52,14 @@ public class ContextCarrierV3HeaderTest {
List<DistributedTraceId> distributedTraceIds = new ArrayList<>();
ContextCarrier contextCarrier = new ContextCarrier();
- contextCarrier.setTraceSegmentId("1.2.3");
- contextCarrier.setTraceId("3.4.5");
- contextCarrier.setSpanId(4);
- contextCarrier.setParentService("service");
- contextCarrier.setParentServiceInstance("instance");
- contextCarrier.setAddressUsedAtClient("127.0.0.1:8080");
- contextCarrier.setParentEndpoint("/portal");
- contextCarrier.setParentEndpoint("/app");
+ contextCarrier.getPrimaryContext().setTraceSegmentId("1.2.3");
+ contextCarrier.getPrimaryContext().setTraceId("3.4.5");
+ contextCarrier.getPrimaryContext().setSpanId(4);
+ contextCarrier.getPrimaryContext().setParentService("service");
+ contextCarrier.getPrimaryContext().setParentServiceInstance("instance");
+ contextCarrier.getPrimaryContext().setAddressUsedAtClient("127.0.0.1:8080");
+ contextCarrier.getPrimaryContext().setParentEndpoint("/portal");
+ contextCarrier.getPrimaryContext().setParentEndpoint("/app");
contextCarrier.getCorrelationContext().put("test", "true");
@@ -69,7 +69,10 @@ public class ContextCarrierV3HeaderTest {
while (next.hasNext()) {
next = next.next();
if (next.getHeadKey().equals(SW8CarrierItem.HEADER_NAME)) {
- Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=", next.getHeadValue());
+ Assert.assertEquals(
+ "1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=",
+ next.getHeadValue()
+ );
} else if (next.getHeadKey().equals(SW8CorrelationCarrierItem.HEADER_NAME)) {
/**
* customKey:customValue
@@ -88,7 +91,10 @@ public class ContextCarrierV3HeaderTest {
while (next.hasNext()) {
next = next.next();
if (next.getHeadKey().equals(SW8CarrierItem.HEADER_NAME)) {
- Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=", next.getHeadValue());
+ Assert.assertEquals(
+ "1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=",
+ next.getHeadValue()
+ );
} else if (next.getHeadKey().equals(SW8CorrelationCarrierItem.HEADER_NAME)) {
Assert.assertEquals("dGVzdA==:dHJ1ZQ==", next.getHeadValue());
} else if (next.getHeadKey().equals(SW8ExtensionCarrierItem.HEADER_NAME)) {
@@ -107,26 +113,26 @@ public class ContextCarrierV3HeaderTest {
distributedTraceIds.add(new PropagatedTraceId("3.4.5"));
ContextCarrier contextCarrier = new ContextCarrier();
- contextCarrier.setTraceSegmentId("1.2.3");
- contextCarrier.setTraceId("3.4.5");
- contextCarrier.setSpanId(4);
- contextCarrier.setParentService("service");
- contextCarrier.setParentServiceInstance("instance");
- contextCarrier.setAddressUsedAtClient("127.0.0.1:8080");
- contextCarrier.setParentEndpoint("/portal");
- contextCarrier.setParentEndpoint("/app");
+ contextCarrier.getPrimaryContext().setTraceSegmentId("1.2.3");
+ contextCarrier.getPrimaryContext().setTraceId("3.4.5");
+ contextCarrier.getPrimaryContext().setSpanId(4);
+ contextCarrier.getPrimaryContext().setParentService("service");
+ contextCarrier.getPrimaryContext().setParentServiceInstance("instance");
+ contextCarrier.getPrimaryContext().setAddressUsedAtClient("127.0.0.1:8080");
+ contextCarrier.getPrimaryContext().setParentEndpoint("/portal");
+ contextCarrier.getPrimaryContext().setParentEndpoint("/app");
contextCarrier.getCorrelationContext().put("test", "true");
contextCarrier.getExtensionContext().deserialize("1");
CarrierItem next = contextCarrier.items();
- String sw6HeaderValue = null;
+ String sw8HeaderValue = null;
String correlationHeaderValue = null;
String extensionHeaderValue = null;
while (next.hasNext()) {
next = next.next();
if (next.getHeadKey().equals(SW8CarrierItem.HEADER_NAME)) {
- sw6HeaderValue = next.getHeadValue();
+ sw8HeaderValue = next.getHeadValue();
} else if (next.getHeadKey().equals(SW8CorrelationCarrierItem.HEADER_NAME)) {
correlationHeaderValue = next.getHeadValue();
} else if (next.getHeadKey().equals(SW8ExtensionCarrierItem.HEADER_NAME)) {
@@ -141,7 +147,7 @@ public class ContextCarrierV3HeaderTest {
while (next.hasNext()) {
next = next.next();
if (next.getHeadKey().equals(SW8CarrierItem.HEADER_NAME)) {
- next.setHeadValue(sw6HeaderValue);
+ next.setHeadValue(sw8HeaderValue);
} else if (next.getHeadKey().equals(SW8CorrelationCarrierItem.HEADER_NAME)) {
next.setHeadValue(correlationHeaderValue);
} else if (next.getHeadKey().equals(SW8ExtensionCarrierItem.HEADER_NAME)) {
@@ -152,13 +158,30 @@ public class ContextCarrierV3HeaderTest {
}
Assert.assertTrue(contextCarrier2.isValid());
- Assert.assertEquals(contextCarrier.getSpanId(), contextCarrier2.getSpanId());
- Assert.assertEquals(contextCarrier.getAddressUsedAtClient(), contextCarrier2.getAddressUsedAtClient());
- Assert.assertEquals(contextCarrier.getTraceId(), contextCarrier2.getTraceId());
- Assert.assertEquals(contextCarrier.getTraceSegmentId(), contextCarrier2.getTraceSegmentId());
- Assert.assertEquals(contextCarrier.getParentService(), contextCarrier2.getParentService());
- Assert.assertEquals(contextCarrier.getParentServiceInstance(), contextCarrier2.getParentServiceInstance());
- Assert.assertEquals(contextCarrier.getParentEndpoint(), contextCarrier2.getParentEndpoint());
+ Assert.assertEquals(
+ contextCarrier.getPrimaryContext().getSpanId(), contextCarrier2.getPrimaryContext().getSpanId());
+ Assert.assertEquals(
+ contextCarrier.getPrimaryContext().getAddressUsedAtClient(),
+ contextCarrier2.getPrimaryContext().getAddressUsedAtClient()
+ );
+ Assert.assertEquals(
+ contextCarrier.getPrimaryContext().getTraceId(), contextCarrier2.getPrimaryContext().getTraceId());
+ Assert.assertEquals(
+ contextCarrier.getPrimaryContext().getTraceSegmentId(),
+ contextCarrier2.getPrimaryContext().getTraceSegmentId()
+ );
+ Assert.assertEquals(
+ contextCarrier.getPrimaryContext().getParentService(),
+ contextCarrier2.getPrimaryContext().getParentService()
+ );
+ Assert.assertEquals(
+ contextCarrier.getPrimaryContext().getParentServiceInstance(),
+ contextCarrier2.getPrimaryContext().getParentServiceInstance()
+ );
+ Assert.assertEquals(
+ contextCarrier.getPrimaryContext().getParentEndpoint(),
+ contextCarrier2.getPrimaryContext().getParentEndpoint()
+ );
Assert.assertEquals(contextCarrier.getCorrelationContext(), contextCarrier2.getCorrelationContext());
Assert.assertEquals(contextCarrier.getExtensionContext(), contextCarrier2.getExtensionContext());
}
diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java
index 774da23..fce0ab8 100644
--- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java
+++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java
@@ -103,9 +103,9 @@ public class ContextManagerTest {
@Test
public void createMultipleEntrySpan() {
- ContextCarrier contextCarrier = new ContextCarrier().deserialize(
- "1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=",
- ContextCarrier.HeaderVersion.v3
+ ContextCarrier contextCarrier = new ContextCarrier();
+ contextCarrier.getPrimaryContext().deserialize(
+ "1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="
);
assertTrue(contextCarrier.isValid());
@@ -159,8 +159,8 @@ public class ContextManagerTest {
assertThat(logs.size(), is(1));
assertThat(logs.get(0).getLogs().size(), is(4));
- assertThat(injectContextCarrier.getSpanId(), is(1));
- assertThat(injectContextCarrier.getAddressUsedAtClient(), is("127.0.0.1:12800"));
+ assertThat(injectContextCarrier.getPrimaryContext().getSpanId(), is(1));
+ assertThat(injectContextCarrier.getPrimaryContext().getAddressUsedAtClient(), is("127.0.0.1:12800"));
}
@Test
@@ -210,18 +210,18 @@ public class ContextManagerTest {
assertThat(actualEntrySpan.getSpanId(), is(0));
assertThat(AbstractTracingSpanHelper.getParentSpanId(actualEntrySpan), is(-1));
- assertThat(firstExitSpanContextCarrier.getAddressUsedAtClient(), is("127.0.0.1:8080"));
- assertThat(firstExitSpanContextCarrier.getSpanId(), is(1));
+ assertThat(firstExitSpanContextCarrier.getPrimaryContext().getAddressUsedAtClient(), is("127.0.0.1:8080"));
+ assertThat(firstExitSpanContextCarrier.getPrimaryContext().getSpanId(), is(1));
- assertThat(secondExitSpanContextCarrier.getSpanId(), is(1));
+ assertThat(secondExitSpanContextCarrier.getPrimaryContext().getSpanId(), is(1));
}
@Test
public void testTransform() throws InvalidProtocolBufferException {
- ContextCarrier contextCarrier = new ContextCarrier().deserialize(
- "1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=",
- ContextCarrier.HeaderVersion.v3
+ ContextCarrier contextCarrier = new ContextCarrier();
+ contextCarrier.getPrimaryContext().deserialize(
+ "1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="
);
assertTrue(contextCarrier.isValid());
diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java
index b46fcdc..3c63847 100644
--- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java
+++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java
@@ -95,9 +95,9 @@ public class IgnoredTracerContextTest {
ContextManager.stopSpan();
assertThat(abstractSpan.getClass().getName(), is(NoopSpan.class.getName()));
- assertNull(contextCarrier.getParentEndpoint());
- assertThat(contextCarrier.getSpanId(), is(-1));
- assertNull(contextCarrier.getAddressUsedAtClient());
+ assertNull(contextCarrier.getPrimaryContext().getParentEndpoint());
+ assertThat(contextCarrier.getPrimaryContext().getSpanId(), is(-1));
+ assertNull(contextCarrier.getPrimaryContext().getAddressUsedAtClient());
LinkedList<IgnoredTracerContext> ignoredTracerContexts = storage.getIgnoredTracerContexts();
assertThat(ignoredTracerContexts.size(), is(1));
diff --git a/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/EventBusImplDeliverToHandlerInterceptor.java b/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/EventBusImplDeliverToHandlerInterceptor.java
index 8e32945..9f769fb 100644
--- a/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/EventBusImplDeliverToHandlerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/EventBusImplDeliverToHandlerInterceptor.java
@@ -46,7 +46,7 @@ public class EventBusImplDeliverToHandlerInterceptor implements InstanceMethodsA
AbstractSpan span;
if (VertxContext.hasContext(message.replyAddress())) {
VertxContext context = VertxContext.peekContext(message.replyAddress());
- span = ContextManager.createLocalSpan(context.getContextSnapshot().getParentEndpoint());
+ span = ContextManager.createLocalSpan(context.getContextSnapshot().getPrimaryContextSnapshot().getParentEndpoint());
ContextManager.continued(context.getContextSnapshot());
} else {
span = ContextManager.createLocalSpan(message.address());
diff --git a/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/HandlerRegistrationInterceptor.java b/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/HandlerRegistrationInterceptor.java
index ff8db51..6662439 100644
--- a/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/HandlerRegistrationInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/HandlerRegistrationInterceptor.java
@@ -59,7 +59,7 @@ public class HandlerRegistrationInterceptor implements InstanceMethodsAroundInte
} else {
if (VertxContext.hasContext(message.replyAddress())) {
VertxContext context = VertxContext.peekContext(message.replyAddress());
- span = ContextManager.createLocalSpan(context.getContextSnapshot().getParentEndpoint());
+ span = ContextManager.createLocalSpan(context.getContextSnapshot().getPrimaryContextSnapshot().getParentEndpoint());
ContextManager.continued(context.getContextSnapshot());
} else {
span = ContextManager.createLocalSpan(message.address());
diff --git a/apm-sniffer/optional-reporter-plugins/pom.xml b/apm-sniffer/optional-reporter-plugins/pom.xml
new file mode 100644
index 0000000..140eac2
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>apm-sniffer</artifactId>
+ <groupId>org.apache.skywalking</groupId>
+ <version>8.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>optional-reporter-plugins</artifactId>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>zipkin-reporter-plugin</module>
+ </modules>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+ <agent.package.dest.dir>${project.build.directory}/../../../../skywalking-agent</agent.package.dest.dir>
+ <optional.reporter.plugins.dest.dir>${agent.package.dest.dir}/optional-reporter-plugins</optional.reporter.plugins.dest.dir>
+
+ <ant-contrib.version>1.0b3</ant-contrib.version>
+ <ant-nodeps.version>1.8.1</ant-nodeps.version>
+
+ <kafk-clients.version>2.4.1</kafk-clients.version>
+ <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>apm-agent-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>apm-util</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>apm-test-tools</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <tasks>
+ <taskdef resource="net/sf/antcontrib/antcontrib.properties"
+ classpathref="maven.runtime.classpath"/>
+ <if>
+ <equals arg1="${project.packaging}" arg2="jar"/>
+ <then>
+ <mkdir dir="${optional.reporter.plugins.dest.dir}"/>
+ <mkdir dir="${agent.package.dest.dir}/reporter-plugins" />
+ <copy
+ file="${project.build.directory}/${project.artifactId}-${project.version}.jar"
+ tofile="${optional.reporter.plugins.dest.dir}/${project.artifactId}-${project.version}.jar"
+ overwrite="true"/>
+ </then>
+ </if>
+ </tasks>
+ </configuration>
+ </execution>
+ </executions>
+ <dependencies>
+ <dependency>
+ <groupId>ant-contrib</groupId>
+ <artifactId>ant-contrib</artifactId>
+ <version>${ant-contrib.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>ant</groupId>
+ <artifactId>ant</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ant</groupId>
+ <artifactId>ant-nodeps</artifactId>
+ <version>${ant-nodeps.version}</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/pom.xml b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/pom.xml
new file mode 100644
index 0000000..9ace787
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>optional-reporter-plugins</artifactId>
+ <version>8.1.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>zipkin-reporter-plugin</artifactId>
+ <version>8.1.0-SNAPSHOT</version>
+
+ <properties>
+ <zipkin.version>5.12.3</zipkin.version>
+ <zipkin.okhttp3.version>2.15.0</zipkin.okhttp3.version>
+ <zipkin.reporter.brave.version>2.15.0</zipkin.reporter.brave.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.zipkin.brave</groupId>
+ <artifactId>brave</artifactId>
+ <version>${zipkin.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.zipkin.brave</groupId>
+ <artifactId>brave-instrumentation-http</artifactId>
+ <version>${zipkin.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.zipkin.brave</groupId>
+ <artifactId>brave-instrumentation-rpc</artifactId>
+ <version>${zipkin.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.zipkin.brave</groupId>
+ <artifactId>brave-instrumentation-messaging</artifactId>
+ <version>${zipkin.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.zipkin.reporter2</groupId>
+ <artifactId>zipkin-sender-okhttp3</artifactId>
+ <version>${zipkin.okhttp3.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.zipkin.reporter2</groupId>
+ <artifactId>zipkin-reporter-brave</artifactId>
+ <version>${zipkin.reporter.brave.version}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/GRPCBlockingService.java
similarity index 57%
copy from apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java
copy to apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/GRPCBlockingService.java
index 07f79f2..5613c02 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/GRPCBlockingService.java
@@ -16,19 +16,23 @@
*
*/
-package org.apache.skywalking.apm.agent.core.context;
+package org.apache.skywalking.apm.plugin.reporter.zipkin;
-public class SW8CarrierItem extends CarrierItem {
- public static final String HEADER_NAME = "sw8";
- private ContextCarrier carrier;
+import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
+import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
- public SW8CarrierItem(ContextCarrier carrier, CarrierItem next) {
- super(HEADER_NAME, carrier.serialize(ContextCarrier.HeaderVersion.v3), next);
- this.carrier = carrier;
+/**
+ * The {@link org.apache.skywalking.apm.agent.core.conf.Config.Collector#BACKEND_SERVICE} is used by {@link
+ * ZipkinTraceReporter}, so, this service prevents the default implementation activate the grpc channel through this
+ * parameter.
+ */
+@OverrideImplementor(GRPCChannelManager.class)
+public class GRPCBlockingService extends GRPCChannelManager {
+ @Override
+ public void prepare() {
}
@Override
- public void setHeadValue(String headValue) {
- carrier.deserialize(headValue, ContextCarrier.HeaderVersion.v3);
+ public void boot() {
}
-}
\ No newline at end of file
+}
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinContextManager.java b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinContextManager.java
new file mode 100644
index 0000000..eb22db6
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinContextManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.reporter.zipkin;
+
+import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.context.AbstractTracerContext;
+import org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService;
+
+/**
+ * ZipkinContextManager used Brave APIs to manage the Zipkin tracing context, including span start/stop/tag/log,
+ * inject/extract in across process, and capture/continue in across thread.
+ */
+@OverrideImplementor(ContextManagerExtendService.class)
+public class ZipkinContextManager extends ContextManagerExtendService {
+ private ZipkinTraceReporter zipkinTraceReporter;
+
+ @Override
+ public void prepare() {
+ zipkinTraceReporter = ServiceManager.INSTANCE.findService(ZipkinTraceReporter.class);
+ }
+
+ /**
+ * Create AbstractTracerContext with as all new Zipkin tracer.
+ */
+ @Override
+ public AbstractTracerContext createTraceContext(final String operationName, final boolean forceSampling) {
+ return new ZipkinTracerContext(zipkinTraceReporter.getTracing());
+ }
+}
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinSpan.java b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinSpan.java
new file mode 100644
index 0000000..e207ffd
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinSpan.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.reporter.zipkin;
+
+import brave.Span;
+import java.util.Map;
+import lombok.Getter;
+import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
+import org.apache.skywalking.apm.network.trace.component.Component;
+
+/**
+ * Zipkin span is the API bridge for Zipkin span running in the SkyWalking shell.
+ */
+@Getter
+public class ZipkinSpan implements AbstractSpan {
+ private final Span span;
+ private boolean isEntry;
+ private boolean isExit;
+ @Getter
+ private boolean isAsync = false;
+
+ public ZipkinSpan(final Span span) {
+ this.span = span;
+ this.isEntry = false;
+ this.isExit = false;
+ }
+
+ public ZipkinSpan setEntry(final boolean entry) {
+ isEntry = entry;
+ span.kind(Span.Kind.SERVER);
+ return this;
+ }
+
+ public ZipkinSpan setExit(final boolean exit) {
+ isExit = exit;
+ span.kind(Span.Kind.CLIENT);
+ return this;
+ }
+
+ @Override
+ public AbstractSpan setComponent(final Component component) {
+ span.tag("component", component.getName());
+ return this;
+ }
+
+ @Override
+ public AbstractSpan setLayer(final SpanLayer layer) {
+ span.tag("layer", layer.name());
+ if (isEntry && layer.equals(SpanLayer.MQ)) {
+ span.kind(Span.Kind.CONSUMER);
+ } else if (isExit && layer.equals(SpanLayer.MQ)) {
+ span.kind(Span.Kind.PRODUCER);
+ }
+ return this;
+ }
+
+ @Override
+ public AbstractSpan tag(final String key, final String value) {
+ span.tag(key, value);
+ return this;
+ }
+
+ @Override
+ public AbstractSpan tag(final AbstractTag<?> tag, final String value) {
+ span.tag(tag.key(), value);
+ return this;
+ }
+
+ @Override
+ public AbstractSpan log(final Throwable t) {
+ span.error(t);
+ return this;
+ }
+
+ @Override
+ public AbstractSpan errorOccurred() {
+ return this;
+ }
+
+ @Override
+ public boolean isEntry() {
+ return isEntry;
+ }
+
+ @Override
+ public boolean isExit() {
+ return isExit;
+ }
+
+ @Override
+ public AbstractSpan log(final long timestamp, final Map<String, ?> event) {
+ return null;
+ }
+
+ @Override
+ public AbstractSpan setOperationName(final String operationName) {
+ span.name(operationName);
+ return this;
+ }
+
+ @Override
+ public AbstractSpan start() {
+ span.start();
+ return this;
+ }
+
+ /**
+ * @return 0 always, as span is not readable before finished.
+ */
+ @Override
+ public int getSpanId() {
+ return 0;
+ }
+
+ /**
+ * @return empty string, as span is not readable before finished.
+ */
+ @Override
+ public String getOperationName() {
+ return "";
+ }
+
+ @Override
+ public void ref(final TraceSegmentRef ref) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public AbstractSpan start(final long startTime) {
+ span.start(startTime);
+ return this;
+ }
+
+ @Override
+ public AbstractSpan setPeer(final String remotePeer) {
+ span.remoteServiceName(remotePeer);
+ return this;
+ }
+
+ @Override
+ public boolean isProfiling() {
+ return false;
+ }
+
+ @Override
+ public void skipAnalysis() {
+
+ }
+
+ @Override
+ public AbstractSpan prepareForAsync() {
+ isAsync = true;
+ return this;
+ }
+
+ @Override
+ public AbstractSpan asyncFinish() {
+ span.finish();
+ return this;
+ }
+
+ public void stop() {
+ span.finish();
+ }
+}
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTraceReporter.java b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTraceReporter.java
new file mode 100644
index 0000000..7081a17
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTraceReporter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.reporter.zipkin;
+
+import brave.Tracing;
+import lombok.Getter;
+import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
+import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
+import zipkin2.reporter.okhttp3.OkHttpSender;
+
+/**
+ * Zipkin traces are reported through brave client. This override implementor majorly make sure the original memory
+ * queue and grpc client doesn't work, and set up the Zipkin client in the right way.
+ */
+@OverrideImplementor(TraceSegmentServiceClient.class)
+public class ZipkinTraceReporter extends TraceSegmentServiceClient {
+ @Getter
+ private Tracing tracing;
+ private OkHttpSender sender;
+ private AsyncZipkinSpanHandler zipkinSpanHandler;
+
+ @Override
+ public void prepare() {
+ }
+
+ /**
+ * Set up the Zipkin reporter, use {@link Config.Collector#BACKEND_SERVICE} as the report URL. Typically, the path
+ * should be http://ip:port/api/v2/spans
+ */
+ @Override
+ public void boot() {
+ sender = OkHttpSender.create(Config.Collector.BACKEND_SERVICE);
+ zipkinSpanHandler = AsyncZipkinSpanHandler.create(sender);
+
+ // Create a tracing component with the service name you want to see in Zipkin.
+ tracing = Tracing.newBuilder()
+ .localServiceName(Config.Agent.SERVICE_NAME)
+ .addSpanHandler(zipkinSpanHandler)
+ .build();
+ }
+
+ @Override
+ public void shutdown() {
+ tracing.close();
+ zipkinSpanHandler.close();
+ sender.close();
+ }
+}
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTracerContext.java b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTracerContext.java
new file mode 100644
index 0000000..4b9c01c
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTracerContext.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.reporter.zipkin;
+
+import brave.Span;
+import brave.Tracer;
+import brave.Tracing;
+import brave.propagation.TraceContext;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.skywalking.apm.agent.core.context.AbstractTracerContext;
+import org.apache.skywalking.apm.agent.core.context.AsyncSpan;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.CorrelationContext;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+
+/**
+ * ZipkinTracerContext is an API wrapper of Zipkin tracer.
+ */
+public class ZipkinTracerContext implements AbstractTracerContext {
+ private static String B3_NAME = "b3";
+
+ /**
+ * Running span cache of the current Zipkin context. This takes the responsibility of determining when this context
+ * should be closed. The key time point is all running context has been closed.
+ */
+ private Map<Span, ZipkinSpan> runningSpans;
+ private final Tracing tracing;
+ private final Tracer tracer;
+ private final CorrelationContext correlationContext;
+
+ public ZipkinTracerContext(final Tracing tracing) {
+ this.tracing = tracing;
+ this.tracer = tracing.tracer();
+ runningSpans = new ConcurrentHashMap<>();
+ this.correlationContext = new CorrelationContext();
+ }
+
+ @Override
+ public void inject(final ContextCarrier carrier) {
+ tracing.propagation().injector((request, key, value) -> carrier.addCustomContext(key, value))
+ .inject(tracing.currentTraceContext().get(), null);
+
+ this.correlationContext.inject(carrier);
+ }
+
+ @Override
+ public void extract(final ContextCarrier carrier) {
+ carrier.setCustomKeys(B3_NAME);
+ tracing.propagation().extractor((request, key) -> carrier.readCustomContext(key)).extract(null);
+ this.correlationContext.extract(carrier);
+ }
+
+ @Override
+ public ContextSnapshot capture() {
+ final TraceContext traceContext = tracing.currentTraceContext().get();
+ ContextSnapshot contextSnapshot = new ContextSnapshot(correlationContext);
+ contextSnapshot.addCustomContext(B3_NAME, traceContext);
+ return contextSnapshot;
+ }
+
+ @Override
+ public void continued(final ContextSnapshot snapshot) {
+ final TraceContext traceContext = (TraceContext) snapshot.readCustomContext(B3_NAME);
+ tracing.currentTraceContext().newScope(traceContext);
+ }
+
+ @Override
+ public String getReadablePrimaryTraceId() {
+ final Span span = tracer.currentSpan();
+ return span == null ? "N/A" : span.context().traceIdString();
+ }
+
+ @Override
+ public AbstractSpan createEntrySpan(final String operationName) {
+ final Span span = tracer.nextSpan().name(operationName);
+ return createOrGet(span).setEntry(true);
+ }
+
+ @Override
+ public AbstractSpan createLocalSpan(final String operationName) {
+ final Span span = tracer.nextSpan().name(operationName);
+ return createOrGet(span);
+ }
+
+ @Override
+ public AbstractSpan createExitSpan(final String operationName, final String remotePeer) {
+ final Span span = tracer.nextSpan().name(operationName);
+ span.remoteServiceName(remotePeer);
+ return createOrGet(span).setExit(true);
+ }
+
+ @Override
+ public AbstractSpan activeSpan() {
+ final Span span = tracer.currentSpan();
+ if (span == null) {
+ throw new IllegalStateException("No active span.");
+ }
+ return createOrGet(span);
+ }
+
+ /**
+ * @param span to finish
+ * @return true once no active span.
+ */
+ @Override
+ public boolean stopSpan(final AbstractSpan span) {
+ final ZipkinSpan zipkinSpan = (ZipkinSpan) span;
+ if (!zipkinSpan.isAsync()) {
+ zipkinSpan.stop();
+ }
+ runningSpans.remove(span);
+ return runningSpans.isEmpty();
+ }
+
+ @Override
+ public AbstractTracerContext awaitFinishAsync() {
+ return this;
+ }
+
+ @Override
+ public void asyncStop(final AsyncSpan span) {
+ ((ZipkinSpan) span).stop();
+ }
+
+ @Override
+ public CorrelationContext getCorrelationContext() {
+ return null;
+ }
+
+ private ZipkinSpan createOrGet(Span span) {
+ ZipkinSpan zipkinSpan = runningSpans.get(span);
+ if (zipkinSpan == null) {
+ zipkinSpan = new ZipkinSpan(span);
+ final ZipkinSpan prevValue = runningSpans.putIfAbsent(span, zipkinSpan);
+ if (prevValue != null) {
+ zipkinSpan = prevValue;
+ }
+ }
+ return zipkinSpan;
+ }
+}
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
new file mode 100644
index 0000000..e28a93a
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.apm.plugin.reporter.zipkin.ZipkinTraceReporter
+org.apache.skywalking.apm.plugin.reporter.zipkin.ZipkinContextManager
+org.apache.skywalking.apm.plugin.reporter.zipkin.GRPCBlockingService
+
+#org.apache.skywalking.apm.agent.core.jvm.JVMService
+#org.apache.skywalking.apm.agent.core.remote.ServiceManagementClient
+#org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
+#org.apache.skywalking.apm.agent.core.commands.CommandService
+#org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
+#org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService
+#org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
+#org.apache.skywalking.apm.agent.core.meter.MeterService
\ No newline at end of file