You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/07/21 16:12:30 UTC
[camel] branch camel-3.4.x updated: setting span kind as
producer/consumer depending on the type of component in the route. If the
span kind if producer then this changes the tracing header being set to b3
single format (#4026)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.4.x by this push:
new 2a2886a setting span kind as producer/consumer depending on the type of component in the route. If the span kind if producer then this changes the tracing header being set to b3 single format (#4026)
2a2886a is described below
commit 2a2886a5837e99d51bc530c7ac407657154d1a1f
Author: Samrat Dhillon <sa...@gmail.com>
AuthorDate: Tue Jul 21 12:12:16 2020 -0400
setting span kind as producer/consumer depending on the type of component in the route. If the span kind if producer then this changes the tracing header being set to b3 single format (#4026)
Co-authored-by: Samrat Dhillon <sa...@innovapost.com>
---
components/camel-zipkin/pom.xml | 5 +
.../java/org/apache/camel/zipkin/CamelRequest.java | 52 ++++++++
.../java/org/apache/camel/zipkin/ZipkinTracer.java | 67 ++++++++--
.../org/apache/camel/zipkin/CamelRequestTest.java | 42 ++++++
.../camel/zipkin/ZipkinProducerSpanKindTest.java | 77 +++++++++++
.../org/apache/camel/zipkin/ZipkinTracerTest.java | 148 +++++++++++++++++++++
6 files changed, 381 insertions(+), 10 deletions(-)
diff --git a/components/camel-zipkin/pom.xml b/components/camel-zipkin/pom.xml
index 6a6f254..ed73668 100644
--- a/components/camel-zipkin/pom.xml
+++ b/components/camel-zipkin/pom.xml
@@ -88,6 +88,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/CamelRequest.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/CamelRequest.java
new file mode 100644
index 0000000..b498e1a
--- /dev/null
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/CamelRequest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.zipkin;
+
+import brave.Request;
+import brave.Span;
+import brave.Span.Kind;
+import org.apache.camel.Message;
+
+public class CamelRequest extends Request {
+
+ private final Message message;
+ private final Span.Kind spanKind;
+
+ public CamelRequest(Message message, Span.Kind spanKind) {
+ this.message = message;
+ this.spanKind = spanKind;
+ }
+
+ @Override
+ public Kind spanKind() {
+ return this.spanKind;
+ }
+
+ @Override
+ public Object unwrap() {
+ return this.message;
+ }
+
+ public void setHeader(String key, String value) {
+ message.setHeader(key, value);
+ }
+
+ public String getHeader(String key) {
+ return message.getHeader(key, String.class);
+ }
+
+}
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
index d6aa1cb..e16d636 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
@@ -22,7 +22,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+
import brave.Span;
+import brave.Span.Kind;
import brave.Tracing;
import brave.context.slf4j.MDCScopeDecorator;
import brave.propagation.B3Propagation;
@@ -39,7 +41,6 @@ import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
-import org.apache.camel.Message;
import org.apache.camel.NamedNode;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
@@ -126,10 +127,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
private static final Logger LOG = LoggerFactory.getLogger(ZipkinTracer.class);
private static final String ZIPKIN_COLLECTOR_HTTP_SERVICE = "zipkin-collector-http";
private static final String ZIPKIN_COLLECTOR_THRIFT_SERVICE = "zipkin-collector-thrift";
- private static final Getter<Message, String> GETTER = (message, key) -> message.getHeader(key, String.class);
- private static final Setter<Message, String> SETTER = (message, key, value) -> message.setHeader(key, value);
- private static final Extractor<Message> EXTRACTOR = B3Propagation.B3_STRING.extractor(GETTER);
- private static final Injector<Message> INJECTOR = B3Propagation.B3_STRING.injector(SETTER);
+ private static final Getter<CamelRequest, String> GETTER = (cr, key) -> cr.getHeader(key);
+ private static final Setter<CamelRequest, String> SETTER = (cr, key, value) -> cr.setHeader(key, value);
+ private static final Extractor<CamelRequest> EXTRACTOR = B3Propagation.B3_STRING.extractor(GETTER);
+ private static final Injector<CamelRequest> INJECTOR = B3Propagation.B3_STRING.injector(SETTER);
private final ZipkinEventNotifier eventNotifier = new ZipkinEventNotifier();
private final Map<String, Tracing> braves = new HashMap<>();
@@ -146,10 +147,24 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
private Set<String> excludePatterns = new HashSet<>();
private boolean includeMessageBody;
private boolean includeMessageBodyStreams;
+ private final Map<String, Span.Kind> producerComponentToSpanKind = new HashMap<>();
+ private final Map<String, Span.Kind> consumerComponentToSpanKind = new HashMap<>();
public ZipkinTracer() {
+ producerComponentToSpanKind.put("jms", Span.Kind.PRODUCER);
+ producerComponentToSpanKind.put("sjms", Span.Kind.PRODUCER);
+ producerComponentToSpanKind.put("activemq", Span.Kind.PRODUCER);
+ producerComponentToSpanKind.put("kafka", Span.Kind.PRODUCER);
+ producerComponentToSpanKind.put("amqp", Span.Kind.PRODUCER);
+
+ consumerComponentToSpanKind.put("jms", Span.Kind.CONSUMER);
+ consumerComponentToSpanKind.put("sjms", Span.Kind.CONSUMER);
+ consumerComponentToSpanKind.put("activemq", Span.Kind.CONSUMER);
+ consumerComponentToSpanKind.put("kafka", Span.Kind.CONSUMER);
+ consumerComponentToSpanKind.put("amqp", Span.Kind.CONSUMER);
}
+
@Override
public RoutePolicy createRoutePolicy(CamelContext camelContext, String routeId, NamedNode route) {
// ensure this zipkin tracer gets initialized when Camel starts
@@ -601,10 +616,12 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
} else {
span = brave.tracer().nextSpan();
}
- span.kind(Span.Kind.CLIENT).start();
+ Span.Kind spanKind = getProducerComponentSpanKind(event.getEndpoint());
+ span.kind(spanKind).start();
ZipkinClientRequestAdapter parser = new ZipkinClientRequestAdapter(this, event.getEndpoint());
- INJECTOR.inject(span.context(), event.getExchange().getIn());
+ CamelRequest request = new CamelRequest(event.getExchange().getIn(), spanKind);
+ INJECTOR.inject(span.context(), request);
parser.onRequest(event.getExchange(), span.customizer());
// store span after request
@@ -627,6 +644,17 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
}
}
+
+ //protected for testing
+ protected Kind getProducerComponentSpanKind(Endpoint endpoint) {
+ return producerComponentToSpanKind.getOrDefault(getComponentName(endpoint), Span.Kind.CLIENT);
+ }
+
+ //protected for testing
+ protected void addProducerComponentSpanKind(String component, Span.Kind kind) {
+ producerComponentToSpanKind.putIfAbsent(component, kind);
+ }
+
private void clientResponse(Tracing brave, String serviceName, ExchangeSentEvent event) {
Span span = null;
ZipkinState state = event.getExchange().getProperty(ZipkinState.KEY, ZipkinState.class);
@@ -666,14 +694,16 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
exchange.setProperty(ZipkinState.KEY, state);
}
Span span = null;
- TraceContextOrSamplingFlags sampleFlag = EXTRACTOR.extract(exchange.getIn());
+ Span.Kind spanKind = getConsumerComponentSpanKind(exchange.getFromEndpoint());
+ CamelRequest cr = new CamelRequest(exchange.getIn(), spanKind);
+ TraceContextOrSamplingFlags sampleFlag = EXTRACTOR.extract(cr);
if (ObjectHelper.isEmpty(sampleFlag)) {
span = brave.tracer().nextSpan();
- INJECTOR.inject(span.context(), exchange.getIn());
+ INJECTOR.inject(span.context(), cr);
} else {
span = brave.tracer().nextSpan(sampleFlag);
}
- span.kind(Span.Kind.SERVER).start();
+ span.kind(spanKind).start();
ZipkinServerRequestAdapter parser = new ZipkinServerRequestAdapter(this, exchange);
parser.onRequest(exchange, span.customizer());
@@ -699,6 +729,12 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
return span;
}
+
+ //protected for testing
+ protected Kind getConsumerComponentSpanKind(Endpoint endpoint) {
+ return consumerComponentToSpanKind.getOrDefault(getComponentName(endpoint), Span.Kind.SERVER);
+ }
+
private void serverResponse(Tracing brave, String serviceName, Exchange exchange) {
Span span = null;
ZipkinState state = exchange.getProperty(ZipkinState.KEY, ZipkinState.class);
@@ -730,6 +766,17 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
}
}
+ private String getComponentName(Endpoint endpoint) {
+ String uri = endpoint.getEndpointBaseUri();
+ if (uri != null) {
+ String uriParts[] = uri.split(":");
+ if (uriParts != null && uriParts.length > 0) {
+ return uriParts[0].toLowerCase();
+ }
+ }
+ return null;
+ }
+
private final class ZipkinEventNotifier extends EventNotifierSupport {
@Override
diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/CamelRequestTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/CamelRequestTest.java
new file mode 100644
index 0000000..75cccff
--- /dev/null
+++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/CamelRequestTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.zipkin;
+
+import brave.Span;
+import org.apache.camel.Message;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CamelRequestTest {
+
+ @Test
+ public void testCamelRequest() {
+ Message message = mock(Message.class);
+ when(message.getHeader("X-B3-TraceId", String.class)).thenReturn("924c5b125daaaec8");
+ CamelRequest request = new CamelRequest(message, Span.Kind.PRODUCER);
+ request.setHeader("X-B3-SpanId", "db1ccb94946711b0");
+ assertThat(request.spanKind()).isEqualTo(Span.Kind.PRODUCER);
+ assertThat(request.unwrap()).isEqualTo(message);
+ verify(message).setHeader("X-B3-SpanId", "db1ccb94946711b0");
+ assertThat(request.getHeader("X-B3-TraceId")).isEqualTo("924c5b125daaaec8");
+ }
+
+}
diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinProducerSpanKindTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinProducerSpanKindTest.java
new file mode 100644
index 0000000..2b7cc78
--- /dev/null
+++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinProducerSpanKindTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.zipkin;
+
+import brave.Span;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import zipkin2.reporter.Reporter;
+
+public class ZipkinProducerSpanKindTest extends CamelTestSupport {
+
+ private ZipkinTracer zipkin;
+
+ protected void setSpanReporter(ZipkinTracer zipkin) {
+ zipkin.setSpanReporter(Reporter.NOOP);
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+
+ zipkin = new ZipkinTracer();
+ zipkin.addProducerComponentSpanKind("seda", Span.Kind.PRODUCER);
+ setSpanReporter(zipkin);
+
+ // attaching ourself to CamelContext
+ zipkin.init(context);
+
+ return context;
+ }
+
+ @Test
+ public void testB3SingleHeaderPresent() throws Exception {
+ template.requestBody("direct:start", "Hello World");
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").to("seda:a").routeId("start");
+
+ from("seda:a").routeId("a")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String b3Header = exchange.getIn().getHeader("b3", String.class);
+ Assertions.assertThat(b3Header).isNotNull();
+ }
+ });
+ }
+ };
+ }
+
+}
diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinTracerTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinTracerTest.java
new file mode 100644
index 0000000..55f9a34
--- /dev/null
+++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinTracerTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.zipkin;
+
+import brave.Span;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import zipkin2.reporter.Reporter;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ZipkinTracerTest extends CamelTestSupport {
+
+ private ZipkinTracer zipkin;
+
+ private Endpoint endpoint;
+
+ protected void setSpanReporter(ZipkinTracer zipkin) {
+ zipkin.setSpanReporter(Reporter.NOOP);
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ zipkin = new ZipkinTracer();
+ setSpanReporter(zipkin);
+ // attaching ourself to CamelContext
+ zipkin.init(context);
+ return context;
+
+ }
+
+ @Test
+ public void testJmsProducerEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("jms:queue");
+ Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.PRODUCER);
+ }
+
+ @Test
+ public void testKafkaProducerEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("kafka:topic");
+ Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.PRODUCER);
+ }
+
+ @Test
+ public void testSJMSProducerEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("sjms:queue");
+ Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.PRODUCER);
+ }
+
+ @Test
+ public void testActiveMQProducerEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("activemq:queue");
+ Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.PRODUCER);
+ }
+
+ @Test
+ public void testNonProducerEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("http:www");
+ Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.CLIENT);
+ }
+
+ @Test
+ public void testNonProducerInvalidEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("jms&queue");
+ Span.Kind spankind = zipkin.getProducerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.CLIENT);
+ }
+
+ @Test
+ public void testJmsConsumerEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("jms:queue");
+ Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.CONSUMER);
+ }
+
+ @Test
+ public void testKafkaConsumerEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("kafka:topic");
+ Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.CONSUMER);
+ }
+
+ @Test
+ public void testSJMSConsumerEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("sjms:queue");
+ Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.CONSUMER);
+ }
+
+ @Test
+ public void testActiveMQConsumerEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("activemq:queue");
+ Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.CONSUMER);
+ }
+
+ @Test
+ public void testNonConsumerEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("rest:customer?");
+ Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.SERVER);
+ }
+
+ @Test
+ public void testNonConsumerInvalidEndpoint() {
+ endpoint = mock(Endpoint.class);
+ when(endpoint.getEndpointBaseUri()).thenReturn("rest&customer?");
+ Span.Kind spankind = zipkin.getConsumerComponentSpanKind(endpoint);
+ Assertions.assertThat(spankind).isEqualTo(Span.Kind.SERVER);
+ }
+
+}