You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/12/21 05:38:59 UTC
[camel] branch main updated: Tracing: fix flaky test and add scope checks before closing (#8934)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2e1cc4f5ed3 Tracing: fix flaky test and add scope checks before closing (#8934)
2e1cc4f5ed3 is described below
commit 2e1cc4f5ed3b3cf787301edc80ca5245d1c5271a
Author: Liudmila Molkova <li...@microsoft.com>
AuthorDate: Tue Dec 20 21:38:52 2022 -0800
Tracing: fix flaky test and add scope checks before closing (#8934)
* Tracing: make scope closing idempotent
* cleanup
* more cleanups
* oops
---
.../CamelOpenTelemetryTestSupport.java | 32 +++++++++
.../camel/opentelemetry/CurrentSpanTest.java | 77 ++++++++++++++--------
.../apache/camel/tracing/ActiveSpanManager.java | 32 ++++++++-
.../main/java/org/apache/camel/tracing/Tracer.java | 2 +-
4 files changed, 111 insertions(+), 32 deletions(-)
diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
index 93fe09d23f1..d7438977838 100644
--- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
+++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
@@ -28,13 +28,19 @@ import java.util.stream.Collectors;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.ReadWriteSpan;
+import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.camel.tracing.SpanDecorator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -63,6 +69,7 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport {
ottracer = new OpenTelemetryTracer();
tracerFactory = SdkTracerProvider.builder()
+ .addSpanProcessor(new LoggingSpanProcessor())
.addSpanProcessor(SimpleSpanProcessor.create(inMemorySpanExporter)).build();
tracer = tracerFactory.get("tracerTest");
@@ -185,4 +192,29 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport {
assertEquals(1, inMemorySpanExporter.getFinishedSpanItems().stream().map(s -> s.getTraceId()).distinct().count());
}
+ private static class LoggingSpanProcessor implements SpanProcessor {
+ private static final Logger LOG = LoggerFactory.getLogger(LoggingSpanProcessor.class);
+
+ @Override
+ public void onStart(Context context, ReadWriteSpan readWriteSpan) {
+ LOG.debug("Span started: name - '{}', kind - '{}', id - '{}-{}", readWriteSpan.getName(), readWriteSpan.getKind(),
+ readWriteSpan.getSpanContext().getTraceId(), readWriteSpan.getSpanContext().getSpanId());
+ }
+
+ @Override
+ public boolean isStartRequired() {
+ return true;
+ }
+
+ @Override
+ public void onEnd(ReadableSpan readableSpan) {
+ LOG.debug("Span ended: name - '{}', kind - '{}', id - '{}-{}", readableSpan.getName(), readableSpan.getKind(),
+ readableSpan.getSpanContext().getTraceId(), readableSpan.getSpanContext().getSpanId());
+ }
+
+ @Override
+ public boolean isEndRequired() {
+ return true;
+ }
+ }
}
diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
index c5157e14506..b28da91eb1d 100644
--- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
+++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.opentelemetry;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -23,12 +24,14 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -47,9 +50,10 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
-
CurrentSpanTest() {
super(new SpanTestData[0]);
}
@@ -57,6 +61,7 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
+ context.addComponent("asyncmock", new AsyncMockComponent());
context.addComponent("asyncmock1", new AsyncMockComponent());
context.addComponent("asyncmock2", new AsyncMockComponent());
context.addComponent("asyncmock3", new AsyncMockComponent());
@@ -130,7 +135,7 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
.setKind(SpanKind.CLIENT),
};
- // sync pipeline
+ // async pipeline
template.sendBody("asyncmock2:start", "Hello World");
List<SpanData> spans = verify(expectedSpans, false);
@@ -138,6 +143,25 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
assertFalse(Span.current().getSpanContext().isValid());
}
+ @Test
+ void testAsyncFailure() {
+ SpanTestData[] expectedSpans = {
+ new SpanTestData().setLabel("asyncmock:fail").setUri("asyncmock://fail").setOperation("asyncmock"),
+ new SpanTestData().setLabel("asyncmock:fail").setUri("asyncmock://fail").setOperation("asyncmock")
+ .setKind(SpanKind.CLIENT),
+ };
+
+ assertThrows(CamelExecutionException.class, () -> template.sendBody("asyncmock:fail", "Hello World"));
+ assertFalse(Span.current().getSpanContext().isValid());
+
+ List<SpanData> spans = verify(expectedSpans, false);
+ assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId());
+
+ assertTrue(spans.get(0).getAttributes().get(AttributeKey.booleanKey("error")));
+ assertTrue(spans.get(1).getAttributes().get(AttributeKey.booleanKey("error")));
+
+ }
+
@Test
void testMulticastAsync() {
SpanTestData[] expectedSpans = {
@@ -187,6 +211,11 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
// async pipeline
from("asyncmock2:start").to("asyncmock2:result");
+ // async fail
+ from("asyncmock:fail").process(i -> {
+ throw new IOException("error");
+ });
+
// multicast pipeline
from("direct:start").multicast()
.to("asyncmock1:result")
@@ -196,31 +225,7 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
// stress pipeline
from("asyncmock3:start").multicast()
.aggregationStrategy((oldExchange, newExchange) -> {
- // context should be cleaned up
- // BUT
- // we have a stack of spans for this pipeline:
- // root is producer (asyncmock3:start) and a bunch of nested under each other successors, e.g:
- // - consumer asyncmock3:start
- // - producer asyncmock2:start
- // - consumer asyncmock2:start
- // -producer asyncmock2:result
- // the root span is still current during aggregation on *some* thread. It's also still running.
- //
- // OTel instrumentation for executor service should take care of propagation of
- // current asyncmock3:start span when possible, but it's not enabled here.
- //
- // So we can have either no context, or, accidentally have asyncmock3:start, which is also valid.
- // hence the condition here:
- if (Span.current().getSpanContext().isValid()) {
- ReadableSpan readable = (ReadableSpan) Span.current();
- if (readable.hasEnded()) {
- System.out.printf("Detected current ended span: name - '%s', parent id - '%s'",
- readable.getName(), readable.getParentSpanContext().getSpanId());
- }
- // we must never get current, but ended span.
- assertFalse(readable.hasEnded());
- assertEquals("asyncmock3", readable.getName());
- }
+ checkCurrentSpan(newExchange);
return newExchange;
})
.executorService(context.getExecutorServiceManager().newFixedThreadPool(this, "CurrentSpanTest", 10))
@@ -229,11 +234,27 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
.to("log:line", "asyncmock1:start")
.to("log:line", "asyncmock2:start")
.to("log:line", "direct:bar")
- .process(ignored -> assertFalse(Span.current().getSpanContext().isValid()));
+ .process(ex -> checkCurrentSpan(ex));
}
};
}
+ private static void checkCurrentSpan(Exchange exc) {
+ String errorMessage = null;
+ if (Span.current() instanceof ReadableSpan) {
+ ReadableSpan readable = (ReadableSpan) Span.current();
+ errorMessage = String.format(
+ "Current span: name - '%s', kind - '%s', ended - `%s', id - '%s-%s', exchange id - '%s-%s', thread - '%s'\n",
+ readable.getName(), readable.getKind(), readable.hasEnded(),
+ readable.getSpanContext().getTraceId(), readable.getSpanContext().getSpanId(),
+ ActiveSpanManager.getSpan(exc).traceId(), ActiveSpanManager.getSpan(exc).spanId(),
+ Thread.currentThread().getName());
+
+ }
+
+ assertFalse(Span.current().getSpanContext().isValid(), errorMessage);
+ }
+
private static class AsyncMockComponent extends MockComponent {
@Override
diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
index 5aedf64a038..bce194f27ec 100644
--- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
+++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
@@ -57,7 +57,7 @@ public final class ActiveSpanManager {
*/
public static void activate(Exchange exchange, SpanAdapter span) {
exchange.setProperty(ACTIVE_SPAN_PROPERTY,
- new Holder((Holder) exchange.getProperty(ACTIVE_SPAN_PROPERTY), span, span.makeCurrent()));
+ new Holder((Holder) exchange.getProperty(ACTIVE_SPAN_PROPERTY), span));
if (exchange.getContext().isUseMDCLogging()) {
MDC.put(MDC_TRACE_ID, "" + span.traceId());
MDC.put(MDC_SPAN_ID, "" + span.spanId());
@@ -116,10 +116,10 @@ public final class ActiveSpanManager {
private SpanAdapter span;
private AutoCloseable scope;
- public Holder(Holder parent, SpanAdapter span, AutoCloseable scope) {
+ public Holder(Holder parent, SpanAdapter span) {
this.parent = parent;
this.span = span;
- this.scope = scope;
+ this.scope = new ScopeWrapper(span.makeCurrent(), Thread.currentThread().getId());
}
public Holder getParent() {
@@ -141,4 +141,30 @@ public final class ActiveSpanManager {
}
}
}
+
+ /**
+ * Makes closing scopes idempotent and prevents restoring scope on the wrong thread: Should be removed if
+ * https://github.com/open-telemetry/opentelemetry-java/issues/5055 is fixed.
+ */
+ private static class ScopeWrapper implements AutoCloseable {
+ private final long startThreadId;
+ private final AutoCloseable inner;
+ private boolean closed;
+
+ public ScopeWrapper(AutoCloseable inner, long startThreadId) {
+ this.startThreadId = startThreadId;
+ this.inner = inner;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closed && Thread.currentThread().getId() == startThreadId) {
+ closed = true;
+ inner.close();
+ } else {
+ LOG.debug("not closing scope, closed - {}, started on thread - '{}', current thread - '{}'",
+ closed, startThreadId, Thread.currentThread().getId());
+ }
+ }
+ }
}
diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java
index c7ef133c491..776d0fd72ba 100644
--- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java
+++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java
@@ -264,8 +264,8 @@ public abstract class Tracer extends ServiceSupport implements RoutePolicyFactor
LOG.trace("Tracing: start client span={}", span);
}
sd.post(span, ese.getExchange(), ese.getEndpoint());
- finishSpan(span);
ActiveSpanManager.deactivate(ese.getExchange());
+ finishSpan(span);
} else {
LOG.warn("Tracing: could not find managed span for exchange={}", ese.getExchange());
}