You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/20 12:06:40 UTC
[camel] branch camel-3.7.x updated: Feature/zipkin spans new state (#5435)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push:
new 45ac24a Feature/zipkin spans new state (#5435)
45ac24a is described below
commit 45ac24a3dce0d77fc51ca61f9cec57f9428a6fde
Author: Samrat Dhillon <sa...@gmail.com>
AuthorDate: Tue Apr 20 08:05:48 2021 -0400
Feature/zipkin spans new state (#5435)
* Safer way to fix CAMEL-16509: incorrect span timing information reported by camel-zipkin when using parallel processing with multicast/recipientList
* Make a defensive copy of ZipkinState
Co-authored-by: Samrat Dhillon <sa...@innovapost.com>
---
.../java/org/apache/camel/zipkin/ZipkinState.java | 51 ++++++++--------------
.../java/org/apache/camel/zipkin/ZipkinTracer.java | 10 +++--
.../org/apache/camel/zipkin/ZipkinStateTest.java | 30 ++++++++++++-
3 files changed, 54 insertions(+), 37 deletions(-)
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java
index 6cfa2f4..2e06d61 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java
@@ -16,12 +16,10 @@
*/
package org.apache.camel.zipkin;
-import java.util.ArrayDeque;
import java.util.Deque;
-import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedDeque;
import brave.Span;
-import brave.propagation.TraceContextOrSamplingFlags;
import org.apache.camel.Exchange;
/**
@@ -34,14 +32,25 @@ public final class ZipkinState {
public static final String KEY = "CamelZipkinState";
- private final Deque<Span> clientSpans = new ArrayDeque<>();
- private final Deque<Span> serverSpans = new ArrayDeque<>();
+ private final Deque<Span> clientSpans = new ConcurrentLinkedDeque<>();
+ private final Deque<Span> serverSpans = new ConcurrentLinkedDeque<>();
- public synchronized void pushClientSpan(Span span) {
+ public ZipkinState() {
+
+ }
+
+ public static ZipkinState fromZipkinState(ZipkinState fromState) {
+ ZipkinState state = new ZipkinState();
+ state.clientSpans.addAll(fromState.clientSpans);
+ state.serverSpans.addAll(fromState.serverSpans);
+ return state;
+ }
+
+ public void pushClientSpan(Span span) {
clientSpans.push(span);
}
- public synchronized Span popClientSpan() {
+ public Span popClientSpan() {
if (!clientSpans.isEmpty()) {
return clientSpans.pop();
} else {
@@ -49,11 +58,11 @@ public final class ZipkinState {
}
}
- public synchronized void pushServerSpan(Span span) {
+ public void pushServerSpan(Span span) {
serverSpans.push(span);
}
- public synchronized Span popServerSpan() {
+ public Span popServerSpan() {
if (!serverSpans.isEmpty()) {
return serverSpans.pop();
} else {
@@ -61,7 +70,7 @@ public final class ZipkinState {
}
}
- private Span peekServerSpan() {
+ public Span peekServerSpan() {
if (!serverSpans.isEmpty()) {
return serverSpans.peek();
} else {
@@ -69,26 +78,4 @@ public final class ZipkinState {
}
}
- public synchronized Span findMatchingServerSpan(Exchange exchange) {
- String spanId = (String) exchange.getIn().getHeader(ZipkinConstants.SPAN_ID);
- Span lastSpan = peekServerSpan();
- if (spanId == null) {
- return lastSpan;
- }
- TraceContextOrSamplingFlags traceContext
- = ZipkinTracer.EXTRACTOR.extract(new CamelRequest(exchange.getIn(), Span.Kind.SERVER));
- if (traceContext.context().spanId() == lastSpan.context().spanId()) {
- return lastSpan;
- }
-
- Iterator<Span> spanItr = serverSpans.iterator();
- while (spanItr.hasNext()) {
- Span span = spanItr.next();
- if (span.context().spanId() == traceContext.context().spanId()) {
- return span;
- }
- }
- return lastSpan;
- }
-
}
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
index 7883981..c94c6f9 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
@@ -589,11 +589,13 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
ZipkinState state = event.getExchange().getProperty(ZipkinState.KEY, ZipkinState.class);
if (state == null) {
state = new ZipkinState();
- event.getExchange().setProperty(ZipkinState.KEY, state);
+ } else {
+ state = ZipkinState.fromZipkinState(state);
}
+ event.getExchange().setProperty(ZipkinState.KEY, state);
// if we started from a server span then lets reuse that when we call a
// downstream service
- Span last = state.findMatchingServerSpan(event.getExchange());
+ Span last = state.peekServerSpan();
Span span;
if (last != null) {
span = brave.tracer().newChild(last.context());
@@ -677,8 +679,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
ZipkinState state = exchange.getProperty(ZipkinState.KEY, ZipkinState.class);
if (state == null) {
state = new ZipkinState();
- exchange.setProperty(ZipkinState.KEY, state);
+ } else {
+ state = ZipkinState.fromZipkinState(state);
}
+ exchange.setProperty(ZipkinState.KEY, state);
Span span = null;
Span.Kind spanKind = getConsumerComponentSpanKind(exchange.getFromEndpoint());
CamelRequest cr = new CamelRequest(exchange.getIn(), spanKind);
diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java
index 00917b7..92b1604 100644
--- a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java
+++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java
@@ -53,16 +53,42 @@ public class ZipkinStateTest {
exchange2.getIn().setHeader(ZipkinConstants.PARENT_SPAN_ID, context.spanIdString());
exchange2.getIn().setHeader(ZipkinConstants.SPAN_ID, span2.context().spanIdString());
- Span retrived = state.findMatchingServerSpan(exchange2);
+ Span retrived = state.peekServerSpan();
assertThat(retrived.context().spanId()).isEqualTo(span2.context().spanId());
assertThat(retrived.context().parentId()).isEqualTo(span2.context().parentId());
assertThat(retrived.context().traceId()).isEqualTo(span2.context().traceId());
- retrived = state.findMatchingServerSpan(exchange1);
+ state.popServerSpan();
+
+ retrived = state.peekServerSpan();
assertThat(retrived.context().spanId()).isEqualTo(span1.context().spanId());
assertThat(retrived.context().parentId()).isEqualTo(span1.context().parentId());
assertThat(retrived.context().traceId()).isEqualTo(span1.context().traceId());
}
+ @Test
+ public void testZipkinStateSafeCopy() {
+ TraceContext context = TraceContext.newBuilder().traceId(1L).spanId(2L).parentId(3L).build();
+ TraceContextOrSamplingFlags sampling = TraceContextOrSamplingFlags.newBuilder(context).build();
+ Tracing tracing = Tracing.newBuilder().build();
+
+ Span span1 = tracing.tracer().nextSpan(sampling);
+ state.pushServerSpan(span1);
+
+ Span span2 = tracing.tracer().nextSpan(sampling);
+ state.pushServerSpan(span2);
+
+ Span span3 = tracing.tracer().nextSpan(sampling);
+ ZipkinState state2 = ZipkinState.fromZipkinState(state);
+ state2.pushServerSpan(span3);
+
+ //original object intact
+ assertThat(state.peekServerSpan().context()).isNotEqualTo(span3.context()).usingRecursiveComparison();
+
+ //new object has the new span
+ assertThat(state2.peekServerSpan().context()).isEqualTo(span3.context()).usingRecursiveComparison();
+
+ }
+
}