You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/29 04:52:38 UTC
[camel] branch camel-3.x updated: CAMEL-19776: Added tracing strategy for OpenTelemetry to trace processors (#11213)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push:
new 07346225025 CAMEL-19776: Added tracing strategy for OpenTelemetry to trace processors (#11213)
07346225025 is described below
commit 07346225025de7b9eeedc44a562e6173f33b1912
Author: RuslanHryn <gr...@gmail.com>
AuthorDate: Tue Aug 29 07:52:32 2023 +0300
CAMEL-19776: Added tracing strategy for OpenTelemetry to trace processors (#11213)
Co-authored-by: Ruslan Hryn <hr...@crxmarkets.com>
---
.../camel/opentelemetry/NoopTracingStrategy.java | 34 +++++++
.../OpenTelemetryTracingStrategy.java | 107 +++++++++++++++++++++
.../CamelOpenTelemetryTestSupport.java | 7 ++
.../OpenTelemetryTracingStrategyTest.java | 95 ++++++++++++++++++
4 files changed, 243 insertions(+)
diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/NoopTracingStrategy.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/NoopTracingStrategy.java
new file mode 100644
index 00000000000..ad8d54ba761
--- /dev/null
+++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/NoopTracingStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.NamedNode;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
+
+/**
+ * {@link InterceptStrategy} that performs no span handling of its own: each processor is handed back
+ * wrapped in a plain {@link DelegateAsyncProcessor}.
+ */
+public class NoopTracingStrategy implements InterceptStrategy {
+
+    /**
+     * Wraps {@code target} in a {@link DelegateAsyncProcessor}.
+     * <p>
+     * The Camel context, the node definition and the next target are not consulted.
+     *
+     * @return a new {@link DelegateAsyncProcessor} delegating to {@code target}
+     */
+    @Override
+    public Processor wrapProcessorInInterceptors(
+            CamelContext camelContext, NamedNode processorDefinition,
+            Processor target, Processor nextTarget)
+            throws Exception {
+        final Processor passThrough = new DelegateAsyncProcessor(target);
+        return passThrough;
+    }
+}
diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
new file mode 100644
index 00000000000..7e2266bf3ae
--- /dev/null
+++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.NamedNode;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
+import org.apache.camel.tracing.ActiveSpanManager;
+import org.apache.camel.tracing.SpanDecorator;
+
+/**
+ * {@link InterceptStrategy} that records the execution of each route processor as an OpenTelemetry span
+ * parented to the span currently active on the exchange.
+ * <p>
+ * Processors whose id matches one of the tracer's exclude patterns are wrapped without any tracing.
+ */
+public class OpenTelemetryTracingStrategy implements InterceptStrategy {
+    private static final String UNNAMED = "unnamed";
+    private final OpenTelemetryTracer tracer;
+
+    /**
+     * @param tracer source of the OpenTelemetry tracer used to build spans and of the exclude patterns
+     */
+    public OpenTelemetryTracingStrategy(OpenTelemetryTracer tracer) {
+        this.tracer = tracer;
+    }
+
+    /**
+     * Wraps {@code target} so that every invocation runs inside its own processor span.
+     * <p>
+     * If the processor is excluded, or if the exchange carries no active span at invocation time, the
+     * target is invoked without creating a span.
+     *
+     * @param  processorDefinition node whose id names the span and whose short name labels the component
+     * @param  target              the processor to run inside the span
+     * @return                     a {@link DelegateAsyncProcessor} around the (possibly traced) target
+     */
+    @Override
+    public Processor wrapProcessorInInterceptors(
+            CamelContext camelContext,
+            NamedNode processorDefinition, Processor target, Processor nextTarget)
+            throws Exception {
+        if (!shouldTrace(processorDefinition)) {
+            return new DelegateAsyncProcessor(target);
+        }
+
+        return new DelegateAsyncProcessor((Exchange exchange) -> {
+            // The exchange may have no active span at all; guard the adapter itself before unwrapping it,
+            // otherwise an untraced exchange fails with a NullPointerException instead of just running.
+            OpenTelemetrySpanAdapter spanWrapper = (OpenTelemetrySpanAdapter) ActiveSpanManager.getSpan(exchange);
+            Span span = spanWrapper == null ? null : spanWrapper.getOpenTelemetrySpan();
+            if (span == null) {
+                target.process(exchange);
+                return;
+            }
+
+            final Span processorSpan = tracer.getTracer().spanBuilder(getOperationName(processorDefinition))
+                    .setParent(Context.current().with(span))
+                    .setAttribute("component", getComponentName(processorDefinition))
+                    .startSpan();
+
+            // The correlation-context processors are run without swapping the exchange's active span.
+            boolean activateExchange = !(target instanceof GetCorrelationContextProcessor
+                    || target instanceof SetCorrelationContextProcessor);
+
+            if (activateExchange) {
+                ActiveSpanManager.activate(exchange, new OpenTelemetrySpanAdapter(processorSpan));
+            }
+
+            try (Scope ignored = processorSpan.makeCurrent()) {
+                target.process(exchange);
+            } catch (Exception ex) {
+                // Attribute the failure to the span of the processor that threw, not to its parent.
+                processorSpan.setStatus(StatusCode.ERROR);
+                processorSpan.recordException(ex);
+                throw ex;
+            } finally {
+                if (activateExchange) {
+                    ActiveSpanManager.deactivate(exchange);
+                }
+
+                processorSpan.end();
+            }
+        });
+    }
+
+    /**
+     * Builds the value of the span's {@code component} attribute: {@link SpanDecorator#CAMEL_COMPONENT}
+     * followed by the node's short name.
+     */
+    private static String getComponentName(NamedNode processorDefinition) {
+        return SpanDecorator.CAMEL_COMPONENT + processorDefinition.getShortName();
+    }
+
+    /**
+     * Uses the node id as the span name, falling back to {@code "unnamed"} when the node has no id.
+     */
+    private static String getOperationName(NamedNode processorDefinition) {
+        final String name = processorDefinition.getId();
+        return name == null ? UNNAMED : name;
+    }
+
+    // Adapted from org.apache.camel.impl.engine.DefaultTracer.shouldTrace
+    // org.apache.camel.impl.engine.DefaultTracer.shouldTracePattern
+    /**
+     * @return {@code false} if the node id matches any of the tracer's exclude patterns, {@code true} otherwise
+     */
+    private boolean shouldTrace(NamedNode definition) {
+        for (String pattern : tracer.getExcludePatterns()) {
+            // use matchPattern method from endpoint helper that has a good matcher we use in Camel
+            if (PatternHelper.matchPattern(definition.getId(), pattern)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
index d7438977838..c9eb885f4e4 100644
--- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
+++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import io.opentelemetry.api.common.AttributeKey;
@@ -37,6 +38,7 @@ import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import org.apache.camel.CamelContext;
+import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.camel.tracing.SpanDecorator;
import org.slf4j.Logger;
@@ -76,6 +78,7 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport {
ottracer.setTracer(tracer);
ottracer.setExcludePatterns(getExcludePatterns());
ottracer.addDecorator(new TestSEDASpanDecorator());
+ ottracer.setTracingStrategy(getTracingStrategy().apply(ottracer));
ottracer.init(context);
return context;
}
@@ -192,6 +195,10 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport {
assertEquals(1, inMemorySpanExporter.getFinishedSpanItems().stream().map(s -> s.getTraceId()).distinct().count());
}
+ /**
+  * Supplies the factory whose result is set as the tracing strategy of the test tracer.
+  * This default ignores the tracer it is given and always yields a {@link NoopTracingStrategy}.
+  */
+ protected Function<OpenTelemetryTracer, InterceptStrategy> getTracingStrategy() {
+     return unusedTracer -> new NoopTracingStrategy();
+ }
+
private static class LoggingSpanProcessor implements SpanProcessor {
private static final Logger LOG = LoggerFactory.getLogger(LoggingSpanProcessor.class);
diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyTest.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyTest.java
new file mode 100644
index 00000000000..4e9a3baeed6
--- /dev/null
+++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import java.util.function.Function;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Scope;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.InterceptStrategy;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Runs a two-route exchange with {@link OpenTelemetryTracingStrategy} installed and checks the finished
+ * spans against the expected data below, including a span started by non-Camel code inside a processor.
+ */
+class OpenTelemetryTracingStrategyTest extends CamelOpenTelemetryTestSupport {
+
+    // Expected spans, handed to the base class through the constructor.
+    private static SpanTestData[] testdata = {
+            new SpanTestData().setLabel("camel-process").setOperation("third-party-span")
+                    .setParentId(1),
+            new SpanTestData().setLabel("camel-process").setOperation("third-party-processor")
+                    .setParentId(6),
+            new SpanTestData().setLabel("camel-process").setOperation("direct-processor")
+                    .setParentId(3),
+            new SpanTestData().setLabel("direct:serviceB").setOperation("serviceB")
+                    .setParentId(4),
+            new SpanTestData().setLabel("direct:serviceB").setOperation("serviceB")
+                    .setKind(SpanKind.CLIENT)
+                    .setParentId(5),
+            new SpanTestData().setLabel("to:serviceB").setOperation("to-serviceB")
+                    .setParentId(6),
+            new SpanTestData().setLabel("direct:serviceA").setUri("direct://start").setOperation("serviceA")
+                    .setParentId(7),
+            new SpanTestData().setLabel("direct:serviceA").setUri("direct://start").setOperation("serviceA")
+                    .setKind(SpanKind.CLIENT)
+    };
+
+    OpenTelemetryTracingStrategyTest() {
+        super(testdata);
+    }
+
+    /** Sends one request through serviceA (which calls serviceB) and verifies the recorded spans. */
+    @Test
+    void testTracingOfProcessors() {
+        template.requestBody("direct:serviceA", "Hello");
+
+        verify();
+    }
+
+    /** Defines serviceA (third-party instrumented processor, then a call to serviceB) and serviceB. */
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:serviceA").routeId("serviceA")
+                        .process(exchange -> callThirdPartyInstrumentation()).id("third-party-processor")
+                        .to("direct:serviceB").id("to-serviceB");
+
+                from("direct:serviceB").routeId("serviceB")
+                        .process(exchange -> Thread.sleep(100)).id("direct-processor");
+            }
+
+            // Simulates a library that starts its own span while running inside a Camel processor.
+            private void callThirdPartyInstrumentation() throws InterruptedException {
+                Span thirdPartySpan = getTracer().spanBuilder("third-party-span").startSpan();
+                try (Scope ignored = thirdPartySpan.makeCurrent()) {
+                    thirdPartySpan.setAttribute(COMPONENT_KEY, "third-party-component");
+                    Thread.sleep(100);
+                } finally {
+                    thirdPartySpan.end();
+                }
+            }
+        };
+    }
+
+    /** Installs the strategy under test instead of the base class's default. */
+    @Override
+    protected Function<OpenTelemetryTracer, InterceptStrategy> getTracingStrategy() {
+        return tracer -> new OpenTelemetryTracingStrategy(tracer);
+    }
+}