You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/04/12 13:58:59 UTC

[camel] branch camel-2.21.x updated: Added test for custom component which uses a backing service which furthermore sends data to a further Camel route asynchronously. As the processing route was a single consumer thread it couldn't process new tasks issued to it till the further task was finished leading to XRay emitting the segment preemptively as it wasn't aware that some stuff from the asynchronous route, which wasn't processed yet, belong to it. Later on, when the asynchronous code was executed an AlreadyEmittedException [...]

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.21.x by this push:
     new e5fa508  Added test for custom component which uses a backing service which furthermore sends data to a further Camel route asynchronously. As the processing route was a single consumer thread it couldn't process new tasks issued to it till the further task was finished leading to XRay emitting the segment preemptively as it wasn't aware that some stuff from the asynchronous route, which wasn't processed yet, belong to it. Later on, when the asynchronous code was executed an Alre [...]
e5fa508 is described below

commit e5fa508b38a4d6fb7d8742a755a091b94bb3762d
Author: Roman Vottner <ro...@gmx.at>
AuthorDate: Wed Apr 11 17:19:06 2018 +0200

    Added a test for a custom component that uses a backing service which, in turn, sends data asynchronously to a further Camel route. As the processing route had only a single consumer thread, it couldn't process new tasks issued to it until the other task was finished, leading to XRay emitting the segment prematurely as it wasn't aware that some work from the asynchronous route, which hadn't been processed yet, belonged to it. Later on, when the asynchronous code was executed an AlreadyEmittedExce [...]
    Also added notify builders to each test case in order to fix test-timing issues. This has the additional benefit of getting rid of the Thread.sleep(...) calls inside the tests themselves.
---
 .../camel/component/aws/xray/XRayTracer.java       |  88 +++++++----
 .../camel/component/aws/xray/ABCRouteTest.java     |  11 ++
 .../camel/component/aws/xray/BeanTracingTest.java  |  11 ++
 .../aws/xray/ClientRecipientListRouteTest.java     |  11 ++
 .../aws/xray/ComprehensiveTrackingTest.java        |   8 +
 .../component/aws/xray/CustomComponentTest.java    | 169 +++++++++++++++++++++
 .../camel/component/aws/xray/EIPTracingTest.java   |  11 ++
 .../component/aws/xray/ErrorHandlingTest.java      |  11 ++
 .../apache/camel/component/aws/xray/ErrorTest.java |  12 ++
 .../aws/xray/MulticastParallelRouteTest.java       |  16 ++
 .../component/aws/xray/MulticastRouteTest.java     |  16 ++
 .../component/aws/xray/Route2ConcurrentTest.java   |   7 +-
 .../component/aws/xray/RouteConcurrentTest.java    |   7 +-
 .../aws/xray/SpringAwsXRaySimpleRouteTest.java     |   8 +-
 .../apache/camel/component/aws/xray/TestUtils.java |  13 +-
 .../camel/component/aws/xray/TwoService2Test.java  |  13 +-
 .../camel/component/aws/xray/TwoServiceTest.java   |  12 +-
 .../aws/xray/TwoServiceWithExcludeTest.java        |  13 +-
 .../aws/xray/bean/ProcessingCamelBean.java         |  45 ++++++
 .../aws/xray/bean/SomeBackingService.java          |  57 +++++++
 .../aws/xray/component/CommonEndpoints.java        |  29 ++++
 .../aws/xray/component/TestXRayComponent.java      |  39 +++++
 .../aws/xray/component/TestXRayEndpoint.java       |  53 +++++++
 .../aws/xray/component/TestXRayProducer.java       |  52 +++++++
 .../services/org/apache/camel/component/xray-test  |   1 +
 25 files changed, 675 insertions(+), 38 deletions(-)

diff --git a/components/camel-aws-xray/src/main/java/org/apache/camel/component/aws/xray/XRayTracer.java b/components/camel-aws-xray/src/main/java/org/apache/camel/component/aws/xray/XRayTracer.java
index 128d964..e8c1112 100644
--- a/components/camel-aws-xray/src/main/java/org/apache/camel/component/aws/xray/XRayTracer.java
+++ b/components/camel-aws-xray/src/main/java/org/apache/camel/component/aws/xray/XRayTracer.java
@@ -18,7 +18,13 @@ package org.apache.camel.component.aws.xray;
 
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
-import java.util.*;
+import java.util.EventObject;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.Set;
 
 import com.amazonaws.xray.AWSXRay;
 import com.amazonaws.xray.AWSXRayRecorder;
@@ -26,7 +32,7 @@ import com.amazonaws.xray.entities.Entity;
 import com.amazonaws.xray.entities.Segment;
 import com.amazonaws.xray.entities.Subsegment;
 import com.amazonaws.xray.entities.TraceID;
