You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/29 04:52:38 UTC

[camel] branch camel-3.x updated: CAMEL-19776: Added tracing strategy for OpenTelemetry to trace processors (#11213)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new 07346225025 CAMEL-19776: Added tracing strategy for OpenTelemetry to trace processors (#11213)
07346225025 is described below

commit 07346225025de7b9eeedc44a562e6173f33b1912
Author: RuslanHryn <gr...@gmail.com>
AuthorDate: Tue Aug 29 07:52:32 2023 +0300

    CAMEL-19776: Added tracing strategy for OpenTelemetry to trace processors (#11213)
    
    Co-authored-by: Ruslan Hryn <hr...@crxmarkets.com>
---
 .../camel/opentelemetry/NoopTracingStrategy.java   |  34 +++++++
 .../OpenTelemetryTracingStrategy.java              | 107 +++++++++++++++++++++
 .../CamelOpenTelemetryTestSupport.java             |   7 ++
 .../OpenTelemetryTracingStrategyTest.java          |  95 ++++++++++++++++++
 4 files changed, 243 insertions(+)

diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/NoopTracingStrategy.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/NoopTracingStrategy.java
new file mode 100644
index 00000000000..ad8d54ba761
--- /dev/null
+++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/NoopTracingStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.NamedNode;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
+
+/** Tracing strategy that adds no tracing: each processor is only wrapped in a {@link DelegateAsyncProcessor}. */
+public class NoopTracingStrategy implements InterceptStrategy {
+    @Override
+    public Processor wrapProcessorInInterceptors(
+            CamelContext camelContext, NamedNode processorDefinition, Processor target, Processor nextTarget)
+            throws Exception {
+        final Processor passThrough = new DelegateAsyncProcessor(target);
+        return passThrough;
+    }
+}
diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
new file mode 100644
index 00000000000..7e2266bf3ae
--- /dev/null
+++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.NamedNode;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
+import org.apache.camel.tracing.ActiveSpanManager;
+import org.apache.camel.tracing.SpanDecorator;
+
+public class OpenTelemetryTracingStrategy implements InterceptStrategy {
+    private static final String UNNAMED = "unnamed";
+    private final OpenTelemetryTracer tracer;
+
+    public OpenTelemetryTracingStrategy(OpenTelemetryTracer tracer) {
+        this.tracer = tracer;
+    }
+
+    @Override
+    public Processor wrapProcessorInInterceptors(
+            CamelContext camelContext,
+            NamedNode processorDefinition, Processor target, Processor nextTarget)
+            throws Exception {
+        if (!shouldTrace(processorDefinition)) {
+            return new DelegateAsyncProcessor(target);
+        }
+
+        return new DelegateAsyncProcessor((Exchange exchange) -> {
+            OpenTelemetrySpanAdapter spanWrapper = (OpenTelemetrySpanAdapter) ActiveSpanManager.getSpan(exchange);
+            Span span = spanWrapper.getOpenTelemetrySpan();
+            if (span == null) {
+                target.process(exchange);
+                return;
+            }
+
+            final Span processorSpan = tracer.getTracer().spanBuilder(getOperationName(processorDefinition))
+                    .setParent(Context.current().with(span))
+                    .setAttribute("component", getComponentName(processorDefinition))
+                    .startSpan();
+
+            boolean activateExchange = !(target instanceof GetCorrelationContextProcessor
+                    || target instanceof SetCorrelationContextProcessor);
+
+            if (activateExchange) {
+                ActiveSpanManager.activate(exchange, new OpenTelemetrySpanAdapter(processorSpan));
+            }
+
+            try (Scope ignored = processorSpan.makeCurrent()) {
+                target.process(exchange);
+            } catch (Exception ex) {
+                span.setStatus(StatusCode.ERROR);
+                span.recordException(ex);
+                throw ex;
+            } finally {
+                if (activateExchange) {
+                    ActiveSpanManager.deactivate(exchange);
+                }
+
+                processorSpan.end();
+            }
+        });
+    }
+
+    private static String getComponentName(NamedNode processorDefinition) {
+        return SpanDecorator.CAMEL_COMPONENT + processorDefinition.getShortName();
+    }
+
+    private static String getOperationName(NamedNode processorDefinition) {
+        final String name = processorDefinition.getId();
+        return name == null ? UNNAMED : name;
+    }
+
+    // Adapted from org.apache.camel.impl.engine.DefaultTracer.shouldTrace
+    // org.apache.camel.impl.engine.DefaultTracer.shouldTracePattern
+    private boolean shouldTrace(NamedNode definition) {
+        for (String pattern : tracer.getExcludePatterns()) {
+            // use matchPattern method from endpoint helper that has a good matcher we use in Camel
+            if (PatternHelper.matchPattern(definition.getId(), pattern)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
index d7438977838..c9eb885f4e4 100644
--- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
+++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import io.opentelemetry.api.common.AttributeKey;
@@ -37,6 +38,7 @@ import io.opentelemetry.sdk.trace.SpanProcessor;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
 import org.apache.camel.CamelContext;
+import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.camel.tracing.SpanDecorator;
 import org.slf4j.Logger;
@@ -76,6 +78,7 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport {
         ottracer.setTracer(tracer);
         ottracer.setExcludePatterns(getExcludePatterns());
         ottracer.addDecorator(new TestSEDASpanDecorator());
+        ottracer.setTracingStrategy(getTracingStrategy().apply(ottracer));
         ottracer.init(context);
         return context;
     }
@@ -192,6 +195,10 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport {
         assertEquals(1, inMemorySpanExporter.getFinishedSpanItems().stream().map(s -> s.getTraceId()).distinct().count());
     }
 
+    protected Function<OpenTelemetryTracer, InterceptStrategy> getTracingStrategy() {
+        return unusedTracer -> new NoopTracingStrategy(); // default: the tracer argument is ignored, no processor spans
+    }
+
     private static class LoggingSpanProcessor implements SpanProcessor {
         private static final Logger LOG = LoggerFactory.getLogger(LoggingSpanProcessor.class);
 
diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyTest.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyTest.java
new file mode 100644
index 00000000000..4e9a3baeed6
--- /dev/null
+++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import java.util.function.Function;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Scope;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.InterceptStrategy;
+import org.junit.jupiter.api.Test;
+
+class OpenTelemetryTracingStrategyTest extends CamelOpenTelemetryTestSupport {
+
+    private static SpanTestData[] testdata = {
+            new SpanTestData().setLabel("camel-process").setOperation("third-party-span")
+                    .setParentId(1),
+            new SpanTestData().setLabel("camel-process").setOperation("third-party-processor")
+                    .setParentId(6),
+            new SpanTestData().setLabel("camel-process").setOperation("direct-processor")
+                    .setParentId(3),
+            new SpanTestData().setLabel("direct:serviceB").setOperation("serviceB")
+                    .setParentId(4),
+            new SpanTestData().setLabel("direct:serviceB").setOperation("serviceB")
+                    .setKind(SpanKind.CLIENT)
+                    .setParentId(5),
+            new SpanTestData().setLabel("to:serviceB").setOperation("to-serviceB")
+                    .setParentId(6),
+            new SpanTestData().setLabel("direct:serviceA").setUri("direct://start").setOperation("serviceA")
+                    .setParentId(7),
+            new SpanTestData().setLabel("direct:serviceA").setUri("direct://start").setOperation("serviceA")
+                    .setKind(SpanKind.CLIENT)
+    };
+
+    OpenTelemetryTracingStrategyTest() {
+        super(testdata);
+    }
+
+    @Test
+    void testTracingOfProcessors() {
+        template.requestBody("direct:serviceA", "Hello");
+
+        verify();
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:serviceA").routeId("serviceA")
+                        .process(exchange -> {
+                            callThirdPartyInstrumentation();
+                        }).id("third-party-processor")
+                        .to("direct:serviceB").id("to-serviceB");
+
+                from("direct:serviceB").routeId("serviceB")
+                        .process(exchange -> {
+                            Thread.sleep(100);
+                        }).id("direct-processor");
+            }
+
+            private void callThirdPartyInstrumentation() throws InterruptedException {
+                Span span = getTracer().spanBuilder("third-party-span").startSpan();
+                try (Scope ignored = span.makeCurrent()) {
+                    span.setAttribute(COMPONENT_KEY, "third-party-component");
+                    Thread.sleep(100);
+                } finally {
+                    span.end();
+                }
+            }
+        };
+    }
+
+    @Override
+    protected Function<OpenTelemetryTracer, InterceptStrategy> getTracingStrategy() {
+        return OpenTelemetryTracingStrategy::new;
+    }
+}