You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/07/21 16:12:30 UTC

[camel] branch camel-3.4.x updated: setting span kind as producer/consumer depending on the type of component in the route. If the span kind if producer then this changes the tracing header being set to b3 single format (#4026)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.4.x by this push:
     new 2a2886a  setting span kind as producer/consumer depending on the type of component in the route. If the span kind if producer then this changes the tracing header being set to b3 single format (#4026)
2a2886a is described below

commit 2a2886a5837e99d51bc530c7ac407657154d1a1f
Author: Samrat Dhillon <sa...@gmail.com>
AuthorDate: Tue Jul 21 12:12:16 2020 -0400

    setting span kind as producer/consumer depending on the type of component in the route. If the span kind if producer then this changes the tracing header being set to b3 single format (#4026)
    
    Co-authored-by: Samrat Dhillon <sa...@innovapost.com>
---
 components/camel-zipkin/pom.xml                    |   5 +
 .../java/org/apache/camel/zipkin/CamelRequest.java |  52 ++++++++
 .../java/org/apache/camel/zipkin/ZipkinTracer.java |  67 ++++++++--
 .../org/apache/camel/zipkin/CamelRequestTest.java  |  42 ++++++
 .../camel/zipkin/ZipkinProducerSpanKindTest.java   |  77 +++++++++++
 .../org/apache/camel/zipkin/ZipkinTracerTest.java  | 148 +++++++++++++++++++++
 6 files changed, 381 insertions(+), 10 deletions(-)

diff --git a/components/camel-zipkin/pom.xml b/components/camel-zipkin/pom.xml
index 6a6f254..ed73668 100644
--- a/components/camel-zipkin/pom.xml
+++ b/components/camel-zipkin/pom.xml
@@ -88,6 +88,11 @@
             <artifactId>log4j-slf4j-impl</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/CamelRequest.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/CamelRequest.java
new file mode 100644
index 0000000..b498e1a
--- /dev/null
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/CamelRequest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.zipkin;
+
+import brave.Request;
+import brave.Span;
+import brave.Span.Kind;
+import org.apache.camel.Message;
+
+public class CamelRequest extends Request {
+
+    private final Message message;
+    private final Span.Kind spanKind;
+
+    public CamelRequest(Message message, Span.Kind spanKind) {
+        this.message = message;
+        this.spanKind = spanKind;
+    }
+
+    @Override
+    public Kind spanKind() {
+        return this.spanKind;
+    }
+
+    @Override
+    public Object unwrap() {
+        return this.message;
+    }
+
+    public void setHeader(String key, String value) {
+        message.setHeader(key, value);
+    }
+
+    public String getHeader(String key) {
+        return message.getHeader(key, String.class);
+    }
+
+}
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
index d6aa1cb..e16d636 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
@@ -22,7 +22,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+
 import brave.Span;
+import brave.Span.Kind;
 import brave.Tracing;
 import brave.context.slf4j.MDCScopeDecorator;
 import brave.propagation.B3Propagation;
@@ -39,7 +41,6 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
-import org.apache.camel.Message;
 import org.apache.camel.NamedNode;
 import org.apache.camel.Route;
 import org.apache.camel.RuntimeCamelException;
@@ -126,10 +127,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     private static final Logger LOG = LoggerFactory.getLogger(ZipkinTracer.class);
     private static final String ZIPKIN_COLLECTOR_HTTP_SERVICE = "zipkin-collector-http";
     private static final String ZIPKIN_COLLECTOR_THRIFT_SERVICE = "zipkin-collector-thrift";
-    private static final Getter<Message, String> GETTER = (message, key) -> message.getHeader(key, String.class);
-    private static final Setter<Message, String> SETTER = (message, key, value) -> message.setHeader(key, value);
-    private static final Extractor<Message> EXTRACTOR = B3Propagation.B3_STRING.extractor(GETTER);
-    private static final Injector<Message> INJECTOR = B3Propagation.B3_STRING.injector(SETTER);
+    private static final Getter<CamelRequest, String> GETTER = (cr, key) -> cr.getHeader(key);
+    private static final Setter<CamelRequest, String> SETTER = (cr, key, value) -> cr.setHeader(key, value);
+    private static final Extractor<CamelRequest> EXTRACTOR = B3Propagation.B3_STRING.extractor(GETTER);
+    private static final Injector<CamelRequest> INJECTOR = B3Propagation.B3_STRING.injector(SETTER);
 
     private final ZipkinEventNotifier eventNotifier = new ZipkinEventNotifier();
     private final Map<String, Tracing> braves = new HashMap<>();
@@ -146,10 +147,24 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     private Set<String> excludePatterns = new HashSet<>();
     private boolean includeMessageBody;
     private boolean includeMessageBodyStreams;
+    private final Map<String, Span.Kind> producerComponentToSpanKind = new HashMap<>();
+    private final Map<String, Span.Kind> consumerComponentToSpanKind = new HashMap<>();
 
     public ZipkinTracer() {
+        producerComponentToSpanKind.put("jms", Span.Kind.PRODUCER);
+        producerComponentToSpanKind.put("sjms", Span.Kind.PRODUCER);
+        producerComponentToSpanKind.put("activemq", Span.Kind.PRODUCER);
+        producerComponentToSpanKind.put("kafka", Span.Kind.PRODUCER);
+        producerComponentToSpanKind.put("amqp", Span.Kind.PRODUCER);
+
+        consumerComponentToSpanKind.put("jms", Span.Kind.CONSUMER);
+        consumerComponentToSpanKind.put("sjms", Span.Kind.CONSUMER);
+        consumerComponentToSpanKind.put("activemq", Span.Kind.CONSUMER);
+        consumerComponentToSpanKind.put("kafka", Span.Kind.CONSUMER);
+        consumerComponentToSpanKind.put("amqp", Span.Kind.CONSUMER);
     }
 
+
     @Override
     public RoutePolicy createRoutePolicy(CamelContext camelContext, String routeId, NamedNode route) {
         // ensure this zipkin tracer gets initialized when Camel starts
@@ -601,10 +616,12 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         } else {
             span = brave.tracer().nextSpan();
         }
-        span.kind(Span.Kind.CLIENT).start();
+        Span.Kind spanKind = getProducerComponentSpanKind(event.getEndpoint());
+        span.kind(spanKind).start();
 
         ZipkinClientRequestAdapter parser = new ZipkinClientRequestAdapter(this, event.getEndpoint());
-        INJECTOR.inject(span.context(), event.getExchange().getIn());
+        CamelRequest request = new CamelRequest(event.getExchange().getIn(), spanKind);
+        INJECTOR.inject(span.context(), request);
         parser.onRequest(event.getExchange(), span.customizer());
 
         // store span after request
@@ -627,6 +644,17 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         }
     }
 
+
+    //protected for testing
+    protected Kind getProducerComponentSpanKind(Endpoint endpoint) {
+        return producerComponentToSpanKind.getOrDefault(getComponentName(endpoint), Span.Kind.CLIENT);
+    }
+
+    //protected for testing
+    protected void addProducerComponentSpanKind(String component, Span.Kind kind) {
+        producerComponentToSpanKind.putIfAbsent(component, kind);
+    }
+
     private void clientResponse(Tracing brave, String serviceName, ExchangeSentEvent event) {
         Span span = null;
         ZipkinState state = event.getExchange().getProperty(ZipkinState.KEY, ZipkinState.class);
@@ -666,14 +694,16 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
             exchange.setProperty(ZipkinState.KEY, state);
         }
         Span span = null;
-        TraceContextOrSamplingFlags sampleFlag = EXTRACTOR.extract(exchange.getIn());
+        Span.Kind spanKind = getConsumerComponentSpanKind(exchange.getFromEndpoint());
+        CamelRequest cr = new CamelRequest(exchange.getIn(), spanKind);
+        TraceContextOrSamplingFlags sampleFlag = EXTRACTOR.extract(cr);
         if (ObjectHelper.isEmpty(sampleFlag)) {
             span = brave.tracer().nextSpan();
-            INJECTOR.inject(span.context(), exchange.getIn());
+            INJECTOR.inject(span.context(), cr);
         } else {
             span = brave.tracer().nextSpan(sampleFlag);
         }
-        span.kind(Span.Kind.SERVER).start();
+        span.kind(spanKind).start();
         ZipkinServerRequestAdapter parser = new ZipkinServerRequestAdapter(this, exchange);
         parser.onRequest(exchange, span.customizer());
 
@@ -699,6 +729,12 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         return span;
     }
 