-
+import com.amazonaws.xray.exceptions.AlreadyEmittedException;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Endpoint;
@@ -277,10 +283,15 @@ public class XRayTracer extends ServiceSupport implements RoutePolicyFactory, St
                     if (sd.getComponent() != null) {
                         name = sd.getComponent() + ":" + name;
                     }
-                    Subsegment subsegment = AWSXRay.beginSubsegment(sanitizeName(name));
-                    sd.pre(subsegment, ese.getExchange(), ese.getEndpoint());
-                    LOG.trace("Creating new subsegment with ID {} and name {}",
-                            subsegment.getId(), subsegment.getName());
+                    try {
+                        Subsegment subsegment = AWSXRay.beginSubsegment(sanitizeName(name));
+                        sd.pre(subsegment, ese.getExchange(), ese.getEndpoint());
+                        LOG.trace("Creating new subsegment with ID {} and name {}",
+                                subsegment.getId(), subsegment.getName());
+                    } catch (AlreadyEmittedException aeEx) {
+                        LOG.warn("Ignoring starting of subsegment " + name + " as its parent segment"
+                                + " was already emitted to AWS.");
+                    }
                 } else {
                     LOG.trace("Ignoring creation of XRay subsegment as no segment exists in the current thread");
                 }
@@ -293,11 +304,17 @@ public class XRayTracer extends ServiceSupport implements RoutePolicyFactory, St
                 SegmentDecorator sd = getSegmentDecorator(ese.getEndpoint());
 
                 if (AWSXRay.getCurrentSubsegmentOptional().isPresent()) {
-                    Subsegment subsegment = AWSXRay.getCurrentSubsegment();
-                    sd.post(subsegment, ese.getExchange(), ese.getEndpoint());
-                    subsegment.close();
-                    LOG.trace("Closing down subsegment with ID {} and name {}",
-                            subsegment.getId(), subsegment.getName());
+                    String name = sd.getOperationName(ese.getExchange(), ese.getEndpoint());
+                    try {
+                        Subsegment subsegment = AWSXRay.getCurrentSubsegment();
+                        sd.post(subsegment, ese.getExchange(), ese.getEndpoint());
+                        subsegment.close();
+                        LOG.trace("Closing down subsegment with ID {} and name {}",
+                                subsegment.getId(), subsegment.getName());
+                    } catch (AlreadyEmittedException aeEx) {
+                        LOG.warn("Ignoring close of subsegment " + name
+                                + " as its parent segment was already emitted to AWS");
+                    }
                 }
             } else {
                 LOG.trace("Received event {} from source {}", event, event.getSource());
@@ -377,7 +394,8 @@ public class XRayTracer extends ServiceSupport implements RoutePolicyFactory, St
             }
 
             SegmentDecorator sd = getSegmentDecorator(route.getEndpoint());
-            if (!AWSXRay.getCurrentSegmentOptional().isPresent()) {
+            Optional<Segment> curSegment = AWSXRay.getCurrentSegmentOptional();
+            if (!curSegment.isPresent()) {
                 Segment segment = AWSXRay.beginSegment(sanitizeName(route.getId()));
                 segment.setTraceId(traceID);
                 sd.pre(segment, exchange, route.getEndpoint());
@@ -385,10 +403,16 @@ public class XRayTracer extends ServiceSupport implements RoutePolicyFactory, St
                         segment.getId(), segment.getName());
                 exchange.setProperty(CURRENT_SEGMENT, segment);
             } else {
-                Subsegment subsegment = AWSXRay.beginSubsegment(route.getId());
-                sd.pre(subsegment, exchange, route.getEndpoint());
-                LOG.trace("Created new XRay subsegment {} with name {}",
-                        subsegment.getId(), subsegment.getName());
+                String segmentName = curSegment.get().getId();
+                try {
+                    Subsegment subsegment = AWSXRay.beginSubsegment(route.getId());
+                    sd.pre(subsegment, exchange, route.getEndpoint());
+                    LOG.trace("Created new XRay subsegment {} with name {}",
+                            subsegment.getId(), subsegment.getName());
+                } catch (AlreadyEmittedException aeEx) {
+                    LOG.warn("Ignoring opening of subsegment " + route.getId() + " as its parent segment "
+                            + segmentName + " was already emitted before.");
+                }
             }
         }
 
@@ -401,19 +425,25 @@ public class XRayTracer extends ServiceSupport implements RoutePolicyFactory, St
 
             LOG.trace("=> RoutePolicy-Done: Route: {} - RouteId: {}", routeId, route.getId());
 
