You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/20 12:06:40 UTC

[camel] branch camel-3.7.x updated: Feature/zipkin spans new state (#5435)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.7.x by this push:
     new 45ac24a  Feature/zipkin spans new state (#5435)
45ac24a is described below

commit 45ac24a3dce0d77fc51ca61f9cec57f9428a6fde
Author: Samrat Dhillon <sa...@gmail.com>
AuthorDate: Tue Apr 20 08:05:48 2021 -0400

    Feature/zipkin spans new state (#5435)
    
    * Safer fix for CAMEL-16509: incorrect span timing information reported by camel-zipkin when using parallel processing with multicast/recipientList
    
    * Make a defensive copy of ZipkinState
    
    Co-authored-by: Samrat Dhillon <sa...@innovapost.com>
---
 .../java/org/apache/camel/zipkin/ZipkinState.java  | 51 ++++++++--------------
 .../java/org/apache/camel/zipkin/ZipkinTracer.java | 10 +++--
 .../org/apache/camel/zipkin/ZipkinStateTest.java   | 30 ++++++++++++-
 3 files changed, 54 insertions(+), 37 deletions(-)

diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java
index 6cfa2f4..2e06d61 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java
@@ -16,12 +16,10 @@
  */
 package org.apache.camel.zipkin;
 
-import java.util.ArrayDeque;
 import java.util.Deque;
-import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 import brave.Span;
-import brave.propagation.TraceContextOrSamplingFlags;
 import org.apache.camel.Exchange;
 
 /**
@@ -34,14 +32,25 @@ public final class ZipkinState {
 
     public static final String KEY = "CamelZipkinState";
 
-    private final Deque<Span> clientSpans = new ArrayDeque<>();
-    private final Deque<Span> serverSpans = new ArrayDeque<>();
+    private final Deque<Span> clientSpans = new ConcurrentLinkedDeque<>();
+    private final Deque<Span> serverSpans = new ConcurrentLinkedDeque<>();
 
-    public synchronized void pushClientSpan(Span span) {
+    public ZipkinState() {
+
+    }
+
+    public static ZipkinState fromZipkinState(ZipkinState fromState) {
+        ZipkinState state = new ZipkinState();
+        state.clientSpans.addAll(fromState.clientSpans);
+        state.serverSpans.addAll(fromState.serverSpans);
+        return state;
+    }
+
+    public void pushClientSpan(Span span) {
         clientSpans.push(span);
     }
 
-    public synchronized Span popClientSpan() {
+    public Span popClientSpan() {
         if (!clientSpans.isEmpty()) {
             return clientSpans.pop();
         } else {
@@ -49,11 +58,11 @@ public final class ZipkinState {
         }
     }
 
-    public synchronized void pushServerSpan(Span span) {
+    public void pushServerSpan(Span span) {
         serverSpans.push(span);
     }
 
-    public synchronized Span popServerSpan() {
+    public Span popServerSpan() {
         if (!serverSpans.isEmpty()) {
             return serverSpans.pop();
         } else {
@@ -61,7 +70,7 @@ public final class ZipkinState {
         }
     }
 
-    private Span peekServerSpan() {
+    public Span peekServerSpan() {
         if (!serverSpans.isEmpty()) {
             return serverSpans.peek();
         } else {
@@ -69,26 +78,4 @@ public final class ZipkinState {
         }
     }
 
-    public synchronized Span findMatchingServerSpan(Exchange exchange) {
-        String spanId = (String) exchange.getIn().getHeader(ZipkinConstants.SPAN_ID);
-        Span lastSpan = peekServerSpan();
-        if (spanId == null) {
-            return lastSpan;
-        }
-        TraceContextOrSamplingFlags traceContext
-                = ZipkinTracer.EXTRACTOR.extract(new CamelRequest(exchange.getIn(), Span.Kind.SERVER));
-        if (traceContext.context().spanId() == lastSpan.context().spanId()) {
-            return lastSpan;
-        }
-
-        Iterator<Span> spanItr = serverSpans.iterator();
-        while (spanItr.hasNext()) {
-            Span span = spanItr.next();
-            if (span.context().spanId() == traceContext.context().spanId()) {
-                return span;
-            }
-        }
-        return lastSpan;
-    }
-
 }
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
index 7883981..c94c6f9 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
@@ -589,11 +589,13 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         ZipkinState state = event.getExchange().getProperty(ZipkinState.KEY, ZipkinState.class);
         if (state == null) {
             state = new ZipkinState();
-            event.getExchange().setProperty(ZipkinState.KEY, state);
+        } else {
+            state = ZipkinState.fromZipkinState(state);
         }
+        event.getExchange().setProperty(ZipkinState.KEY, state);
         // if we started from a server span then lets reuse that when we call a
         // downstream service
-        Span last = state.findMatchingServerSpan(event.getExchange());
+        Span last = state.peekServerSpan();
         Span span;
         if (last != null) {
             span = brave.tracer().newChild(last.context());
@@ -677,8 +679,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         ZipkinState state = exchange.getProperty(ZipkinState.KEY, ZipkinState.class);
         if (state == null) {
             state = new ZipkinState();
-            exchange.setProperty(ZipkinState.KEY, state);
+        } else {
+            state = ZipkinState.fromZipkinState(state);
         }
+        exchange.setProperty(ZipkinState.KEY, state);
         Span span = null;
         Span.Kind spanKind = getConsumerComponentSpanKind(exchange.getFromEndpoint());
         CamelRequest cr = new CamelRequest(exchange.getIn(), spanKind);
diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java
index 00917b7..92b1604 100644
--- a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java
+++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java
@@ -53,16 +53,42 @@ public class ZipkinStateTest {
         exchange2.getIn().setHeader(ZipkinConstants.PARENT_SPAN_ID, context.spanIdString());
         exchange2.getIn().setHeader(ZipkinConstants.SPAN_ID, span2.context().spanIdString());
 
-        Span retrived = state.findMatchingServerSpan(exchange2);
+        Span retrived = state.peekServerSpan();
         assertThat(retrived.context().spanId()).isEqualTo(span2.context().spanId());
         assertThat(retrived.context().parentId()).isEqualTo(span2.context().parentId());
         assertThat(retrived.context().traceId()).isEqualTo(span2.context().traceId());
 
-        retrived = state.findMatchingServerSpan(exchange1);
+        state.popServerSpan();
+
+        retrived = state.peekServerSpan();
         assertThat(retrived.context().spanId()).isEqualTo(span1.context().spanId());
         assertThat(retrived.context().parentId()).isEqualTo(span1.context().parentId());
         assertThat(retrived.context().traceId()).isEqualTo(span1.context().traceId());
 
     }
 
+    @Test
+    public void testZipkinStateSafeCopy() {
+        TraceContext context = TraceContext.newBuilder().traceId(1L).spanId(2L).parentId(3L).build();
+        TraceContextOrSamplingFlags sampling = TraceContextOrSamplingFlags.newBuilder(context).build();
+        Tracing tracing = Tracing.newBuilder().build();
+
+        Span span1 = tracing.tracer().nextSpan(sampling);
+        state.pushServerSpan(span1);
+
+        Span span2 = tracing.tracer().nextSpan(sampling);
+        state.pushServerSpan(span2);
+
+        Span span3 = tracing.tracer().nextSpan(sampling);
+        ZipkinState state2 = ZipkinState.fromZipkinState(state);
+        state2.pushServerSpan(span3);
+
+        //original object intact
+        assertThat(state.peekServerSpan().context()).isNotEqualTo(span3.context()).usingRecursiveComparison();
+
+        //new object has the new span
+        assertThat(state2.peekServerSpan().context()).isEqualTo(span3.context()).usingRecursiveComparison();
+
+    }
+
 }