+
+    //protected for testing
+    protected Kind getConsumerComponentSpanKind(Endpoint endpoint) {
+        return consumerComponentToSpanKind.getOrDefault(getComponentName(endpoint), Span.Kind.SERVER);
+    }
+
     private void serverResponse(Tracing brave, String serviceName, Exchange exchange) {
         Span span = null;
         ZipkinState state = exchange.getProperty(ZipkinState.KEY, ZipkinState.class);
@@ -730,6 +766,17 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         }
     }
 
+    private String getComponentName(Endpoint endpoint) {
+        String uri = endpoint.getEndpointBaseUri();
+        if (uri != null) {
+            String uriParts[] = uri.split(":");
+            if (uriParts != null && uriParts.length > 0) {
+                return uriParts[0].toLowerCase();
+            }
+        }
+        return null;
+    }
+
     private final class ZipkinEventNotifier extends EventNotifierSupport {
 
         @Override
diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/CamelRequestTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/CamelRequestTest.java
new file mode 100644
index 0000000..75cccff
--- /dev/null
+++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/CamelRequestTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.zipkin;
+
+import brave.Span;
+import org.apache.camel.Message;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CamelRequestTest {
+
+    @Test
+    public void testCamelRequest() {
+        Message message = mock(Message.class);
+        when(message.getHeader("X-B3-TraceId", String.class)).thenReturn("924c5b125daaaec8");
+        CamelRequest request = new CamelRequest(message, Span.Kind.PRODUCER);
+        request.setHeader("X-B3-SpanId", "db1ccb94946711b0");
+        assertThat(request.spanKind()).isEqualTo(Span.Kind.PRODUCER);
+        assertThat(request.unwrap()).isEqualTo(message);
+        verify(message).setHeader("X-B3-SpanId", "db1ccb94946711b0");
+        assertThat(request.getHeader("X-B3-TraceId")).isEqualTo("924c5b125daaaec8");
+    }
+
+}
diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinProducerSpanKindTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinProducerSpanKindTest.java
new file mode 100644
index 0000000..2b7cc78
--- /dev/null
+++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinProducerSpanKindTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.zipkin;
+
+import brave.Span;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import zipkin2.reporter.Reporter;
+
+public class ZipkinProducerSpanKindTest extends CamelTestSupport {
+
+    private ZipkinTracer zipkin;
+
+    protected void setSpanReporter(ZipkinTracer zipkin) {
+        zipkin.setSpanReporter(Reporter.NOOP);
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        zipkin = new ZipkinTracer();
+        zipkin.addProducerComponentSpanKind("seda", Span.Kind.PRODUCER);
+        setSpanReporter(zipkin);
+
+        // attaching ourself to CamelContext
+        zipkin.init(context);
+
+        return context;
+    }
+
+    @Test
+    public void testB3SingleHeaderPresent() throws Exception {
+        template.requestBody("direct:start", "Hello World");
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("seda:a").routeId("start");
+
+                from("seda:a").routeId("a")
+                .process(new Processor() {
+                    @Override
+                    public void process(Exchange exchange) throws Exception {
+                        String b3Header = exchange.getIn().getHeader("b3", String.class);
+                        Assertions.assertThat(b3Header).isNotNull();
+                    }
+                });
+            }
+        };
+    }
+
+}
diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinTracerTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinTracerTest.java
new file mode 100644
index 0000000..55f9a34
--- /dev/null
+++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinTracerTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.zipkin;
+
+import brave.Span;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import zipkin2.reporter.Reporter;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ZipkinTracerTest extends CamelTestSupport {
+
+    private ZipkinTracer zipkin;
+
+    private Endpoint endpoint;
+
+    protected void setSpanReporter(ZipkinTracer zipkin) {
+        zipkin.setSpanReporter(Reporter.NOOP);
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        zipkin = new ZipkinTracer();
+        setSpanReporter(zipkin);
+        // attaching ourself to CamelContext
+        zipkin.init(context);
+        return context;
+
+    }
+
+    @Test
+    public void testJmsProducerEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("jms:queue");
+        Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.PRODUCER);
+    }
+
+    @Test
+    public void testKafkaProducerEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("kafka:topic");
+        Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.PRODUCER);
+    }
+
+    @Test
+    public void testSJMSProducerEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("sjms:queue");
+        Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.PRODUCER);
+    }
+
+    @Test
+    public void testActiveMQProducerEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("activemq:queue");
+        Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.PRODUCER);
+    }
+
+    @Test
+    public void testNonProducerEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("http:www");
+        Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.CLIENT);
+    }
+
+    @Test
+    public void testNonProducerInvalidEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("jms&queue");
+        Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.CLIENT);
+    }
+
+    @Test
+    public void testJmsConsumerEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("jms:queue");
+        Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.CONSUMER);
+    }
+
+    @Test
+    public void testKafkaConsumerEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("kafka:topic");
+        Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.CONSUMER);
+    }
+
+    @Test
+    public void testSJMSConsumerEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("sjms:queue");
+        Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.CONSUMER);
+    }
+
+    @Test
+    public void testActiveMQConsumerEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("activemq:queue");
+        Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.CONSUMER);
+    }
+
+    @Test
+    public void testNonConsumerEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("rest:customer?");
+        Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.SERVER);
+    }
+
+    @Test
+    public void testNonConsumerInvalidEndpoint() {
+        endpoint = mock(Endpoint.class);
+        when(endpoint.getEndpointBaseUri()).thenReturn("rest&customer?");
+        Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+        Assertions.assertThat(spankind).isEqualTo(Span.Kind.SERVER);
+    }
+
+}