-            SegmentDecorator sd = getSegmentDecorator(route.getEndpoint());
-            if (AWSXRay.getCurrentSubsegmentOptional().isPresent()) {
-                Subsegment subsegment = AWSXRay.getCurrentSubsegment();
-                sd.post(subsegment, exchange, route.getEndpoint());
-                subsegment.close();
-                LOG.trace("Closing down Subsegment {} with name {}",
-                        subsegment.getId(), subsegment.getName());
-            } else if (AWSXRay.getCurrentSegmentOptional().isPresent()) {
-                Segment segment = AWSXRay.getCurrentSegment();
-                sd.post(segment, exchange, route.getEndpoint());
-                segment.close();
-                LOG.trace("Closing down Segment {} with name {}",
-                        segment.getId(), segment.getName());
+            try {
+                SegmentDecorator sd = getSegmentDecorator(route.getEndpoint());
+                Optional<Segment> curSegment = AWSXRay.getCurrentSegmentOptional();
+                Optional<Subsegment> curSubSegment = AWSXRay.getCurrentSubsegmentOptional();
+                if (curSubSegment.isPresent()) {
+                    Subsegment subsegment = curSubSegment.get();
+                    sd.post(subsegment, exchange, route.getEndpoint());
+                    subsegment.close();
+                    LOG.trace("Closing down Subsegment {} with name {}",
+                            subsegment.getId(), subsegment.getName());
+                } else if (curSegment.isPresent()) {
+                    Segment segment = curSegment.get();
+                    sd.post(segment, exchange, route.getEndpoint());
+                    segment.close();
+                    LOG.trace("Closing down Segment {} with name {}",
+                            segment.getId(), segment.getName());
+                }
+            } catch (AlreadyEmittedException aeEx) {
+                LOG.warn("Ignoring closing of (sub)segment " + route.getId() + " as the segment was already emitted.");
             }
         }
 
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ABCRouteTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ABCRouteTest.java
index 87b7a47..bbc1d89 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ABCRouteTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ABCRouteTest.java
@@ -16,9 +16,15 @@
  */
 package org.apache.camel.component.aws.xray;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+
 public class ABCRouteTest extends CamelAwsXRayTestSupport {
 
     public ABCRouteTest() {
@@ -43,8 +49,13 @@ public class ABCRouteTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(5).create();
+
         template.requestBody("direct:start", "Hello");
 
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(10, TimeUnit.SECONDS), is(equalTo(true)));
+
         verify();
     }
 
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/BeanTracingTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/BeanTracingTest.java
index 5885648..eff47f9 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/BeanTracingTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/BeanTracingTest.java
@@ -16,17 +16,23 @@
  */
 package org.apache.camel.component.aws.xray;
 
