You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/12/21 05:38:59 UTC

[camel] branch main updated: Tracing: fix flaky test and add scope checks before closing (#8934)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e1cc4f5ed3 Tracing: fix flaky test and add scope checks before closing (#8934)
2e1cc4f5ed3 is described below

commit 2e1cc4f5ed3b3cf787301edc80ca5245d1c5271a
Author: Liudmila Molkova <li...@microsoft.com>
AuthorDate: Tue Dec 20 21:38:52 2022 -0800

    Tracing: fix flaky test and add scope checks before closing (#8934)
    
    * Tracing: make scope closing idempotent
    
    * cleanup
    
    * more cleanups
    
    * oops
---
 .../CamelOpenTelemetryTestSupport.java             | 32 +++++++++
 .../camel/opentelemetry/CurrentSpanTest.java       | 77 ++++++++++++++--------
 .../apache/camel/tracing/ActiveSpanManager.java    | 32 ++++++++-
 .../main/java/org/apache/camel/tracing/Tracer.java |  2 +-
 4 files changed, 111 insertions(+), 32 deletions(-)

diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
index 93fe09d23f1..d7438977838 100644
--- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
+++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
@@ -28,13 +28,19 @@ import java.util.stream.Collectors;
 
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
 import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.ReadWriteSpan;
+import io.opentelemetry.sdk.trace.ReadableSpan;
 import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.SpanProcessor;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.camel.tracing.SpanDecorator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -63,6 +69,7 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport {
         ottracer = new OpenTelemetryTracer();
 
         tracerFactory = SdkTracerProvider.builder()
+                .addSpanProcessor(new LoggingSpanProcessor())
                 .addSpanProcessor(SimpleSpanProcessor.create(inMemorySpanExporter)).build();
 
         tracer = tracerFactory.get("tracerTest");
@@ -185,4 +192,29 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport {
         assertEquals(1, inMemorySpanExporter.getFinishedSpanItems().stream().map(s -> s.getTraceId()).distinct().count());
     }
 
+    private static class LoggingSpanProcessor implements SpanProcessor {
+        private static final Logger LOG = LoggerFactory.getLogger(LoggingSpanProcessor.class);
+
+        @Override
+        public void onStart(Context context, ReadWriteSpan readWriteSpan) {
+            LOG.debug("Span started: name - '{}', kind - '{}', id - '{}-{}", readWriteSpan.getName(), readWriteSpan.getKind(),
+                    readWriteSpan.getSpanContext().getTraceId(), readWriteSpan.getSpanContext().getSpanId());
+        }
+
+        @Override
+        public boolean isStartRequired() {
+            return true;
+        }
+
+        @Override
+        public void onEnd(ReadableSpan readableSpan) {
+            LOG.debug("Span ended: name - '{}', kind - '{}', id - '{}-{}", readableSpan.getName(), readableSpan.getKind(),
+                    readableSpan.getSpanContext().getTraceId(), readableSpan.getSpanContext().getSpanId());
+        }
+
+        @Override
+        public boolean isEndRequired() {
+            return true;
+        }
+    }
 }
diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
index c5157e14506..b28da91eb1d 100644
--- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
+++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.opentelemetry;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -23,12 +24,14 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
+import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.sdk.trace.ReadableSpan;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -47,9 +50,10 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
-
     CurrentSpanTest() {
         super(new SpanTestData[0]);
     }
@@ -57,6 +61,7 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
+        context.addComponent("asyncmock", new AsyncMockComponent());
         context.addComponent("asyncmock1", new AsyncMockComponent());
         context.addComponent("asyncmock2", new AsyncMockComponent());
         context.addComponent("asyncmock3", new AsyncMockComponent());
@@ -130,7 +135,7 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
                         .setKind(SpanKind.CLIENT),
         };
 
-        // sync pipeline
+        // async pipeline
         template.sendBody("asyncmock2:start", "Hello World");
 
         List<SpanData> spans = verify(expectedSpans, false);
@@ -138,6 +143,25 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
         assertFalse(Span.current().getSpanContext().isValid());
     }
 