+import java.util.concurrent.TimeUnit;
+
 import com.amazonaws.xray.AWSXRay;
 import org.apache.camel.Body;
 import org.apache.camel.Exchange;
 import org.apache.camel.Handler;
 import org.apache.camel.Processor;
 import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spi.InterceptStrategy;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+
 public class BeanTracingTest extends CamelAwsXRayTestSupport {
 
     public BeanTracingTest() {
@@ -52,6 +58,8 @@ public class BeanTracingTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(2).create();
+
         MockEndpoint mockEndpoint = context.getEndpoint("mock:end", MockEndpoint.class);
         mockEndpoint.expectedMessageCount(1);
         mockEndpoint.expectedBodiesReceived("HELLO");
@@ -59,6 +67,9 @@ public class BeanTracingTest extends CamelAwsXRayTestSupport {
 
         template.requestBody("direct:start", "Hello");
 
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(10, TimeUnit.SECONDS), is(equalTo(true)));
+
         mockEndpoint.assertIsSatisfied();
 
         verify();
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ClientRecipientListRouteTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ClientRecipientListRouteTest.java
index 24881e0..c8bb760 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ClientRecipientListRouteTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ClientRecipientListRouteTest.java
@@ -16,9 +16,15 @@
  */
 package org.apache.camel.component.aws.xray;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+
 public class ClientRecipientListRouteTest extends CamelAwsXRayTestSupport {
 
     public ClientRecipientListRouteTest() {
@@ -37,8 +43,13 @@ public class ClientRecipientListRouteTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(7).create();
+
         template.requestBody("direct:start", "Hello");
 
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(5, TimeUnit.SECONDS), is(equalTo(true)));
+
         verify();
     }
 
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ComprehensiveTrackingTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ComprehensiveTrackingTest.java
index 8cb6553..8d02ee2 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ComprehensiveTrackingTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ComprehensiveTrackingTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.component.aws.xray;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Handler;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws.xray.bean.SomeBean;
 import org.junit.Test;
@@ -55,8 +58,13 @@ public class ComprehensiveTrackingTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).from("seda:test").whenDone(1).create();
+
         template.requestBody("direct:start", "Hello");
 
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(10, TimeUnit.SECONDS), is(equalTo(true)));
+
         verify();
 
         assertThat(invokeChecker.gotInvoked(), is(equalTo(true)));
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/CustomComponentTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/CustomComponentTest.java
new file mode 100644
index 0000000..67bceae
--- /dev/null
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/CustomComponentTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.xray;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws.xray.bean.ProcessingCamelBean;
+import org.apache.camel.component.aws.xray.component.CommonEndpoints;
+import org.apache.camel.spi.InterceptStrategy;
+import org.junit.Test;
+
+import static org.apache.camel.component.aws.xray.TestDataBuilder.createSegment;
+import static org.apache.camel.component.aws.xray.TestDataBuilder.createSubsegment;
+import static org.apache.camel.component.aws.xray.TestDataBuilder.createTrace;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+
+/**
+ * This test uses a custom component that will trigger a long-running backing task for certain
+ * specific states. The task is forwarded via an asynchronous send to a Camel route which then
+ * performs the task, such as an upload or a computation.
+ * <p>
+ * AWS XRay does monitor the subsegment count per segment and only emits the segment to the local
+ * XRay daemon once the segment is closed and its internal count reaches 0. If the segment is closed
+ * before the counter reached 0 the segment is not emitted till the last subsegments belonging to
+ * that segment got closed.
+ * <p>
+ * Due to the asynchronous nature of the backing {@link ProcessingCamelBean processing camel bean},
+ * the first request is still in progress when the second request is triggered. As those tasks
+ * aren't executed in parallel, AWS XRay does not take notice of the second processing Camel bean
+ * invocation yet which leads to a premature emit of that segment and thus missing subsegments
+ * for the route and bean invocation. This is possible as the count of the segment reached 0 when
+ * the segment got closed as Camel has not had a chance yet to create the subsegments for the
+ * asynchronously executed route and its bean invocation.
+ */
+public class CustomComponentTest extends CamelAwsXRayTestSupport {
+
+    private static final String START = "seda:start";
+    private static final String DELIVERY = "seda:delivery";
+    private static final String IN_QUEUE = "seda:inqueue";
+    private static final String PERSISTENCE_QUEUE = "seda:persistence-queue";
+    private static final String PERSISTING = "seda:persisting";
+
+    public CustomComponentTest() {
+        super(
+                createTrace().inRandomOrder()
+                        .withSegment(createSegment("start")
+                                .withSubsegment(createSubsegment(DELIVERY))
+                        )
+                        .withSegment(createSegment("delivery")
+                                .withSubsegment(createSubsegment(CommonEndpoints.RECEIVED)
+                                        .withSubsegment(createSubsegment("backingTask")
+                                                .withSubsegment(createSubsegment("bean:ProcessingCamelBean"))
+                                        )
+                                        .withMetadata("state", "received")
+                                )
+                                .withSubsegment(createSubsegment(IN_QUEUE))
+                        )
+                        .withSegment(createSegment("processing")
+                                .withSubsegment(createSubsegment(CommonEndpoints.PROCESSING))
+                                .withSubsegment(createSubsegment(PERSISTENCE_QUEUE))
+                        )
+                        .withSegment(createSegment("wait-for-persisting")
+                                .withSubsegment(createSubsegment(CommonEndpoints.PERSISTENCE_QUEUE))
+                                .withSubsegment(createSubsegment(PERSISTING))
+                        )
+                        .withSegment(createSegment("persisting")
+                                .withSubsegment(createSubsegment(CommonEndpoints.READY)
+                                        // not available due to the asynchronous, long-running nature of the processing
+                                        // bean. If the sleep is commented out in the bean, these subsegments should be
+                                        // available
+//                                        .withSubsegment(createSubsegment("backingTask")
+//                                                .withSubsegment(createSubsegment("bean:ProcessingCamelBean"))
+//                                        )
+//                                        .withMetadata("state", "ready")
+                                )
+                        )
+        );
+    }
+
+    @Override
+    protected InterceptStrategy getTracingStrategy() {
+        return new TraceAnnotatedTracingStrategy();
+    }
+
+    @Test
+    public void testRoute() {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(7).create();
+
+        template.requestBody(START, "Hello");
+
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(10, TimeUnit.SECONDS), is(equalTo(true)));
+
+        verify();
+
+        assertThat(ProcessingCamelBean.gotInvoked(), is(greaterThan(0)));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(START).routeId("start")
+                        .log("Starting test")
+                        .inOnly(DELIVERY);
+
+                from(DELIVERY).routeId("delivery")
+                        .log("Doing some stuff")
+                        .to(CommonEndpoints.RECEIVED)
+                        .delay(100)
+                        .inOnly(IN_QUEUE);
+
+                from(IN_QUEUE).routeId("processing")
+                        .log("Do some more stuff")
+                        .to(CommonEndpoints.PROCESSING)
+                        .delay(100)
+                        .inOnly(PERSISTENCE_QUEUE);
+
+                from(PERSISTENCE_QUEUE).routeId("wait-for-persisting")
+                        .log("Waiting on available persisting instance")
+                        .to(CommonEndpoints.PERSISTENCE_QUEUE)
+                        .delay(100)
+                        .inOnly(PERSISTING);
+
+                from(PERSISTING).routeId("persisting")
+                        .log("Payload ready for usage")
+                        .to(CommonEndpoints.READY)
+                        .delay(100)
+                        .log("done");
+
+                from("seda:backingTask").routeId("backingTask")
+                        .onException(Exception.class)
+                        .redeliveryDelay(100L)
+                        .onRedelivery((Exchange exchange) -> System.err.println(">> Retrying due to "
+                                + exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class).getLocalizedMessage()))
+                        .logExhausted(true)
+                        .handled(true)
+                        .logStackTrace(true)
+                        .end()
+
+                        .log("routing at ${routeId}")
+                        .bean(ProcessingCamelBean.class)
+                        .log("processing camel bean invoked");
+            }
+        };
+    }
+}
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/EIPTracingTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/EIPTracingTest.java
index 49bfc6e..09218ce 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/EIPTracingTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/EIPTracingTest.java
@@ -16,17 +16,23 @@
  */
 package org.apache.camel.component.aws.xray;
 
+import java.util.concurrent.TimeUnit;
+
 import com.amazonaws.xray.AWSXRay;
 import org.apache.camel.Body;
 import org.apache.camel.Exchange;
 import org.apache.camel.Handler;
 import org.apache.camel.Processor;
 import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spi.InterceptStrategy;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+
 public class EIPTracingTest extends CamelAwsXRayTestSupport {
 
     public EIPTracingTest() {
@@ -61,6 +67,8 @@ public class EIPTracingTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(2).create();
+
         MockEndpoint mockEndpoint = context.getEndpoint("mock:end", MockEndpoint.class);
         mockEndpoint.expectedMessageCount(1);
         mockEndpoint.expectedBodiesReceived("HELLO");
@@ -68,6 +76,9 @@ public class EIPTracingTest extends CamelAwsXRayTestSupport {
 
         template.requestBody("direct:start", "Hello");
 
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(5, TimeUnit.SECONDS), is(equalTo(true)));
+
         mockEndpoint.assertIsSatisfied();
 
         verify();
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ErrorHandlingTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ErrorHandlingTest.java
index eb74e5b..e8bfc2a 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ErrorHandlingTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ErrorHandlingTest.java
@@ -17,12 +17,15 @@
 package org.apache.camel.component.aws.xray;
 
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Body;
 import org.apache.camel.Exchange;
 import org.apache.camel.Handler;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Processor;
 import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spi.InterceptStrategy;
@@ -30,6 +33,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+
 public class ErrorHandlingTest extends CamelAwsXRayTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -91,12 +97,17 @@ public class ErrorHandlingTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(2).create();
+
         MockEndpoint mockEndpoint = context.getEndpoint("mock:end", MockEndpoint.class);
         mockEndpoint.expectedMessageCount(1);
         mockEndpoint.expectedBodiesReceived("HELLO");
 
         template.requestBody("direct:start", "Hello");
 
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(5, TimeUnit.SECONDS), is(equalTo(true)));
+
         mockEndpoint.assertIsSatisfied();
 
         verify();
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ErrorTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ErrorTest.java
index 3aebce3..a06c805 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ErrorTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/ErrorTest.java
@@ -17,18 +17,25 @@
 package org.apache.camel.component.aws.xray;
 
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Body;
 import org.apache.camel.Exchange;
 import org.apache.camel.Handler;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Processor;
 import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.InterceptStrategy;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 public class ErrorTest extends CamelAwsXRayTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -85,8 +92,13 @@ public class ErrorTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
         template.requestBody("direct:start", "Hello");
 
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(5, TimeUnit.SECONDS), is(equalTo(true)));
+
         verify();
     }
 
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/MulticastParallelRouteTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/MulticastParallelRouteTest.java
index 1c83f1b..0c08f98 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/MulticastParallelRouteTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/MulticastParallelRouteTest.java
@@ -16,9 +16,16 @@
  */
 package org.apache.camel.component.aws.xray;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 public class MulticastParallelRouteTest extends CamelAwsXRayTestSupport {
 
     public MulticastParallelRouteTest() {
@@ -41,8 +48,17 @@ public class MulticastParallelRouteTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context)
+                .from("seda:b").whenDone(1)
+                .and()
+                .from("seda:c").whenDone(1)
+                .create();
+
         template.requestBody("direct:start", "Hello");
 
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(5, TimeUnit.SECONDS), is(equalTo(true)));
+
         verify();
     }
 
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/MulticastRouteTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/MulticastRouteTest.java
index cf32331..a758202 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/MulticastRouteTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/MulticastRouteTest.java
@@ -16,9 +16,16 @@
  */
 package org.apache.camel.component.aws.xray;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 public class MulticastRouteTest extends CamelAwsXRayTestSupport {
 
     public MulticastRouteTest() {
@@ -41,8 +48,17 @@ public class MulticastRouteTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context)
+                .from("seda:b").whenDone(1)
+                .and()
+                .from("seda:c").whenDone(1)
+                .create();
+
         template.requestBody("direct:start", "Hello");
 
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(5, TimeUnit.SECONDS), is(equalTo(true)));
+
         verify();
     }
 
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/Route2ConcurrentTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/Route2ConcurrentTest.java
index bb27693..228b0f8 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/Route2ConcurrentTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/Route2ConcurrentTest.java
@@ -21,6 +21,10 @@ import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 public class Route2ConcurrentTest extends CamelAwsXRayTestSupport {
 
     public Route2ConcurrentTest() {
@@ -51,7 +55,8 @@ public class Route2ConcurrentTest extends CamelAwsXRayTestSupport {
             template.sendBody("seda:foo", "Hello World");
         }
 
-        assertTrue(notify.matches(30, TimeUnit.SECONDS));
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(10, TimeUnit.SECONDS), is(equalTo(true)));
 
         verify();
     }
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/RouteConcurrentTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/RouteConcurrentTest.java
index d5ee9b0..514d876 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/RouteConcurrentTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/RouteConcurrentTest.java
@@ -21,6 +21,10 @@ import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 public class RouteConcurrentTest extends CamelAwsXRayTestSupport {
 
     public RouteConcurrentTest() {
@@ -37,7 +41,8 @@ public class RouteConcurrentTest extends CamelAwsXRayTestSupport {
 
         template.sendBody("seda:foo", "Hello World");
 
-        assertTrue(notify.matches(30, TimeUnit.SECONDS));
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(10, TimeUnit.SECONDS), is(equalTo(true)));
 
         verify();
     }
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/SpringAwsXRaySimpleRouteTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/SpringAwsXRaySimpleRouteTest.java
index acb6f1a..962d170 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/SpringAwsXRaySimpleRouteTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/SpringAwsXRaySimpleRouteTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.aws.xray;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.component.aws.xray.TestDataBuilder.TestTrace;
 import org.apache.camel.test.spring.CamelSpringTestSupport;
@@ -27,6 +28,10 @@ import org.junit.Test;
 import org.springframework.context.support.AbstractApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 public class SpringAwsXRaySimpleRouteTest extends CamelSpringTestSupport {
 
     @Rule
@@ -45,7 +50,8 @@ public class SpringAwsXRaySimpleRouteTest extends CamelSpringTestSupport {
             template.sendBody("seda:dude", "Hello World");
         }
 
-        assertTrue(notify.matches(30, TimeUnit.SECONDS));
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(30, TimeUnit.SECONDS), is(equalTo(true)));
 
         List<TestTrace> testData = Arrays.asList(
         TestDataBuilder.createTrace()
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TestUtils.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TestUtils.java
index 67cf2d8..1882c49 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TestUtils.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TestUtils.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.aws.xray;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.camel.component.aws.xray.TestDataBuilder.TestSegment;
 import org.apache.camel.component.aws.xray.TestDataBuilder.TestSubsegment;
@@ -48,7 +49,9 @@ public final class TestUtils {
     }
 
     private static void verifyTraces(TestTrace expected, TestTrace actual) {
-        assertThat("Incorrect number of segment for trace",
+        assertThat("Incorrect number of segment for trace. Expected traces: "
+                        + expected.getSegments().stream().map(s -> s.name).collect(Collectors.toList())
+                        + " but found " + actual.getSegments().stream().map(s -> s.name).collect(Collectors.toList()),
                 actual.getSegments().size(), is(equalTo(expected.getSegments().size())));
         List<TestSegment> expectedSegments = new ArrayList<>(expected.getSegments());
         List<TestSegment> actualSegments = new ArrayList<>(actual.getSegments());
@@ -77,7 +80,8 @@ public final class TestUtils {
     }
 
     private static void verifySegments(TestSegment expected, TestSegment actual) {
-        assertThat("Incorrect name of segment",
+        assertThat("Incorrect name of segment. Expected segment name: "
+                        + expected.getName() + " but found: " + actual.getName(),
                 actual.getName(), is(equalTo(expected.getName())));
 
         boolean randomOrder = expected.isRandomOrder();
@@ -103,7 +107,8 @@ public final class TestUtils {
     }
 
     private static void verifySubsegments(TestSubsegment expected, TestSubsegment actual) {
-        assertThat("Incorrect name of subsegment",
+        assertThat("Incorrect name of subsegment. Expected " + expected.getName()
+                        + " but found: " + actual.getName(),
                 actual.getName(), is(equalTo(expected.getName())));
 
         boolean randomOrder = expected.isRandomOrder();
@@ -111,6 +116,8 @@ public final class TestUtils {
             if (randomOrder) {
                 checkSubsegmentInRandomOrder(expected.getSubsegments(), actual.getSubsegments());
             } else {
+                assertThat("Incorrect number of subsegments found in " + actual,
+                        actual.getSubsegments().size(), is(equalTo(expected.getSubsegments().size())));
                 for (int i = 0; i < expected.getSubsegments().size(); i++) {
                     verifySubsegments(expected.getSubsegments().get(i), actual.getSubsegments().get(i));
                 }
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoService2Test.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoService2Test.java
index f82cd52..cf6fab6 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoService2Test.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoService2Test.java
@@ -16,10 +16,17 @@
  */
 package org.apache.camel.component.aws.xray;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 public class TwoService2Test extends CamelAwsXRayTestSupport {
 
     public TwoService2Test() {
@@ -35,9 +42,13 @@ public class TwoService2Test extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
         template.requestBody("direct:ServiceA", "Hello");
 
-        Thread.sleep(500);
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(10, TimeUnit.SECONDS), is(equalTo(true)));
+
         verify();
     }
 
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoServiceTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoServiceTest.java
index a490be8..89faa75 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoServiceTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoServiceTest.java
@@ -16,10 +16,17 @@
  */
 package org.apache.camel.component.aws.xray;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 public class TwoServiceTest extends CamelAwsXRayTestSupport {
 
     public TwoServiceTest() {
@@ -35,9 +42,12 @@ public class TwoServiceTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
         template.requestBody("direct:ServiceA", "Hello");
 
-        Thread.sleep(500);
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(10, TimeUnit.SECONDS), is(equalTo(true)));
         verify();
     }
 
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoServiceWithExcludeTest.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoServiceWithExcludeTest.java
index 96a0d82..9a78792 100644
--- a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoServiceWithExcludeTest.java
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/TwoServiceWithExcludeTest.java
@@ -18,10 +18,17 @@ package org.apache.camel.component.aws.xray;
 
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 public class TwoServiceWithExcludeTest extends CamelAwsXRayTestSupport {
 
     public TwoServiceWithExcludeTest() {
@@ -40,9 +47,13 @@ public class TwoServiceWithExcludeTest extends CamelAwsXRayTestSupport {
 
     @Test
     public void testRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
         template.requestBody("direct:ServiceA", "Hello");
 
-        Thread.sleep(500);
+        assertThat("Not all exchanges were fully processed",
+                notify.matches(10, TimeUnit.SECONDS), is(equalTo(true)));
+
         verify();
     }
 
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/bean/ProcessingCamelBean.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/bean/ProcessingCamelBean.java
new file mode 100644
index 0000000..9f24acb
--- /dev/null
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/bean/ProcessingCamelBean.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.xray.bean;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Handler;
+import org.apache.camel.component.aws.xray.XRayTrace;
+
+@XRayTrace
+public class ProcessingCamelBean {
+
+    private static final AtomicInteger INVOKED = new AtomicInteger(0);
+
+    @Handler
+    public void performTask() {
+        // count the call up front so gotInvoked() already reflects it while sleeping
+        INVOKED.incrementAndGet();
+        try {
+            // sleep 3 seconds to simulate a long-running task
+            Thread.sleep(3000);
+        } catch (InterruptedException iEx) {
+            // restore the interrupt flag instead of silently swallowing it
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public static int gotInvoked() {
+        return INVOKED.get();
+    }
+}
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/bean/SomeBackingService.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/bean/SomeBackingService.java
new file mode 100644
index 0000000..0a24939
--- /dev/null
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/bean/SomeBackingService.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.xray.bean;
+
+import java.util.UUID;
+
+import com.amazonaws.xray.AWSXRay;
+import com.amazonaws.xray.entities.Entity;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.aws.xray.XRayTracer;
+
+public class SomeBackingService {
+
+    private final Endpoint targetEndpoint;
+    private final ProducerTemplate template;
+
+    public SomeBackingService(CamelContext context) {
+        this.targetEndpoint = context.getEndpoint("seda:backingTask");
+        this.template = context.createProducerTemplate();
+    }
+
+    public String performMethod(byte[] body, String state, String traceId) {
+        // tag the trace entity currently held by the global recorder with the given state
+        final Entity currentEntity = AWSXRay.getGlobalRecorder().getTraceEntity();
+        currentEntity.putMetadata("state", state);
+
+        // random key: sent along as the KEY header and handed back to the caller
+        final String correlationKey = UUID.randomUUID().toString();
+        final Exchange outgoing = targetEndpoint.createExchange(ExchangePattern.InOnly);
+        outgoing.getIn().setBody(body);
+        outgoing.getIn().setHeader("KEY", correlationKey);
+        outgoing.getIn().setHeader(XRayTracer.XRAY_TRACE_ID, traceId);
+        outgoing.getIn().setHeader(XRayTracer.XRAY_TRACE_ENTITY, currentEntity);
+        // hand the exchange over asynchronously; the result of asyncSend is not awaited here
+        template.asyncSend(targetEndpoint, outgoing);
+        return correlationKey;
+    }
+}
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/CommonEndpoints.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/CommonEndpoints.java
new file mode 100644
index 0000000..7f67489
--- /dev/null
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/CommonEndpoints.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.xray.component;
+
+public final class CommonEndpoints {
+    // endpoint URIs that all use the xray-test scheme
+    public static final String RECEIVED = "xray-test:received";
+    public static final String PROCESSING = "xray-test:processing";
+    public static final String PERSISTENCE_QUEUE = "xray-test:persistence-queue";
+    public static final String READY = "xray-test:ready";
+
+    private CommonEndpoints() {
+        // constants holder; the private constructor prevents instantiation from outside
+    }
+}
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/TestXRayComponent.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/TestXRayComponent.java
new file mode 100644
index 0000000..5efd581
--- /dev/null
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/TestXRayComponent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.xray.component;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+public class TestXRayComponent extends DefaultComponent {
+    // NOTE(review): no-arg constructor presumably used when the component is resolved by name - confirm
+    public TestXRayComponent() {
+        super();
+    }
+
+    public TestXRayComponent(final CamelContext context) {
+        super(context);
+    }
+    // every URI yields a new TestXRayEndpoint; the parameters map is not consulted
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) {
+        return new TestXRayEndpoint(uri, remaining, this);
+    }
+}
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/TestXRayEndpoint.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/TestXRayEndpoint.java
new file mode 100644
index 0000000..d2b7a77
--- /dev/null
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/TestXRayEndpoint.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.xray.component;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+
+public class TestXRayEndpoint extends DefaultEndpoint {
+    // the 'remaining' value this endpoint was constructed with; forwarded to each producer
+    private final String remaining;
+
+    public TestXRayEndpoint(final String uri, final String remaining, final TestXRayComponent component) {
+        super(uri, component);
+
+        this.remaining = remaining;
+    }
+    // narrows the return type so callers get a TestXRayComponent without casting
+    @Override
+    public TestXRayComponent getComponent() {
+        return (TestXRayComponent) super.getComponent();
+    }
+
+    @Override
+    public Producer createProducer() {
+        return new TestXRayProducer(this, remaining);
+    }
+    // producer-only endpoint: creating a consumer always fails
+    @Override
+    public Consumer createConsumer(Processor processor) {
+        throw new UnsupportedOperationException("You cannot create a Consumer for message monitoring");
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+}
diff --git a/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/TestXRayProducer.java b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/TestXRayProducer.java
new file mode 100644
index 0000000..51fbb8e
--- /dev/null
+++ b/components/camel-aws-xray/src/test/java/org/apache/camel/component/aws/xray/component/TestXRayProducer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.xray.component;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws.xray.XRayTracer;
+import org.apache.camel.component.aws.xray.bean.SomeBackingService;
+import org.apache.camel.impl.DefaultProducer;
+
+public class TestXRayProducer extends DefaultProducer {
+
+    private final SomeBackingService backingService;
+    private final String state;
+
+    public TestXRayProducer(final TestXRayEndpoint endpoint, String state) {
+        super(endpoint);
+        // state is compared against the scheme-less CommonEndpoints URIs in process()
+        this.state = state;
+        this.backingService = new SomeBackingService(endpoint.getCamelContext());
+    }
+
+    @Override
+    public void process(Exchange exchange) {
+        // the body is converted up front, whether or not it ends up being forwarded
+        final byte[] payload = exchange.getIn().getBody(byte[].class);
+        // only the "received" and "ready" states are handed to the backing service
+        final boolean forward = stripScheme(CommonEndpoints.RECEIVED).equals(state)
+                || stripScheme(CommonEndpoints.READY).equals(state);
+        if (forward) {
+            final String traceId = exchange.getIn().getHeader(XRayTracer.XRAY_TRACE_ID, String.class);
+            backingService.performMethod(payload, state, traceId);
+        }
+    }
+    // returns everything after the first ':' of the given endpoint URI
+    private static String stripScheme(String uri) {
+        final int separator = uri.indexOf(":");
+        return uri.substring(separator + 1);
+    }
+}
diff --git a/components/camel-aws-xray/src/test/resources/META-INF/services/org/apache/camel/component/xray-test b/components/camel-aws-xray/src/test/resources/META-INF/services/org/apache/camel/component/xray-test
new file mode 100644
index 0000000..62818f5
--- /dev/null
+++ b/components/camel-aws-xray/src/test/resources/META-INF/services/org/apache/camel/component/xray-test
@@ -0,0 +1 @@
+class=org.apache.camel.component.aws.xray.component.TestXRayComponent
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.