+    @Test
+    void testAsyncFailure() {
+        SpanTestData[] expectedSpans = {
+                new SpanTestData().setLabel("asyncmock:fail").setUri("asyncmock://fail").setOperation("asyncmock"),
+                new SpanTestData().setLabel("asyncmock:fail").setUri("asyncmock://fail").setOperation("asyncmock")
+                        .setKind(SpanKind.CLIENT),
+        };
+
+        assertThrows(CamelExecutionException.class, () -> template.sendBody("asyncmock:fail", "Hello World"));
+        assertFalse(Span.current().getSpanContext().isValid());
+
+        List<SpanData> spans = verify(expectedSpans, false);
+        assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId());
+
+        assertTrue(spans.get(0).getAttributes().get(AttributeKey.booleanKey("error")));
+        assertTrue(spans.get(1).getAttributes().get(AttributeKey.booleanKey("error")));
+
+    }
+
     @Test
     void testMulticastAsync() {
         SpanTestData[] expectedSpans = {
@@ -187,6 +211,11 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
                 // async pipeline
                 from("asyncmock2:start").to("asyncmock2:result");
 
+                // async fail
+                from("asyncmock:fail").process(i -> {
+                    throw new IOException("error");
+                });
+
                 // multicast pipeline
                 from("direct:start").multicast()
                         .to("asyncmock1:result")
@@ -196,31 +225,7 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
                 // stress pipeline
                 from("asyncmock3:start").multicast()
                         .aggregationStrategy((oldExchange, newExchange) -> {
-                            // context should be cleaned up
-                            // BUT
-                            // we have a stack of spans for this pipeline:
-                            // root is producer (asyncmock3:start) and a bunch of nested under each other successors, e.g:
-                            // - consumer asyncmock3:start
-                            //   - producer asyncmock2:start
-                            //     - consumer asyncmock2:start
-                            //       -producer asyncmock2:result
-                            // the root span is still current during aggregation on *some* thread. It's also still running.
-                            //
-                            // OTel instrumentation for executor service should take care of propagation of
-                            // current asyncmock3:start span when possible, but it's not enabled here.
-                            //
-                            // So we can have either no context, or, accidentally have asyncmock3:start, which is also valid.
-                            // hence the condition here:
-                            if (Span.current().getSpanContext().isValid()) {
-                                ReadableSpan readable = (ReadableSpan) Span.current();
-                                if (readable.hasEnded()) {
-                                    System.out.printf("Detected current ended span: name - '%s', parent id - '%s'",
-                                            readable.getName(), readable.getParentSpanContext().getSpanId());
-                                }
-                                // we must never get current, but ended span.
-                                assertFalse(readable.hasEnded());
-                                assertEquals("asyncmock3", readable.getName());
-                            }
+                            checkCurrentSpan(newExchange);
                             return newExchange;
                         })
                         .executorService(context.getExecutorServiceManager().newFixedThreadPool(this, "CurrentSpanTest", 10))
@@ -229,11 +234,27 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
                         .to("log:line", "asyncmock1:start")
                         .to("log:line", "asyncmock2:start")
                         .to("log:line", "direct:bar")
-                        .process(ignored -> assertFalse(Span.current().getSpanContext().isValid()));
+                        .process(ex -> checkCurrentSpan(ex));
             }
         };
     }
 
+    private static void checkCurrentSpan(Exchange exc) {
+        String errorMessage = null;
+        if (Span.current() instanceof ReadableSpan) {
+            ReadableSpan readable = (ReadableSpan) Span.current();
+            errorMessage = String.format(
+                    "Current span: name - '%s', kind - '%s', ended - `%s', id - '%s-%s', exchange id - '%s-%s', thread - '%s'\n",
+                    readable.getName(), readable.getKind(), readable.hasEnded(),
+                    readable.getSpanContext().getTraceId(), readable.getSpanContext().getSpanId(),
+                    ActiveSpanManager.getSpan(exc).traceId(), ActiveSpanManager.getSpan(exc).spanId(),
+                    Thread.currentThread().getName());
+
+        }
+
+        assertFalse(Span.current().getSpanContext().isValid(), errorMessage);
+    }
+
     private static class AsyncMockComponent extends MockComponent {
 
         @Override
diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
index 5aedf64a038..bce194f27ec 100644
--- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
+++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
@@ -57,7 +57,7 @@ public final class ActiveSpanManager {
      */
     public static void activate(Exchange exchange, SpanAdapter span) {
         exchange.setProperty(ACTIVE_SPAN_PROPERTY,
-                new Holder((Holder) exchange.getProperty(ACTIVE_SPAN_PROPERTY), span, span.makeCurrent()));
+                new Holder((Holder) exchange.getProperty(ACTIVE_SPAN_PROPERTY), span));
         if (exchange.getContext().isUseMDCLogging()) {
             MDC.put(MDC_TRACE_ID, "" + span.traceId());
             MDC.put(MDC_SPAN_ID, "" + span.spanId());
@@ -116,10 +116,10 @@ public final class ActiveSpanManager {
         private SpanAdapter span;
         private AutoCloseable scope;
 
-        public Holder(Holder parent, SpanAdapter span, AutoCloseable scope) {
+        public Holder(Holder parent, SpanAdapter span) {
             this.parent = parent;
             this.span = span;
-            this.scope = scope;
+            this.scope = new ScopeWrapper(span.makeCurrent(), Thread.currentThread().getId());
         }
 
         public Holder getParent() {
@@ -141,4 +141,30 @@ public final class ActiveSpanManager {
             }
         }
     }
+
+    /**
+     * Makes closing scopes idempotent and prevents restoring scope on the wrong thread: Should be removed if
+     * https://github.com/open-telemetry/opentelemetry-java/issues/5055 is fixed.
+     */
+    private static class ScopeWrapper implements AutoCloseable {
+        private final long startThreadId;
+        private final AutoCloseable inner;
+        private boolean closed;
+
+        public ScopeWrapper(AutoCloseable inner, long startThreadId) {
+            this.startThreadId = startThreadId;
+            this.inner = inner;
+        }
+
+        @Override
+        public void close() throws Exception {
+            if (!closed && Thread.currentThread().getId() == startThreadId) {
+                closed = true;
+                inner.close();
+            } else {
+                LOG.debug("not closing scope, closed - {}, started on thread - '{}', current thread - '{}'",
+                        closed, startThreadId, Thread.currentThread().getId());
+            }
+        }
+    }
 }
diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java
index c7ef133c491..776d0fd72ba 100644
--- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java
+++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java
@@ -264,8 +264,8 @@ public abstract class Tracer extends ServiceSupport implements RoutePolicyFactor
                             LOG.trace("Tracing: start client span={}", span);
                         }
                         sd.post(span, ese.getExchange(), ese.getEndpoint());
-                        finishSpan(span);
                         ActiveSpanManager.deactivate(ese.getExchange());
+                        finishSpan(span);
                     } else {
                         LOG.warn("Tracing: could not find managed span for exchange={}", ese.getExchange());
                     }