You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/08/21 11:53:47 UTC

[qpid-jms] branch master updated: QPIDJMS-471: add support for message tracing using OpenTracing

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new 94822ac  QPIDJMS-471: add support for message tracing using OpenTracing
94822ac is described below

commit 94822aced1380bae7935db6305250bf6bea7a9ac
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Tue Aug 20 18:27:57 2019 +0100

    QPIDJMS-471: add support for message tracing using OpenTracing
---
 pom.xml                                            |   30 +-
 qpid-jms-client/pom.xml                            |   19 +
 .../java/org/apache/qpid/jms/JmsConnection.java    |    6 +
 .../org/apache/qpid/jms/JmsConnectionFactory.java  |   64 +-
 .../org/apache/qpid/jms/JmsMessageConsumer.java    |   37 +-
 .../qpid/jms/message/facade/JmsMessageFacade.java  |   11 +-
 .../apache/qpid/jms/meta/JmsConnectionInfo.java    |   13 +
 .../qpid/jms/provider/amqp/AmqpFixedProducer.java  |   14 +-
 .../amqp/message/AmqpJmsMessageFacade.java         |   76 ++
 .../org/apache/qpid/jms/tracing/JmsNoOpTracer.java |   49 +
 .../qpid/jms/tracing/JmsNoOpTracerFactory.java     |   39 +
 .../org/apache/qpid/jms/tracing/JmsTracer.java     |   39 +
 .../apache/qpid/jms/tracing/JmsTracerFactory.java  |   76 ++
 .../apache/qpid/jms/tracing/TraceableMessage.java  |  112 ++
 .../jms/tracing/opentracing/OpenTracingTracer.java |  226 ++++
 .../opentracing/OpenTracingTracerFactory.java      |   75 ++
 .../services/org/apache/qpid/jms/tracing/noop      |   17 +
 .../org/apache/qpid/jms/tracing/opentracing        |   17 +
 .../message/facade/test/JmsTestMessageFacade.java  |   35 +
 .../amqp/message/AmqpJmsMessageFacadeTest.java     |    7 +-
 .../sections/MessageAnnotationsSectionMatcher.java |    8 +
 .../qpid/jms/tracing/JmsNoopTracerFactoryTest.java |   51 +
 .../opentracing/OpenTracingIntegrationTest.java    | 1350 ++++++++++++++++++++
 .../opentracing/OpenTracingTracerFactoryTest.java  |   82 ++
 .../tracing/opentracing/OpenTracingTracerTest.java |  453 +++++++
 qpid-jms-docs/Configuration.md                     |   43 +
 26 files changed, 2935 insertions(+), 14 deletions(-)

diff --git a/pom.xml b/pom.xml
index 71a1b37..f36f045 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,9 @@
     <slf4j-version>1.7.25</slf4j-version>
     <geronimo.jms.2.spec.version>1.0-alpha-2</geronimo.jms.2.spec.version>
 
+    <!-- 'Provided'/Test Dependency Versions for this Project -->
+    <opentracing-version>0.33.0</opentracing-version>
+
     <!-- Test Dependency Versions for this Project -->
     <netty-tcnative-version>2.0.25.Final</netty-tcnative-version>
     <activemq-version>5.15.9</activemq-version>
@@ -162,11 +165,18 @@
         <artifactId>netty-codec-http</artifactId>
         <version>${netty-version}</version>
       </dependency>
-      <!--  Testing only Uber Jar inclusion -->
+      <!--  Provided dependencies -->
       <dependency>
-         <groupId>io.netty</groupId>
-         <artifactId>netty-tcnative-boringssl-static</artifactId>
-         <version>${netty-tcnative-version}</version>
+        <groupId>io.opentracing</groupId>
+        <artifactId>opentracing-api</artifactId>
+        <version>${opentracing-version}</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>io.opentracing</groupId>
+        <artifactId>opentracing-util</artifactId>
+        <version>${opentracing-version}</version>
+        <scope>provided</scope>
       </dependency>
       <!-- Logging impl dependency, e.g for the tests or examples -->
       <dependency>
@@ -176,6 +186,18 @@
       </dependency>
       <!-- Test dependencies -->
       <dependency>
+        <groupId>io.opentracing</groupId>
+        <artifactId>opentracing-mock</artifactId>
+        <version>${opentracing-version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+         <groupId>io.netty</groupId>
+         <artifactId>netty-tcnative-boringssl-static</artifactId>
+         <version>${netty-tcnative-version}</version>
+         <scope>test</scope>
+      </dependency>
+      <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
         <version>${junit-version}</version>
diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml
index 8ea5b68..8714560 100644
--- a/qpid-jms-client/pom.xml
+++ b/qpid-jms-client/pom.xml
@@ -76,6 +76,20 @@
     </dependency>
 
     <!-- =================================== -->
+    <!-- Provided Dependencies               -->
+    <!-- =================================== -->
+    <dependency>
+      <groupId>io.opentracing</groupId>
+      <artifactId>opentracing-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.opentracing</groupId>
+      <artifactId>opentracing-util</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- =================================== -->
     <!-- Testing Dependencies                -->
     <!-- =================================== -->
     <dependency>
@@ -108,6 +122,11 @@
       <artifactId>netty-tcnative-boringssl-static</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.opentracing</groupId>
+      <artifactId>opentracing-mock</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 2cb59fe..5afcb3f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -81,6 +81,7 @@ import org.apache.qpid.jms.provider.ProviderException;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.ProviderSynchronization;
+import org.apache.qpid.jms.tracing.JmsTracer;
 import org.apache.qpid.jms.util.FifoMessageQueue;
 import org.apache.qpid.jms.util.MessageQueue;
 import org.apache.qpid.jms.util.PriorityMessageQueue;
@@ -243,6 +244,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                 started.set(false);
                 closing.set(false);
                 closed.set(true);
+                connectionInfo.getTracer().close();
             }
         } catch (Exception e) {
             throw JmsExceptionSupport.create(e);
@@ -1174,6 +1176,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         connectionInfo.setCloseLinksThatFailOnReconnect(closeLinksThatFailOnReconnect);
     }
 
+    JmsTracer getTracer() {
+        return connectionInfo.getTracer();
+    }
+
     //----- Async event handlers ---------------------------------------------//
 
     @Override
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index eb0427d..1bf65d9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -52,6 +52,9 @@ import org.apache.qpid.jms.policy.JmsPresettlePolicy;
 import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.tracing.JmsNoOpTracer;
+import org.apache.qpid.jms.tracing.JmsTracer;
+import org.apache.qpid.jms.tracing.JmsTracerFactory;
 import org.apache.qpid.jms.util.IdGenerator;
 import org.apache.qpid.jms.util.PropertyUtil;
 import org.apache.qpid.jms.util.URISupport;
@@ -103,6 +106,8 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
     private IdGenerator connectionIdGenerator;
     private String connectionIDPrefix;
     private ExceptionListener exceptionListener;
+    private String tracing;
+    private JmsTracer tracer;
 
     private JmsPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
     private JmsRedeliveryPolicy redeliveryPolicy = new JmsDefaultRedeliveryPolicy();
@@ -246,6 +251,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
     }
 
     protected JmsConnectionInfo configureConnectionInfo(String username, String password) throws JMSException {
+        JmsTracer implicitTracer = JmsNoOpTracer.INSTANCE;
         try {
             Map<String, String> properties = PropertyUtil.getProperties(this);
             // Pull out the clientID prop, we need to act differently according to
@@ -269,6 +275,13 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
             connectionInfo.setDeserializationPolicy(deserializationPolicy.copy());
             connectionInfo.getExtensionMap().putAll(extensionMap);
 
+            if(tracer != null) {
+                connectionInfo.setTracer(tracer);
+            } else if(tracing != null) {
+                implicitTracer = JmsTracerFactory.create(remoteURI, tracing);
+                connectionInfo.setTracer(implicitTracer);
+            }
+
             // Set properties to make additional configuration changes
             PropertyUtil.setProperties(connectionInfo, properties);
 
@@ -287,6 +300,10 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
 
             return connectionInfo;
         } catch (Exception e) {
+            try {
+                implicitTracer.close();
+            } catch (Throwable ignored) {}
+
             throw JmsExceptionSupport.create(e);
         }
     }
@@ -916,7 +933,6 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
         this.useDaemonThread = useDaemonThread;
     }
 
-
     /**
      * @return whether links that fail to be created during failover reconnect are closed or not.
      */
@@ -965,6 +981,52 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
         }
     }
 
+    /**
+     * Sets the type name for a tracing provider to use for the connection(s) created using the factory.
+     *
+     * @param tracing
+     *            The tracing provider type name to set
+     */
+    public void setTracing(String tracing) {
+        this.tracing = tracing;
+    }
+
+    public String getTracing() {
+        return tracing;
+    }
+
+    /**
+     * Explicitly sets a tracer instance for use by the connection(s) created from the factory.
+     *
+     * Using this method overrides any implicit creation of a tracer due to use of either the URI configuration option
+     * or the {@link JmsConnectionFactory#setTracing(String)} method.
+     *
+     * The provided tracer will have its close method called when a created connection/context is closed,
+     * so if a tracer is to be used across multiple such connections created by this factory then the close
+     * method should handle that appropriately, e.g. be a no-op and have the application and/or underlying tracing
+     * implementation clean up at shutdown. If no Connection/JMSContext object is returned from a creation
+     * attempt due to an exception being thrown, the tracer provided will not have its close method called
+     * and the application or underlying tracing implementation is responsible for any cleanup required.
+     *
+     * @param tracer
+     *            The tracer to set
+     */
+    public void setTracer(JmsTracer tracer) {
+        this.tracer = tracer;
+    }
+
+    /**
+     * Gets any tracer previously set explicitly on the connection factory using {@link #setTracer(JmsTracer)}.
+     *
+     * Does not return any tracer created implicitly due to use of either the URI configuration option
+     * or the {@link JmsConnectionFactory#setTracing(String)} method.
+     *
+     * @return the tracer previously set, or null if none was set.
+     */
+    public JmsTracer getTracer() {
+        return tracer;
+    }
+
     //----- Static Methods ---------------------------------------------------//
 
     /**
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 7cec9b5..938d5a3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -46,6 +46,9 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderException;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderSynchronization;
+import org.apache.qpid.jms.tracing.JmsTracer;
+import org.apache.qpid.jms.tracing.JmsTracer.DeliveryOutcome;
+import org.apache.qpid.jms.tracing.TraceableMessage;
 import org.apache.qpid.jms.util.FifoMessageQueue;
 import org.apache.qpid.jms.util.MessageQueue;
 import org.apache.qpid.jms.util.PriorityMessageQueue;
@@ -71,6 +74,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     protected final Lock dispatchLock = new ReentrantLock();
     protected final AtomicReference<Throwable> failureCause = new AtomicReference<>();
     protected final MessageDeliverTask deliveryTask = new MessageDeliverTask();
+    protected final JmsTracer tracer;
+    protected final String address;
 
     protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
                                  String selector, boolean noLocal) throws JMSException {
@@ -81,6 +86,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                                  String name, String selector, boolean noLocal) throws JMSException {
         this.session = session;
         this.connection = session.getConnection();
+        this.tracer = connection.getTracer();
+        this.address = destination.getAddress();
         this.acknowledgementMode = isBrowser() ? Session.AUTO_ACKNOWLEDGE : session.acknowledgementMode();
 
         if (destination.isTemporary()) {
@@ -330,9 +337,17 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                             // closed until future pulls were performed.
                         }
                     }
-                } else if (consumeExpiredMessage(envelope)) {
+
+                    continue;
+                }
+
+                TraceableMessage facade = envelope.getMessage().getFacade();
+
+                if (consumeExpiredMessage(envelope)) {
                     LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
                     doAckExpired(envelope);
+                    tracer.syncReceive(facade, address, DeliveryOutcome.EXPIRED);
+
                     if (timeout > 0) {
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     }
@@ -340,6 +355,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                 } else if (session.redeliveryExceeded(envelope)) {
                     LOG.debug("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
                     applyRedeliveryPolicyOutcome(envelope);
+                    tracer.syncReceive(facade, address, DeliveryOutcome.REDELIVERIES_EXCEEDED);
+
                     if (timeout > 0) {
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     }
@@ -348,6 +365,9 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                     if (LOG.isTraceEnabled()) {
                         LOG.trace(getConsumerId() + " received message: " + envelope);
                     }
+
+                    tracer.syncReceive(facade, address, DeliveryOutcome.DELIVERED);
+
                     return envelope;
                 }
             }
@@ -725,15 +745,21 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                     return false;
                 }
 
-                JmsMessage copy = null;
+                TraceableMessage facade = envelope.getMessage().getFacade();
 
                 if (consumeExpiredMessage(envelope)) {
                     LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
                     doAckExpired(envelope);
+                    tracer.asyncDeliveryInit(facade, address);
+                    tracer.asyncDeliveryComplete(facade, DeliveryOutcome.EXPIRED, null);
                 } else if (session.redeliveryExceeded(envelope)) {
                     LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
                     applyRedeliveryPolicyOutcome(envelope);
+                    tracer.asyncDeliveryInit(facade, address);
+                    tracer.asyncDeliveryComplete(facade, DeliveryOutcome.REDELIVERIES_EXCEEDED, null);
                 } else {
+                    final JmsMessage copy;
+
                     boolean deliveryFailed = false;
                     boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
                                               acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
@@ -745,9 +771,16 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                     session.clearSessionRecovered();
 
                     try {
+                        tracer.asyncDeliveryInit(facade, address);
+
                         messageListener.onMessage(copy);
                     } catch (RuntimeException rte) {
                         deliveryFailed = true;
+                        tracer.asyncDeliveryComplete(facade, DeliveryOutcome.APPLICATION_ERROR, rte);
+                    } finally {
+                        if(!deliveryFailed) {
+                            tracer.asyncDeliveryComplete(facade, DeliveryOutcome.DELIVERED, null);
+                        }
                     }
 
                     if (autoAckOrDupsOk && !session.isSessionRecovered()) {
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
index 48f0524..7a3b629 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
@@ -21,6 +21,7 @@ import java.util.Set;
 import javax.jms.JMSException;
 
 import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.tracing.TraceableMessage;
 
 /**
  * The Message Facade interface defines the required mapping between a Provider's
@@ -28,7 +29,7 @@ import org.apache.qpid.jms.JmsDestination;
  * interface and offer direct access to its message types without the need to
  * copy to / from a more generic JMS message instance.
  */
-public interface JmsMessageFacade {
+public interface JmsMessageFacade extends TraceableMessage {
 
     /**
      * Returns the property names for this Message instance. The Set returned may be
@@ -96,7 +97,7 @@ public interface JmsMessageFacade {
      * Called before a message is dispatched to its intended consumer to allow for
      * any necessary processing of message data such as setting read-only state etc.
      *
-     * @throws JMSException if an error occurs while preparing the message for send.
+     * @throws JMSException if an error occurs while preparing the message for dispatch.
      */
     void onDispatch() throws JMSException;
 
@@ -123,14 +124,14 @@ public interface JmsMessageFacade {
     JmsMessageFacade copy() throws JMSException;
 
     /**
-     * Gets the timestamp assigned to the message when it was sent.
+     * Gets the time stamp assigned to the message when it was sent.
      *
-     * @return the message timestamp value.
+     * @return the message time stamp value.
      */
     long getTimestamp();
 
     /**
-     * Sets the timestamp value of this message.
+     * Sets the time stamp value of this message.
      *
      * @param timestamp
      *        the time that the message was sent by the provider.
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
index 13f14ab..6b08fc3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms.meta;
 import java.net.URI;
 import java.nio.charset.Charset;
 import java.util.EnumMap;
+import java.util.Objects;
 import java.util.function.BiFunction;
 
 import javax.jms.Connection;
@@ -35,6 +36,8 @@ import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
 import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
 import org.apache.qpid.jms.policy.JmsPresettlePolicy;
 import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
+import org.apache.qpid.jms.tracing.JmsNoOpTracer;
+import org.apache.qpid.jms.tracing.JmsTracer;
 
 /**
  * Meta object that contains the JmsConnection identification and configuration
@@ -84,6 +87,7 @@ public final class JmsConnectionInfo extends JmsAbstractResource implements Comp
     private JmsDeserializationPolicy deserializationPolicy;
 
     private volatile byte[] encodedUserId;
+    private JmsTracer tracer = JmsNoOpTracer.INSTANCE;
 
     public JmsConnectionInfo(JmsConnectionId connectionId) {
         if (connectionId == null) {
@@ -429,4 +433,13 @@ public final class JmsConnectionInfo extends JmsAbstractResource implements Comp
     public void visit(JmsResourceVistor vistor) throws Exception {
         vistor.processConnectionInfo(this);
     }
+
+    public void setTracer(JmsTracer tracer) {
+        Objects.requireNonNull(tracer);
+        this.tracer = tracer;
+    }
+
+    public JmsTracer getTracer() {
+        return tracer;
+    }
 }
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 3c79fd7..69fc240 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -35,6 +35,7 @@ import org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport;
 import org.apache.qpid.jms.provider.exceptions.ProviderIllegalStateException;
 import org.apache.qpid.jms.provider.exceptions.ProviderSendTimedOutException;
 import org.apache.qpid.jms.provider.exceptions.ProviderUnsupportedOperationException;
+import org.apache.qpid.jms.tracing.JmsTracer;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
@@ -64,11 +65,14 @@ public class AmqpFixedProducer extends AmqpProducer {
     private final Map<Object, InFlightSend> blocked = new LinkedHashMap<Object, InFlightSend>();
 
     private final AmqpConnection connection;
+    private final JmsTracer tracer;
 
     public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info, Sender sender) {
         super(session, info, sender);
 
         connection = session.getConnection();
+        tracer = connection.getResourceInfo().getTracer();
+
         delayedDeliverySupported = connection.getProperties().isDelayedDeliverySupported();
     }
 
@@ -316,7 +320,7 @@ public class AmqpFixedProducer extends AmqpProducer {
 
     //----- Class used to manage held sends ----------------------------------//
 
-    private class InFlightSend implements AsyncResult, AmqpExceptionBuilder {
+    private final class InFlightSend implements AsyncResult, AmqpExceptionBuilder {
 
         private final JmsOutboundMessageDispatch envelope;
         private final AsyncResult request;
@@ -399,6 +403,14 @@ public class AmqpFixedProducer extends AmqpProducer {
                 blocked.remove(envelope.getMessageId());
             }
 
+            // Null delivery means that we never had credit to send so no delivery was created to carry the message.
+            if (delivery != null) {
+                DeliveryState remoteState = delivery.getRemoteState();
+                tracer.completeSend(envelope.getMessage().getFacade(), remoteState == null ? null : remoteState.getType().name());
+            } else {
+                tracer.completeSend(envelope.getMessage().getFacade(), null);
+            }
+
             // Put the message back to usable state following send complete
             envelope.getMessage().onSendComplete();
 
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index 2ef7d40..5362416 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -27,6 +27,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiConsumer;
 
 import javax.jms.JMSException;
 import javax.jms.JMSRuntimeException;
@@ -38,6 +39,7 @@ import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
+import org.apache.qpid.jms.tracing.JmsTracer;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
@@ -228,6 +230,9 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
         }
 
         header.setTimeToLive(ttl);
+
+        JmsTracer tracer = connection.getResourceInfo().getTracer();
+        tracer.initSend(this, getToAddress());
     }
 
     @Override
@@ -293,6 +298,10 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
             target.lazyCreateFooter();
             target.footerMap.putAll(footerMap);
         }
+
+        if (tracingContext != null && !tracingContext.isEmpty()) {
+            target.lazyCreateTracingContext().putAll(tracingContext);
+        }
     }
 
     @Override
@@ -904,6 +913,73 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
         return AmqpCodec.encodeMessage(this);
     }
 
+    //----- TraceableMessage implementation
+
+    private Map<String, Object> tracingContext;
+
+    private Map<String, Object> lazyCreateTracingContext() {
+        if (tracingContext == null) {
+            tracingContext = new HashMap<>();
+        }
+        return tracingContext;
+    }
+
+    @Override
+    public Object getTracingContext(String key) {
+        if(tracingContext == null) {
+            return null;
+        }
+
+        return tracingContext.get(key);
+    }
+
+    @Override
+    public Object setTracingContext(String key, Object value) {
+        return lazyCreateTracingContext().put(key, value);
+    }
+
+    @Override
+    public Object removeTracingContext(String key) {
+        if(tracingContext == null) {
+            return null;
+        }
+
+        return tracingContext.remove(key);
+    }
+
+    @Override
+    public Object getTracingAnnotation(String key) {
+        if (messageAnnotationsMap != null && !messageAnnotationsMap.isEmpty()) {
+            return messageAnnotationsMap.get(Symbol.valueOf(key));
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Object setTracingAnnotation(String key, Object value) {
+        lazyCreateMessageAnnotations();
+        return messageAnnotationsMap.put(Symbol.valueOf(key), value);
+    }
+
+    @Override
+    public Object removeTracingAnnotation(String key) {
+        if (messageAnnotationsMap != null && !messageAnnotationsMap.isEmpty()) {
+            return messageAnnotationsMap.remove(Symbol.valueOf(key));
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void filterTracingAnnotations(BiConsumer<String, Object> filter) {
+        if (messageAnnotationsMap != null && !messageAnnotationsMap.isEmpty()) {
+            messageAnnotationsMap.forEach((key, value) -> {
+                filter.accept(key.toString(), value);
+            });
+        }
+    }
+
     //----- Access to AMQP Message Values ------------------------------------//
 
     AmqpHeader getAmqpHeader() {
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsNoOpTracer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsNoOpTracer.java
new file mode 100644
index 0000000..7e0d4fa
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsNoOpTracer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing;
+
+/**
+ * Default no-op Tracing implementation
+ */
+public final class JmsNoOpTracer implements JmsTracer {
+
+    public static final JmsNoOpTracer INSTANCE = new JmsNoOpTracer();
+
+    @Override
+    public void initSend(TraceableMessage message, String address) {
+    }
+
+    @Override
+    public void completeSend(TraceableMessage message, String outcome) {
+    }
+
+    @Override
+    public void syncReceive(TraceableMessage message, String address, DeliveryOutcome outcome) {
+    }
+
+    @Override
+    public void asyncDeliveryInit(TraceableMessage message, String address) {
+    }
+
+    @Override
+    public void asyncDeliveryComplete(TraceableMessage message, DeliveryOutcome outcome, Throwable throwable) {
+    }
+
+    @Override
+    public void close() {
+    }
+}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsNoOpTracerFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsNoOpTracerFactory.java
new file mode 100644
index 0000000..c86f92f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsNoOpTracerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing;
+
+import java.net.URI;
+
+public class JmsNoOpTracerFactory extends JmsTracerFactory {
+
+    static final String TYPE_NAME = "noop";
+
+    /**
+     * Returns the shared JmsNoOpTracer instance, whose JmsTracer methods all
+     * perform no action.
+     *
+     * @return a JmsTracer instance that performs no action for any of its methods.
+     */
+    public static JmsTracer create() {
+        return JmsNoOpTracer.INSTANCE;
+    }
+
+    @Override
+    public JmsTracer createTracer(URI remoteURI, String type) throws Exception {
+        return create();
+    }
+}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsTracer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsTracer.java
new file mode 100644
index 0000000..9a17f34
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsTracer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing;
+
+public interface JmsTracer {
+
+    enum DeliveryOutcome {
+        DELIVERED,
+        EXPIRED,
+        REDELIVERIES_EXCEEDED,
+        APPLICATION_ERROR
+    }
+
+    void initSend(TraceableMessage message, String address);
+
+    void completeSend(TraceableMessage message, String outcome);
+
+    void syncReceive(TraceableMessage message, String address, DeliveryOutcome outcome);
+
+    void asyncDeliveryInit(TraceableMessage message, String address);
+
+    void asyncDeliveryComplete(TraceableMessage message, DeliveryOutcome outcome, Throwable throwable);
+
+    void close();
+}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsTracerFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsTracerFactory.java
new file mode 100644
index 0000000..1b3a2b5
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/JmsTracerFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.util.FactoryFinder;
+
+public abstract class JmsTracerFactory {
+
+    private static final FactoryFinder<JmsTracerFactory> TRACER_FACTORY_FINDER =
+        new FactoryFinder<JmsTracerFactory>(JmsTracerFactory.class,
+            "META-INF/services/" + JmsTracerFactory.class.getPackage().getName().replace(".", "/") + "/");
+
+    public abstract JmsTracer createTracer(URI remoteURI, String name) throws Exception;
+
+    /**
+     * Creates a JmsTracer using the factory with the given name and any relevant
+     * configuration properties set on the given remote URI.
+     *
+     * @param remoteURI
+     *        The connection uri.
+     * @param name
+     *        The name that describes the desired tracer factory.
+     * @return a tracer instance matching the name.
+     *
+     * @throws Exception if an error occurs while creating the tracer.
+     */
+    public static JmsTracer create(URI remoteURI, String name) throws Exception {
+        JmsTracerFactory factory = findTracerFactory(name);
+
+        return factory.createTracer(remoteURI, name);
+    }
+
+    /**
+     * Searches for a JmsTracerFactory by using the given name.
+     *
+     * The search first checks the local cache of factories before moving on to search in the class path.
+     *
+     * @param name
+     *        The name that describes the desired tracer factory.
+     *
+     * @return a tracer factory instance matching the name.
+     *
+     * @throws IOException if an error occurs while locating the factory.
+     */
+    public static JmsTracerFactory findTracerFactory(String name) throws IOException {
+        if (name == null || name.isEmpty()) {
+            throw new IOException("No Tracer name specified.");
+        }
+
+        JmsTracerFactory factory = null;
+        try {
+            factory = TRACER_FACTORY_FINDER.newInstance(name);
+        } catch (Throwable e) {
+            throw new IOException("Tracer name NOT recognized: [" + name + "]", e);
+        }
+
+        return factory;
+    }
+}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/TraceableMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/TraceableMessage.java
new file mode 100644
index 0000000..65949d0
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/TraceableMessage.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing;
+
+import java.util.function.BiConsumer;
+
+/**
+ * Interface which allows generic tracing interactions with Messages
+ * from the client.
+ */
+public interface TraceableMessage {
+
+    /**
+     * Gets some trace related context from the message for use by the tracing
+     * implementation.
+     * <p>
+     * Allows the Tracing implementation to store context data into the message
+     * without the message needing to know what the types or structure of that
+     * data is.
+     *
+     * @param key
+     * 		The name of the context element to be looked up.
+     *
+     * @return the stored tracing context element or null if not present.
+     */
+    Object getTracingContext(String key);
+
+    /**
+     * Sets some trace related context on the message for use by the tracing
+     * implementation.
+     * <p>
+     * Allows the Tracing implementation to store context data into the message
+     * without the message needing to know what the types or structure of that
+     * data is.
+     *
+     * @param key
+     * 		The key that the tracing context element should be stored under.
+     * @param value
+     * 		The value to store under the given key.
+     *
+     * @return the previous value stored under the given key, if any.
+     */
+    Object setTracingContext(String key, Object value);
+
+    /**
+     * Removes some trace related context from the message.
+     *
+     * @param key
+     *      The key of the tracing context element that should be cleared.
+     *
+     * @return the previous value stored under the given key, if any.
+     */
+    Object removeTracingContext(String key);
+
+    /**
+     * Gets some trace specific message annotation that was previously applied to
+     * the given message either locally or by a remote peer.
+     *
+     * @param key
+     * 		The name of the tracing annotation data to retrieve.
+     *
+     * @return the tracing related annotation data under the given key.
+     */
+    Object getTracingAnnotation(String key);
+
+    /**
+     * Remove some trace specific message annotation that was previously applied to
+     * the given message either locally or by a remote peer.
+     *
+     * @param key
+     * 		The name of the tracing annotation data to remove.
+     *
+     * @return the tracing related annotation data under the given key.
+     */
+    Object removeTracingAnnotation(String key);
+
+    /**
+     * Sets some trace specific message annotation on the given message,
+     * stored under the given key.
+     *
+     * @param key
+     * 		The name of the tracing annotation data to store the trace data under.
+     * @param value
+     * 		The value to store under the given key.
+     *
+     * @return the previous value stored under the given key, if any.
+     */
+    Object setTracingAnnotation(String key, Object value);
+
+    /**
+     * Allows the tracing layer to filter out tracing related details from the full
+     * set of message annotations that a message might be carrying.
+     *
+     * @param filter
+     * 		The filter used to consume tracing related message annotations of interest.
+     */
+    void filterTracingAnnotations(BiConsumer<String, Object> filter);
+}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracer.java
new file mode 100644
index 0000000..55b0882
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracer.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing.opentracing;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.qpid.jms.tracing.JmsTracer;
+import org.apache.qpid.jms.tracing.TraceableMessage;
+
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.SpanContext;
+import io.opentracing.Tracer;
+import io.opentracing.log.Fields;
+import io.opentracing.propagation.Format;
+import io.opentracing.propagation.TextMap;
+import io.opentracing.propagation.TextMapAdapter;
+import io.opentracing.tag.Tags;
+
+public class OpenTracingTracer implements JmsTracer {
+    static final String REDELIVERIES_EXCEEDED = "redeliveries-exceeded";
+    static final String MESSAGE_EXPIRED = "message-expired";
+    static final String SEND_SPAN_NAME = "amqp-delivery-send";
+    static final String RECEIVE_SPAN_NAME = "receive";
+    static final String ONMESSAGE_SPAN_NAME = "onMessage";
+    static final String DELIVERY_SETTLED = "delivery settled";
+    static final String STATE = "state";
+    static final String COMPONENT = "qpid-jms";
+    static final Object ERROR_EVENT = "error";
+
+    static final String SEND_SPAN_CONTEXT_KEY = "sendSpan";
+    static final String ARRIVING_SPAN_CTX_CONTEXT_KEY = "arrivingContext";
+    static final String DELIVERY_SPAN_CONTEXT_KEY = "deliverySpan";
+    static final String ONMESSAGE_SCOPE_CONTEXT_KEY = "onMessageScope";
+
+    static final String ANNOTATION_KEY = "x-opt-qpid-tracestate";
+
+    private Tracer tracer;
+    private boolean closeUnderlyingTracer;
+
+    OpenTracingTracer(Tracer tracer, boolean closeUnderlyingTracer) {
+        this.tracer = tracer;
+        this.closeUnderlyingTracer = closeUnderlyingTracer;
+    }
+
+    @Override
+    public void initSend(TraceableMessage message, String address) {
+        Span span = tracer.buildSpan(SEND_SPAN_NAME)
+                          .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER)
+                          .withTag(Tags.MESSAGE_BUS_DESTINATION, address)
+                          .withTag(Tags.COMPONENT, COMPONENT)
+                          .start();
+
+        LazyTextMapInject carrier = new LazyTextMapInject();
+
+        tracer.inject(span.context(), Format.Builtin.TEXT_MAP, carrier);
+
+        if(carrier.getInjectMap() != null) {
+            message.setTracingAnnotation(ANNOTATION_KEY, carrier.getInjectMap());
+        } else {
+            message.removeTracingAnnotation(ANNOTATION_KEY);
+        }
+
+        message.setTracingContext(SEND_SPAN_CONTEXT_KEY, span);
+    }
+
+    @Override
+    public void completeSend(TraceableMessage message, String outcome) {
+        Object cachedSpan = message.getTracingContext(SEND_SPAN_CONTEXT_KEY);
+        if (cachedSpan != null) {
+            Span span = (Span) cachedSpan;
+
+            Map<String, String> fields = new HashMap<>();
+            fields.put(Fields.EVENT, DELIVERY_SETTLED);
+            fields.put(STATE, outcome == null ? "null" : outcome);
+
+            span.log(fields);
+
+            span.finish();
+        }
+    }
+
+    private SpanContext extract(TraceableMessage message) {
+        SpanContext spanContext = null;
+
+        @SuppressWarnings("unchecked")
+        Map<String, String> headers = (Map<String, String>) message.getTracingAnnotation(ANNOTATION_KEY);
+        if(headers != null && !headers.isEmpty()) {
+            spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(headers));
+        }
+
+        if(spanContext != null) {
+            message.setTracingContext(ARRIVING_SPAN_CTX_CONTEXT_KEY, spanContext);
+        }
+
+        return spanContext;
+    }
+
+    @Override
+    public void syncReceive(TraceableMessage message, String address, DeliveryOutcome outcome) {
+        SpanContext context = extract(message);
+
+        Span span = tracer.buildSpan(RECEIVE_SPAN_NAME)
+                          .asChildOf(context)
+                          .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER)
+                          .withTag(Tags.MESSAGE_BUS_DESTINATION, address)
+                          .withTag(Tags.COMPONENT, COMPONENT)
+                          .start();
+        try {
+            addDeliveryLogIfNeeded(outcome, span);
+        } finally {
+            span.finish();
+        }
+
+        message.setTracingContext(DELIVERY_SPAN_CONTEXT_KEY, span);
+    }
+
+    private void addDeliveryLogIfNeeded(DeliveryOutcome outcome, Span span) {
+        Map<String, Object> fields = null;
+        if (outcome == DeliveryOutcome.EXPIRED) {
+            fields = new HashMap<>();
+            fields.put(Fields.EVENT, MESSAGE_EXPIRED);
+        } else if (outcome == DeliveryOutcome.REDELIVERIES_EXCEEDED) {
+            fields = new HashMap<>();
+            fields.put(Fields.EVENT, REDELIVERIES_EXCEEDED);
+        }
+
+        if (fields != null) {
+            span.log(fields);
+        }
+    }
+
+    @Override
+    public void asyncDeliveryInit(TraceableMessage message, String address) {
+        SpanContext context = extract(message);
+
+        Span span = tracer.buildSpan(ONMESSAGE_SPAN_NAME)
+                          .ignoreActiveSpan()
+                          .asChildOf(context)
+                          .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER)
+                          .withTag(Tags.MESSAGE_BUS_DESTINATION, address)
+                          .withTag(Tags.COMPONENT, COMPONENT)
+                          .start();
+
+        message.setTracingContext(DELIVERY_SPAN_CONTEXT_KEY, span);
+
+        Scope scope = tracer.activateSpan(span);
+        message.setTracingContext(ONMESSAGE_SCOPE_CONTEXT_KEY, scope);
+    }
+
+    @Override
+    public void asyncDeliveryComplete(TraceableMessage message, DeliveryOutcome outcome, Throwable throwable) {
+        Scope scope = (Scope) message.removeTracingContext(ONMESSAGE_SCOPE_CONTEXT_KEY);
+        try {
+            if (scope != null) {
+                scope.close();
+            }
+        } finally {
+            Span span = (Span) message.getTracingContext(DELIVERY_SPAN_CONTEXT_KEY);
+            if (span != null) {
+                try {
+                    if (throwable != null) {
+                        span.setTag(Tags.ERROR, true);
+
+                        Map<String, Object> fields = new HashMap<>();
+                        fields.put(Fields.EVENT, ERROR_EVENT);
+                        fields.put(Fields.ERROR_OBJECT, throwable);
+                        fields.put(Fields.MESSAGE, "Application error, exception thrown from onMessage.");
+
+                        span.log(fields);
+                    } else {
+                        addDeliveryLogIfNeeded(outcome, span);
+                    }
+                } finally {
+                    span.finish();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if (closeUnderlyingTracer) {
+            tracer.close();
+        }
+    }
+
+    private static class LazyTextMapInject implements TextMap {
+        private Map<String,String> injectMap = null;
+
+        @Override
+        public void put(String key, String value) {
+            if(injectMap == null) {
+                injectMap = new HashMap<>();
+            }
+
+            injectMap.put(key, value);
+        }
+
+        @Override
+        public Iterator<Entry<String, String>> iterator() {
+            throw new UnsupportedOperationException();
+        }
+
+        Map<String, String> getInjectMap() {
+            return injectMap;
+        }
+    }
+}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracerFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracerFactory.java
new file mode 100644
index 0000000..0bb0b12
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracerFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing.opentracing;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.tracing.JmsTracer;
+import org.apache.qpid.jms.tracing.JmsTracerFactory;
+
+import io.opentracing.Tracer;
+import io.opentracing.util.GlobalTracer;
+
+public class OpenTracingTracerFactory extends JmsTracerFactory {
+
+    static final String TYPE_NAME = "opentracing";
+
+    /**
+     * Creates a JmsTracer wrapping a provided Open Tracing tracer instance
+     * previously created by the application.
+     *
+     * Used for programmatic creation of JmsTracer to explicitly set on a ConnectionFactory
+     * when not using the jms.tracing URI option, or {@link JmsConnectionFactory#setTracing(String)},
+     * which both utilise the {@link GlobalTracer}.
+     *
+     * The returned JmsTracer will no-op when its close method is called during
+     * {@link Connection#close()}, to allow using the given Tracer with multiple
+     * connections and elsewhere in the application. Equivalent to calling
+     * {@link #create(Tracer, boolean) #create(Tracer, false)}.
+     *
+     * @param tracer
+     *            The Open Tracing tracer to use
+     * @return a JmsTracer instance using the provided OpenTracing tracer.
+     */
+    public static JmsTracer create(Tracer tracer) {
+        return create(tracer, false);
+    }
+
+    /**
+     * As {@link #create(Tracer)}, but providing control over whether the given Tracer
+     * has its close method called when the returned JmsTracer is closed during
+     * {@link Connection#close()}.
+     *
+     * @param tracer
+     *            The Open Tracing tracer to use
+     * @param closeUnderlyingTracer
+     *            Whether to close the underlying tracer during {@link Connection#close()}
+     * @return a JmsTracer instance using the provided OpenTracing tracer.
+     */
+    public static JmsTracer create(Tracer tracer, boolean closeUnderlyingTracer) {
+        return new OpenTracingTracer(tracer, closeUnderlyingTracer);
+    }
+
+    @Override
+    public JmsTracer createTracer(URI remoteURI, String name) throws Exception {
+        Tracer tracer = GlobalTracer.get();
+        return new OpenTracingTracer(tracer, false);
+    }
+}
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/tracing/noop b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/tracing/noop
new file mode 100644
index 0000000..39421ac
--- /dev/null
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/tracing/noop
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.tracing.JmsNoOpTracerFactory
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/tracing/opentracing b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/tracing/opentracing
new file mode 100644
index 0000000..256e19b
--- /dev/null
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/tracing/opentracing
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.tracing.opentracing.OpenTracingTracerFactory
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/test/JmsTestMessageFacade.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/test/JmsTestMessageFacade.java
index a8f0eab..6d7078d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/test/JmsTestMessageFacade.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/test/JmsTestMessageFacade.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiConsumer;
 
 import javax.jms.JMSException;
 
@@ -374,4 +375,38 @@ public class JmsTestMessageFacade implements JmsMessageFacade {
     public Object encodeMessage() {
         return this;
     }
+
+    @Override
+    public Object getTracingContext(String key) {
+        return null;
+    }
+
+    @Override
+    public Object setTracingContext(String key, Object value) {
+        return null;
+    }
+
+    @Override
+    public Object getTracingAnnotation(String key) {
+        return null;
+    }
+
+    @Override
+    public Object setTracingAnnotation(String key, Object value) {
+        return null;
+    }
+
+    @Override
+    public void filterTracingAnnotations(BiConsumer<String, Object> filter) {
+    }
+
+    @Override
+    public Object removeTracingAnnotation(String key) {
+        return null;
+    }
+
+    @Override
+    public Object removeTracingContext(String key) {
+        return null;
+    }
 }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
index 0cb5937..c92e9be 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
@@ -2194,6 +2194,10 @@ public class AmqpJmsMessageFacadeTest extends AmqpJmsMessageTypesTestCase  {
         source.setProperty("APP-Prop-1", "APP-Prop-1-Value");
         source.setProperty("APP-Prop-2", "APP-Prop-2-Value");
 
+        source.setTracingContext("Tracing-Key", "Tracing-Detail");
+
+        // ------------------------------------
+
         AmqpJmsMessageFacade copy = source.copy();
 
         assertSame(source.getConnection(), copy.getConnection());
@@ -2215,7 +2219,6 @@ public class AmqpJmsMessageFacadeTest extends AmqpJmsMessageTypesTestCase  {
         assertEquals(source.getUserId(), copy.getUserId());
         assertEquals(source.getDeliveryTime(), copy.getDeliveryTime());
 
-
         // There should be two since none of the extended options were set
         assertEquals(2, copy.getPropertyNames().size());
 
@@ -2225,6 +2228,8 @@ public class AmqpJmsMessageFacadeTest extends AmqpJmsMessageTypesTestCase  {
         assertEquals("APP-Prop-1-Value", copy.getProperty("APP-Prop-1"));
         assertEquals("APP-Prop-2-Value", copy.getProperty("APP-Prop-2"));
 
+        assertEquals("Tracing-Detail", copy.getTracingContext("Tracing-Key"));
+
         Footer copiedFooter = copy.getFooter();
         DeliveryAnnotations copiedDeliveryAnnotations = copy.getDeliveryAnnotations();
 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageAnnotationsSectionMatcher.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageAnnotationsSectionMatcher.java
index 754d32a..23958fe 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageAnnotationsSectionMatcher.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageAnnotationsSectionMatcher.java
@@ -71,5 +71,13 @@ public class MessageAnnotationsSectionMatcher extends MessageMapSectionMatcher
             return false;
         }
     }
+
+    public Object getReceivedAnnotation(Symbol key)
+    {
+        Map<Object, Object> receivedFields = super.getReceivedFields();
+
+        return receivedFields.get(key);
+    }
+
 }
 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/JmsNoopTracerFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/JmsNoopTracerFactoryTest.java
new file mode 100644
index 0000000..5ece910
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/JmsNoopTracerFactoryTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+
+import java.net.URI;
+
+import org.junit.Test;
+
+public class JmsNoopTracerFactoryTest {
+
+    @Test
+    public void testCreate() {
+        JmsTracer tracer  = JmsNoOpTracerFactory.create();
+
+        assertSame("Unexpected tracer instance", JmsNoOpTracer.INSTANCE, tracer);
+    }
+
+    @Test
+    public void testCreateURIAndTypeName() throws Exception {
+        JmsTracer tracer  = JmsNoOpTracerFactory.create(new URI("amqp://localhost:1234"), JmsNoOpTracerFactory.TYPE_NAME);
+
+        assertSame("Unexpected tracer instance", JmsNoOpTracer.INSTANCE, tracer);
+    }
+
+    @Test
+    public void testCreateURIAndTypeNameUnknown() throws Exception {
+        try {
+            JmsNoOpTracerFactory.create(new URI("amqp://localhost:1234"), "unknown");
+            fail("Exception was not thrown");
+        } catch (Exception e) {
+            // Expected
+        }
+    }
+}
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingIntegrationTest.java
new file mode 100644
index 0000000..f1e20f5
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingIntegrationTest.java
@@ -0,0 +1,1350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing.opentracing;
+
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.ANNOTATION_KEY;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.COMPONENT;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.DELIVERY_SETTLED;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.ERROR_EVENT;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.MESSAGE_EXPIRED;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.ONMESSAGE_SPAN_NAME;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.RECEIVE_SPAN_NAME;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.REDELIVERIES_EXCEEDED;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.SEND_SPAN_NAME;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.STATE;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.jms.tracing.JmsTracer;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import io.opentracing.Span;
+import io.opentracing.SpanContext;
+import io.opentracing.log.Fields;
+import io.opentracing.mock.MockSpan;
+import io.opentracing.mock.MockSpan.LogEntry;
+import io.opentracing.mock.MockSpan.MockContext;
+import io.opentracing.mock.MockTracer;
+import io.opentracing.propagation.Format;
+import io.opentracing.propagation.TextMapAdapter;
+import io.opentracing.tag.Tags;
+
+public class OpenTracingIntegrationTest extends QpidJmsTestCase {
+
+    // Mockito captor for String-to-String maps; initialised by the initMocks(this) call in setUp().
+    @Captor
+    private ArgumentCaptor<Map<String, String>> annotationMapCaptor;
+
+    /**
+     * Runs before each test and lets Mockito initialise the annotated fields of this
+     * test instance.
+     */
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test(timeout = 20000)
+    public void testSend() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            testPeer.expectSenderAttach();
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Expect a message with the trace info annotation set
+            String msgContent = "myTracedMessageContent";
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            msgAnnotationsMatcher.withEntry(Symbol.valueOf(ANNOTATION_KEY), Matchers.any(Map.class));
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(msgContent));
+
+            testPeer.expectTransfer(messageMatcher);
+
+            TextMessage message = session.createTextMessage(msgContent);
+            producer.send(message);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
+            Span sendSpan = finishedSpans.get(0);
+            assertEquals("Unexpected span class", MockSpan.class, sendSpan.getClass());
+            MockSpan sendMockSpan = (MockSpan) sendSpan;
+
+            assertEquals("Expected span to have no parent", 0, sendMockSpan.parentId());
+            assertEquals("Unexpected span operation name", SEND_SPAN_NAME, sendMockSpan.operationName());
+
+            // Verify tags set on the completed span
+            Map<String, Object> spanTags = sendMockSpan.tags();
+            assertFalse("Expected some tags", spanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", spanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_PRODUCER, spanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify log set on the completed span
+            List<LogEntry> entries = sendMockSpan.logEntries();
+            assertEquals("Expected 1 log entry: " + entries, 1, entries.size());
+
+            Map<String, ?> entryFields = entries.get(0).fields();
+            assertFalse("Expected some log entry fields", entryFields.isEmpty());
+            assertNotNull("Expected a state description", entryFields.get(STATE));
+            assertEquals(DELIVERY_SETTLED, entryFields.get(Fields.EVENT));
+
+            // Verify the context sent on the wire matches the original span
+            Object obj = msgAnnotationsMatcher.getReceivedAnnotation(Symbol.valueOf(ANNOTATION_KEY));
+            assertTrue("annotation was not a map", obj instanceof Map);
+            @SuppressWarnings("unchecked")
+            Map<String, String> traceInfo = (Map<String, String>) obj;
+            assertFalse("Expected some content in map", traceInfo.isEmpty());
+
+            SpanContext extractedContext = mockTracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(traceInfo));
+            assertEquals("Unexpected context class", MockContext.class, extractedContext.getClass());
+            assertEquals("Extracted context spanId did not match original", sendMockSpan.context().spanId(), ((MockContext) extractedContext).spanId());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testSendPreSettled() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer, "jms.presettlePolicy.presettleProducers=true"));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            testPeer.expectSettledSenderAttach();
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Expect a message with the trace info annotation set
+            String msgContent = "myTracedMessageContent";
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            msgAnnotationsMatcher.withEntry(Symbol.valueOf(ANNOTATION_KEY), Matchers.any(Map.class));
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(msgContent));
+
+            // Expect settled transfer
+            testPeer.expectTransfer(messageMatcher, Matchers.nullValue(), true, false, null, false);
+
+            TextMessage message = session.createTextMessage(msgContent);
+            producer.send(message);
+
+            // Await the pre-settled transfer completing (so we can get some details of it from the peer) and span finishing.
+            testPeer.waitForAllHandlersToComplete(2000);
+            boolean finishedSpanFound = Wait.waitFor(() -> !(mockTracer.finishedSpans().isEmpty()), 3000, 10);
+            assertTrue("Did not get finished span after send", finishedSpanFound);
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
+            Span sendSpan = finishedSpans.get(0);
+            assertEquals("Unexpected span class", MockSpan.class, sendSpan.getClass());
+            MockSpan sendMockSpan = (MockSpan) sendSpan;
+
+            assertEquals("Expected span to have no parent", 0, sendMockSpan.parentId());
+            assertEquals("Unexpected span operation name", SEND_SPAN_NAME, sendMockSpan.operationName());
+
+            // Verify tags set on the completed span
+            Map<String, Object> spanTags = sendMockSpan.tags();
+            assertFalse("Expected some tags", spanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", spanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_PRODUCER, spanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify log set on the completed span
+            List<LogEntry> entries = sendMockSpan.logEntries();
+            assertEquals("Expected 1 log entry: " + entries, 1, entries.size());
+
+            Map<String, ?> entryFields = entries.get(0).fields();
+            assertFalse("Expected some log entry fields", entryFields.isEmpty());
+            assertNotNull("Expected a state description", entryFields.get(STATE));
+            assertEquals(DELIVERY_SETTLED, entryFields.get(Fields.EVENT));
+
+            // Verify the context sent on the wire matches the original span
+            Object obj = msgAnnotationsMatcher.getReceivedAnnotation(Symbol.valueOf(ANNOTATION_KEY));
+            assertTrue("annotation was not a map", obj instanceof Map);
+            @SuppressWarnings("unchecked")
+            Map<String, String> traceInfo = (Map<String, String>) obj;
+            assertFalse("Expected some content in map", traceInfo.isEmpty());
+
+            SpanContext extractedContext = mockTracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(traceInfo));
+            assertEquals("Unexpected context class", MockContext.class, extractedContext.getClass());
+            assertEquals("Extracted context spanId did not match original", sendMockSpan.context().spanId(), ((MockContext) extractedContext).spanId());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testReceive() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            // Prepare an arriving message with tracing info
+            Map<String,String> injected = new HashMap<>();
+            MockSpan sendSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
+            mockTracer.inject(sendSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected));
+            assertFalse("Expected inject to add values", injected.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected);
+
+            String msgContent = "myContent";
+            DescribedType amqpValueContent = new AmqpValueDescribedType(msgContent);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message msg = messageConsumer.receive(2000);
+
+            assertNotNull("Did not receive message as expected", msg);
+            assertNull("expected no active span", mockTracer.activeSpan());
+
+            boolean finishedSpanFound = Wait.waitFor(() -> !(mockTracer.finishedSpans().isEmpty()), 3000, 10);
+            assertTrue("Did not get finished span after receive", finishedSpanFound);
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
+            Span deliverySpan = finishedSpans.get(0);
+            assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
+            MockSpan deliveryMockSpan = (MockSpan) deliverySpan;
+
+            assertEquals("Expected span to be child of the send span", sendSpan.context().spanId(), deliveryMockSpan.parentId());
+            assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            // Verify tags set on the completed span
+            Map<String, Object> spanTags = deliveryMockSpan.tags();
+            assertFalse("Expected some tags", spanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", spanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, spanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify no log set on the completed span
+            List<LogEntry> logEntries = deliveryMockSpan.logEntries();
+            assertTrue("Expected no log entry: " + logEntries, logEntries.isEmpty());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBody() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            JMSContext context = factory.createContext();
+            context.start();
+
+            testPeer.expectBegin();
+
+            String queueName = "myQueue";
+            Queue queue = context.createQueue(queueName);
+
+            // Prepare an arriving message with tracing info
+            Map<String,String> injected = new HashMap<>();
+            MockSpan sendSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
+            mockTracer.inject(sendSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected));
+            assertFalse("Expected inject to add values", injected.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected);
+
+            String msgContent = "myContent";
+            DescribedType amqpValueContent = new AmqpValueDescribedType(msgContent);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            JMSConsumer consumer = context.createConsumer(queue);
+
+            String body = consumer.receiveBody(String.class, 2000);
+
+            assertEquals("Did not receive message body as expected", msgContent, body);
+            assertNull("expected no active span", mockTracer.activeSpan());
+
+            boolean finishedSpanFound = Wait.waitFor(() -> !(mockTracer.finishedSpans().isEmpty()), 3000, 10);
+            assertTrue("Did not get finished span after receiveBody", finishedSpanFound);
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
+            Span deliverySpan = finishedSpans.get(0);
+            assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
+            MockSpan deliveryMockSpan = (MockSpan) deliverySpan;
+
+            assertEquals("Expected span to be child of the send span", sendSpan.context().spanId(), deliveryMockSpan.parentId());
+            assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            // Verify tags set on the completed span
+            Map<String, Object> spanTags = deliveryMockSpan.tags();
+            assertFalse("Expected some tags", spanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", spanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, spanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify no log set on the completed span
+            List<LogEntry> logEntries = deliveryMockSpan.logEntries();
+            assertTrue("Expected no log entry: " + logEntries, logEntries.isEmpty());
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+
+    @Test(timeout = 20000)
+    public void testReceiveWithoutTraceInfo() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            // Prepare an arriving message without tracing info
+            String msgContent = "myContent";
+            DescribedType amqpValueContent = new AmqpValueDescribedType(msgContent);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message msg = messageConsumer.receive(2000);
+
+            assertNotNull("Did not receive message as expected", msg);
+            assertNull("expected no active span", mockTracer.activeSpan());
+
+            boolean finishedSpanFound = Wait.waitFor(() -> !(mockTracer.finishedSpans().isEmpty()), 3000, 10);
+            assertTrue("Did not get finished span after receive", finishedSpanFound);
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
+            Span deliverySpan = finishedSpans.get(0);
+            assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
+            MockSpan deliveryMockSpan = (MockSpan) deliverySpan;
+
+            assertEquals("Expected span to have no parent as incoming message had no context", 0, deliveryMockSpan.parentId());
+            assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            // Verify tags set on the completed span
+            Map<String, Object> spanTags = deliveryMockSpan.tags();
+            assertFalse("Expected some tags", spanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", spanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, spanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify no log set on the completed span
+            List<LogEntry> logEntries = deliveryMockSpan.logEntries();
+            assertTrue("Expected no log entry: " + logEntries, logEntries.isEmpty());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    /**
+     * Has the peer deliver an already-expired message followed by a live one, each carrying
+     * its own injected trace context, and verifies that receive() returns only the live
+     * message while a finished receive span is recorded for both deliveries: the first
+     * parented to the first send span and logging a MESSAGE_EXPIRED event, the second
+     * parented to the second send span with no log entries.
+     */
+    @Test(timeout = 20000)
+    public void testReceiveWithExpiredMessage() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            // Prepare an arriving message with tracing info, but which has also already expired
+            Map<String,String> injected1 = new HashMap<>();
+            MockSpan sendSpan1 = mockTracer.buildSpan(SEND_SPAN_NAME).start();
+            mockTracer.inject(sendSpan1.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected1));
+            assertFalse("Expected inject to add values", injected1.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations1 = new MessageAnnotationsDescribedType();
+            msgAnnotations1.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected1);
+
+            // Absolute expiry set 100ms in the past, so the message is expired on arrival
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+
+            String expiredMsgContent = "already-expired";
+
+            // Also prepare a message which is not expired yet.
+            String liveMsgContent = "still-active";
+
+            Map<String,String> injected2 = new HashMap<>();
+            MockSpan sendSpan2 = mockTracer.buildSpan(SEND_SPAN_NAME).start();
+            mockTracer.inject(sendSpan2.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected2));
+            assertFalse("Expected inject to add values", injected2.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations2 = new MessageAnnotationsDescribedType();
+            msgAnnotations2.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected2);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations1, props, null, new AmqpValueDescribedType(expiredMsgContent));
+
+            // Queue the live message behind the expired one (final argument presumably the delivery number; confirm against TestAmqpPeer)
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, msgAnnotations2, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+
+            ModifiedMatcher modified = new ModifiedMatcher();
+            modified.withDeliveryFailed(equalTo(true));
+            modified.withUndeliverableHere(equalTo(true));
+
+            // Expect a 'modified' disposition then an 'accepted' one; the trailing
+            // numeric arguments are presumably first/last delivery ids (TODO confirm)
+            testPeer.expectDisposition(true, modified, 1, 1);
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message msg = messageConsumer.receive(3000);
+
+            assertNotNull("Message should have been received", msg);
+            assertTrue(msg instanceof TextMessage);
+            assertEquals("Unexpected message content", liveMsgContent, ((TextMessage)msg).getText());
+            // Sanity check that the two payloads differ, so the content assertion above is meaningful
+            assertNotEquals(expiredMsgContent, liveMsgContent);
+
+            assertNull("expected no active span", mockTracer.activeSpan());
+
+            boolean finishedSpansFound = Wait.waitFor(() -> (mockTracer.finishedSpans().size() == 2), 3000, 10);
+            assertTrue("Did not get finished spans after receive", finishedSpansFound);
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 2 finished spans: " + finishedSpans, 2, finishedSpans.size());
+
+            // The first finished span is expected to be the one for the expired message
+            Span expiredSpan = finishedSpans.get(0);
+            assertEquals("Unexpected span class", MockSpan.class, expiredSpan.getClass());
+            MockSpan expiredMockSpan = (MockSpan) expiredSpan;
+
+            assertEquals("Expected expired message span to be child of the first send span", sendSpan1.context().spanId(), expiredMockSpan.parentId());
+            assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, expiredMockSpan.operationName());
+
+            // Verify tags on the span for expired message
+            Map<String, Object> expiredSpanTags = expiredMockSpan.tags();
+            assertFalse("Expected some tags", expiredSpanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", expiredSpanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, expiredSpanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, expiredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, expiredSpanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify log on the span for expired message
+            List<LogEntry> expiredLogEntries = expiredMockSpan.logEntries();
+            assertEquals("Expected 1 log entry: " + expiredLogEntries, 1, expiredLogEntries.size());
+            Map<String, ?> entryFields = expiredLogEntries.get(0).fields();
+            assertFalse("Expected some log entry fields", entryFields.isEmpty());
+            assertEquals(MESSAGE_EXPIRED, entryFields.get(Fields.EVENT));
+
+            // The second finished span is expected to be the one for the delivered message
+            Span deliverySpan = finishedSpans.get(1);
+            assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
+            MockSpan deliveryMockSpan = (MockSpan) deliverySpan;
+
+            assertEquals("Expected delivery span to be child of the second send span", sendSpan2.context().spanId(), deliveryMockSpan.parentId());
+            assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            // Verify tags on the span for delivered message
+            Map<String, Object> deliveredSpanTags = deliveryMockSpan.tags();
+            assertFalse("Expected some tags", deliveredSpanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", deliveredSpanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, deliveredSpanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, deliveredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, deliveredSpanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify no log on the span for delivered message
+            List<LogEntry> deliveredLogEntries = deliveryMockSpan.logEntries();
+            assertTrue("Expected no log entry: " + deliveredLogEntries, deliveredLogEntries.isEmpty());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            // Finishing the two locally created send spans should bring the finished total to 4,
+            // i.e. closing the connection added no further spans
+            sendSpan1.finish();
+            sendSpan2.finish();
+            finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 4 finished spans: " + finishedSpans, 4, finishedSpans.size());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveWithRedeliveryPolicy() throws Exception { // receive() with maxRedeliveries=1: the over-limit message gets a logged span and is skipped, the next message is delivered
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer, "jms.redeliveryPolicy.maxRedeliveries=1"));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer); // wrap the mock so the client reports its spans to it
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            // Prepare an arriving message with tracing info, but which has also already exceeded the redelivery-policy
+            // (its header below carries delivery-count 2 while the connection URI sets maxRedeliveries=1).
+            Map<String,String> injected1 = new HashMap<>();
+            MockSpan sendSpan1 = mockTracer.buildSpan(SEND_SPAN_NAME).start(); // stands in for the producer-side send span
+            mockTracer.inject(sendSpan1.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected1));
+            assertFalse("Expected inject to add values", injected1.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations1 = new MessageAnnotationsDescribedType();
+            msgAnnotations1.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected1); // trace context travels in a message annotation
+
+            HeaderDescribedType header = new HeaderDescribedType();
+            header.setDeliveryCount(UnsignedInteger.valueOf(2));
+
+            String redeliveredMsgContent = "already-exceeded-redelivery-policy";
+
+            // Also prepare a message which has not exceeded the redelivery policy yet.
+            String liveMsgContent = "still-active";
+
+            Map<String,String> injected2 = new HashMap<>();
+            MockSpan sendSpan2 = mockTracer.buildSpan(SEND_SPAN_NAME).start();
+            mockTracer.inject(sendSpan2.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected2));
+            assertFalse("Expected inject to add values", injected2.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations2 = new MessageAnnotationsDescribedType();
+            msgAnnotations2.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected2);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(header, msgAnnotations1, null, null, new AmqpValueDescribedType(redeliveredMsgContent)); // first transfer: the over-limit message
+
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, msgAnnotations2, null, null, new AmqpValueDescribedType(liveMsgContent), 2); // second transfer: the live message; NOTE(review): trailing 2 is presumably the delivery number - confirm against TestAmqpPeer
+
+            ModifiedMatcher modified = new ModifiedMatcher();
+            modified.withDeliveryFailed(equalTo(true));
+            modified.withUndeliverableHere(equalTo(true));
+
+            testPeer.expectDisposition(true, modified, 1, 1); // NOTE(review): presumably settled=true for delivery range 1..1 - confirm against TestAmqpPeer
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message msg = messageConsumer.receive(3000);
+
+            assertNotNull("Message should have been received", msg);
+            assertTrue(msg instanceof TextMessage);
+            assertEquals("Unexpected message content", liveMsgContent, ((TextMessage)msg).getText());
+            assertNotEquals(redeliveredMsgContent, liveMsgContent); // sanity check on the fixtures so the content assertion above is meaningful
+
+            assertNull("expected no active span", mockTracer.activeSpan());
+
+            boolean finishedSpansFound = Wait.waitFor(() -> (mockTracer.finishedSpans().size() == 2), 3000, 10); // NOTE(review): 3000/10 are presumably timeout and poll interval in ms - confirm
+            assertTrue("Did not get finished spans after receive", finishedSpansFound);
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 2 finished spans: " + finishedSpans, 2, finishedSpans.size());
+
+            Span redeliveredSpan = finishedSpans.get(0); // the first finished span is expected to be the over-limit message's
+            assertEquals("Unexpected span class", MockSpan.class, redeliveredSpan.getClass());
+            MockSpan redeliveredMockSpan = (MockSpan) redeliveredSpan;
+
+            assertEquals("Expected redelivered message span to be child of the first send span", sendSpan1.context().spanId(), redeliveredMockSpan.parentId());
+            assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, redeliveredMockSpan.operationName());
+
+            // Verify tags on the span for redelivered message
+            Map<String, Object> redeliveredSpanTags = redeliveredMockSpan.tags();
+            assertFalse("Expected some tags", redeliveredSpanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", redeliveredSpanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, redeliveredSpanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, redeliveredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, redeliveredSpanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify log on the span for redelivered message: a single entry whose event field is REDELIVERIES_EXCEEDED
+            List<LogEntry> redeliveredLogEntries = redeliveredMockSpan.logEntries();
+            assertEquals("Expected 1 log entry: " + redeliveredLogEntries, 1, redeliveredLogEntries.size());
+            Map<String, ?> entryFields = redeliveredLogEntries.get(0).fields();
+            assertFalse("Expected some log entry fields", entryFields.isEmpty());
+            assertEquals(REDELIVERIES_EXCEEDED, entryFields.get(Fields.EVENT));
+
+            Span deliverySpan = finishedSpans.get(1); // the second finished span is expected to be the delivered message's
+            assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
+            MockSpan deliveryMockSpan = (MockSpan) deliverySpan;
+
+            assertEquals("Expected delivery span to be child of the second send span", sendSpan2.context().spanId(), deliveryMockSpan.parentId());
+            assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            // Verify tags on the span for delivered message
+            Map<String, Object> deliveredSpanTags = deliveryMockSpan.tags();
+            assertFalse("Expected some tags", deliveredSpanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", deliveredSpanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, deliveredSpanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, deliveredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, deliveredSpanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify no log on the span for delivered message
+            List<LogEntry> deliveredLogEntries = deliveryMockSpan.logEntries();
+            assertTrue("Expected no log entry: " + deliveredLogEntries, deliveredLogEntries.isEmpty());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            sendSpan1.finish(); // finish the test's own send spans; the count below then covers these two plus the two consumer-side spans
+            sendSpan2.finish();
+            finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 4 finished spans: " + finishedSpans, 4, finishedSpans.size());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testOnMessage() throws Exception { // a MessageListener sees an active onMessage span that is a child of the sender's span and is finished afterwards
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer); // wrap the mock so the client reports its spans to it
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            // Prepare an arriving message with tracing info: a send span's context is injected into a map carried as a message annotation
+            Map<String,String> injected = new HashMap<>();
+            MockSpan sendSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start(); // stands in for the producer-side send span
+            mockTracer.inject(sendSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected));
+            assertFalse("Expected inject to add values", injected.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected);
+
+            String msgContent = "myContent";
+            DescribedType amqpValueContent = new AmqpValueDescribedType(msgContent);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            AtomicReference<Span> activeSpanRef = new AtomicReference<>(); // carries the span observed inside onMessage back to the test thread
+            AtomicReference<Throwable> throwableRef = new AtomicReference<>(); // captures anything thrown inside the listener body
+            CountDownLatch deliveryRun = new CountDownLatch(1);
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            messageConsumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        activeSpanRef.set(mockTracer.activeSpan()); // record whichever span is active while the listener runs
+
+                        deliveryRun.countDown();
+                    } catch (Throwable t) {
+                        throwableRef.set(t);
+                    }
+                }
+            });
+
+            assertTrue("onMessage did not run in timely fashion: " + throwableRef.get(), deliveryRun.await(3000, TimeUnit.MILLISECONDS));
+
+            Span deliverySpan = activeSpanRef.get();
+            assertNotNull("expected an active span during onMessage", deliverySpan);
+            assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
+            MockSpan deliveryMockSpan = (MockSpan) deliverySpan;
+
+            boolean finishedSpanFound = Wait.waitFor(() -> !(mockTracer.finishedSpans().isEmpty()), 3000, 10); // poll for the finished span; NOTE(review): 3000/10 are presumably timeout and poll interval in ms - confirm
+            assertTrue("Did not get finished span after onMessage", finishedSpanFound);
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
+            assertEquals("Unexpected finished span", deliverySpan, finishedSpans.get(0)); // the finished span must be the one seen as active in onMessage
+
+            assertEquals("Expected span to be child of the send span", sendSpan.context().spanId(), deliveryMockSpan.parentId());
+            assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            // Verify tags set on the completed span
+            Map<String, Object> spanTags = deliveryMockSpan.tags();
+            assertFalse("Expected some tags", spanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", spanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, spanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify no log set on the completed span
+            List<LogEntry> logEntries = deliveryMockSpan.logEntries();
+            assertTrue("Expected no log entry: " + logEntries, logEntries.isEmpty());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            sendSpan.finish(); // finish the test's own send span; together with the onMessage span that makes two
+            finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 2 finished spans: " + finishedSpans, 2, finishedSpans.size());
+
+            assertNull("Unexpected error during onMessage", throwableRef.get());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testOnMessageWithoutTraceInfo() throws Exception { // a message arriving without trace annotations still gets an onMessage span, but one with no parent
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer); // wrap the mock so the client reports its spans to it
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            // Prepare an arriving message without tracing info (no message annotations are passed to the transfer below)
+            String msgContent = "myContent";
+            DescribedType amqpValueContent = new AmqpValueDescribedType(msgContent);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            AtomicReference<Span> activeSpanRef = new AtomicReference<>(); // carries the span observed inside onMessage back to the test thread
+            AtomicReference<Throwable> throwableRef = new AtomicReference<>(); // captures anything thrown inside the listener body
+            CountDownLatch deliveryRun = new CountDownLatch(1);
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            messageConsumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        activeSpanRef.set(mockTracer.activeSpan()); // record whichever span is active while the listener runs
+                        deliveryRun.countDown();
+                    } catch (Throwable t) {
+                        throwableRef.set(t);
+                    }
+                }
+            });
+
+            assertTrue("onMessage did not run in timely fashion: " + throwableRef.get(), deliveryRun.await(3000, TimeUnit.MILLISECONDS));
+
+            Span deliverySpan = activeSpanRef.get();
+            assertNotNull("expected an active span during onMessage", deliverySpan);
+            assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
+            MockSpan deliveryMockSpan = (MockSpan) deliverySpan;
+
+            boolean finishedSpanFound = Wait.waitFor(() -> !(mockTracer.finishedSpans().isEmpty()), 3000, 10); // poll for the finished span; NOTE(review): 3000/10 are presumably timeout and poll interval in ms - confirm
+            assertTrue("Did not get finished span after onMessage", finishedSpanFound);
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
+            assertEquals("Unexpected finished span", deliverySpan, finishedSpans.get(0)); // the finished span must be the one seen as active in onMessage
+
+            assertEquals("Expected span to have no parent as incoming message had no context", 0, deliveryMockSpan.parentId()); // the test treats parent id 0 as "no parent"
+            assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            // Verify tags set on the completed span
+            Map<String, Object> spanTags = deliveryMockSpan.tags();
+            assertFalse("Expected some tags", spanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", spanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, spanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify no log set on the completed span
+            List<LogEntry> logEntries = deliveryMockSpan.logEntries();
+            assertTrue("Expected no log entry: " + logEntries, logEntries.isEmpty());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            assertNull("Unexpected error during onMessage", throwableRef.get());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testOnMessageThrowingException() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            // Prepare an arriving message with tracing info
+            Map<String,String> injected = new HashMap<>();
+            MockSpan sendSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
+            mockTracer.inject(sendSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected));
+            assertFalse("Expected inject to add values", injected.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected);
+
+            String msgContent = "myContent";
+            DescribedType amqpValueContent = new AmqpValueDescribedType(msgContent);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueContent);
+            testPeer.expectDispositionThatIsReleasedAndSettled();
+
+            AtomicReference<Span> activeSpanRef = new AtomicReference<>();
+            AtomicReference<Throwable> throwableRef = new AtomicReference<>();
+            CountDownLatch deliveryRun = new CountDownLatch(1);
+
+            String exceptionMessage = "not-supposed-to-throw-from-onMessage";
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            messageConsumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        activeSpanRef.set(mockTracer.activeSpan());
+
+                        deliveryRun.countDown();
+                    } catch (Throwable t) {
+                        throwableRef.set(t);
+                    }
+
+                    throw new RuntimeException(exceptionMessage);
+                }
+            });
+
+            assertTrue("onMessage did not run in timely fashion: " + throwableRef.get(), deliveryRun.await(3000, TimeUnit.MILLISECONDS));
+
+            Span deliverySpan = activeSpanRef.get();
+            assertNotNull("expected an active span during onMessage", deliverySpan);
+            assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
+            MockSpan deliveryMockSpan = (MockSpan) deliverySpan;
+
+            boolean finishedSpanFound = Wait.waitFor(() -> !(mockTracer.finishedSpans().isEmpty()), 3000, 10);
+            assertTrue("Did not get finished span after onMessage", finishedSpanFound);
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
+            assertEquals("Unexpected finished span", deliveryMockSpan, finishedSpans.get(0));
+
+            assertEquals("Expected span to be child of the send span", sendSpan.context().spanId(), deliveryMockSpan.parentId());
+            assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            // Verify tags set on the completed span
+            Map<String, Object> spanTags = deliveryMockSpan.tags();
+            assertFalse("Expected some tags", spanTags.isEmpty());
+            assertTrue("Expected error tag to be true", (Boolean) spanTags.get(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, spanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify log set on the completed span
+            List<LogEntry> logEntries = deliveryMockSpan.logEntries();
+            assertEquals("Expected 1 log entry: " + logEntries, 1, logEntries.size());
+
+            Map<String, ?> entryFields = logEntries.get(0).fields();
+            assertFalse("Expected some log entry fields", entryFields.isEmpty());
+            assertEquals(ERROR_EVENT, entryFields.get(Fields.EVENT));
+            Object messageDesc = entryFields.get(Fields.MESSAGE);
+            assertTrue(messageDesc instanceof String);
+            assertTrue(((String) messageDesc).contains("thrown from onMessage"));
+            Object t = entryFields.get(Fields.ERROR_OBJECT);
+            assertNotNull("Expected error object to be set", t);
+            assertTrue(t instanceof RuntimeException);
+            assertTrue(exceptionMessage.equals(((RuntimeException) t).getMessage()));
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            sendSpan.finish();
+            finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 2 finished spans: " + finishedSpans, 2, finishedSpans.size());
+
+            assertNull("Unexpected error during onMessage", throwableRef.get());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testOnMessageWithExpiredMessage() throws Exception { // with a MessageListener, an already-expired message gets a logged span and is not delivered; the following live message is delivered
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer); // wrap the mock so the client reports its spans to it
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            // Prepare an arriving message with tracing info, but which has also already expired
+            // (its absolute-expiry-time is set 100ms in the past below).
+            Map<String,String> injected1 = new HashMap<>();
+            MockSpan sendSpan1 = mockTracer.buildSpan(SEND_SPAN_NAME).start(); // stands in for the producer-side send span
+            mockTracer.inject(sendSpan1.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected1));
+            assertFalse("Expected inject to add values", injected1.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations1 = new MessageAnnotationsDescribedType();
+            msgAnnotations1.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected1); // trace context travels in a message annotation
+
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+
+            String expiredMsgContent = "already-expired";
+
+            // Also prepare a message which is not expired yet.
+            String liveMsgContent = "still-active";
+
+            Map<String,String> injected2 = new HashMap<>();
+            MockSpan sendSpan2 = mockTracer.buildSpan(SEND_SPAN_NAME).start();
+            mockTracer.inject(sendSpan2.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected2));
+            assertFalse("Expected inject to add values", injected2.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations2 = new MessageAnnotationsDescribedType();
+            msgAnnotations2.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected2);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations1, props, null, new AmqpValueDescribedType(expiredMsgContent)); // first transfer: the expired message
+
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, msgAnnotations2, null, null, new AmqpValueDescribedType(liveMsgContent), 2); // second transfer: the live message; NOTE(review): trailing 2 is presumably the delivery number - confirm against TestAmqpPeer
+
+            ModifiedMatcher modified = new ModifiedMatcher();
+            modified.withDeliveryFailed(equalTo(true));
+            modified.withUndeliverableHere(equalTo(true));
+
+            testPeer.expectDisposition(true, modified, 1, 1); // NOTE(review): presumably settled=true for delivery range 1..1 - confirm against TestAmqpPeer
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+            AtomicReference<Span> activeSpanRef = new AtomicReference<>(); // carries the span observed inside onMessage back to the test thread
+            AtomicReference<Throwable> throwableRef = new AtomicReference<>(); // captures anything thrown inside the listener body
+            CountDownLatch deliveryRun = new CountDownLatch(1);
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            messageConsumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        activeSpanRef.compareAndSet(null, mockTracer.activeSpan()); // keep only the span seen on the first listener invocation
+
+                        deliveryRun.countDown();
+                    } catch (Throwable t) {
+                        throwableRef.set(t);
+                    }
+                }
+            });
+
+            assertTrue("onMessage did not run in timely fashion: " + throwableRef.get(), deliveryRun.await(3000, TimeUnit.MILLISECONDS));
+
+            boolean finishedSpansFound = Wait.waitFor(() -> (mockTracer.finishedSpans().size() == 2), 3000, 10); // NOTE(review): 3000/10 are presumably timeout and poll interval in ms - confirm
+            assertTrue("Did not get finished spans after receive", finishedSpansFound);
+
+            Span deliverySpan = activeSpanRef.get();
+            assertNotNull("expected an active span during onMessage", deliverySpan);
+            assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
+            MockSpan deliveryMockSpan = (MockSpan) deliverySpan;
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 2 finished spans: " + finishedSpans, 2, finishedSpans.size());
+
+            assertEquals("Expected span to be child of the second send span", sendSpan2.context().spanId(), deliveryMockSpan.parentId()); // the listener only ran for the live message
+            assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            Span expiredSpan = finishedSpans.get(0); // the first finished span is expected to be the expired message's
+            assertEquals("Unexpected span class", MockSpan.class, expiredSpan.getClass());
+            MockSpan expiredMockSpan = (MockSpan) expiredSpan;
+
+            assertEquals("Expected expired message span to be child of the first send span", sendSpan1.context().spanId(), expiredMockSpan.parentId());
+            assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, expiredMockSpan.operationName());
+
+            // Verify tags on the span for expired message
+            Map<String, Object> expiredSpanTags = expiredMockSpan.tags();
+            assertFalse("Expected some tags", expiredSpanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", expiredSpanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, expiredSpanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, expiredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, expiredSpanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify log on the span for expired message: a single entry whose event field is MESSAGE_EXPIRED
+            List<LogEntry> expiredLogEntries = expiredMockSpan.logEntries();
+            assertEquals("Expected 1 log entry: " + expiredLogEntries, 1, expiredLogEntries.size());
+            Map<String, ?> entryFields = expiredLogEntries.get(0).fields();
+            assertFalse("Expected some log entry fields", entryFields.isEmpty());
+            assertEquals(MESSAGE_EXPIRED, entryFields.get(Fields.EVENT));
+
+            assertEquals("Unexpected second finished span", deliveryMockSpan, finishedSpans.get(1)); // the span seen in onMessage must be the second one finished
+            assertEquals("Expected delivery span to be child of the second send span", sendSpan2.context().spanId(), deliveryMockSpan.parentId()); // NOTE(review): this and the next line repeat checks already made on deliveryMockSpan above
+            assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            // Verify tags on the span for delivered message
+            Map<String, Object> deliveredSpanTags = deliveryMockSpan.tags();
+            assertFalse("Expected some tags", deliveredSpanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", deliveredSpanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, deliveredSpanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(queueName, deliveredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, deliveredSpanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify no log on the span for delivered message
+            List<LogEntry> deliveredLogEntries = deliveryMockSpan.logEntries();
+            assertTrue("Expected no log entry: " + deliveredLogEntries, deliveredLogEntries.isEmpty());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            sendSpan1.finish(); // finish the test's own send spans; the count below then covers these two plus the two consumer-side spans
+            sendSpan2.finish();
+            finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 4 finished spans: " + finishedSpans, 4, finishedSpans.size());
+
+            assertNull("Unexpected error during onMessage", throwableRef.get());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testOnMessageWithRedeliveryPolicy() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer, "jms.redeliveryPolicy.maxRedeliveries=1"));
+
+            MockTracer mockTracer = new MockTracer();
+            JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
+            factory.setTracer(tracer);
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String topicName = "myTopic";
+            Topic topic = session.createTopic(topicName);
+
+            // Prepare an arriving message with tracing info, but which has also already exceeded the redelivery-policy
+            Map<String,String> injected1 = new HashMap<>();
+            MockSpan sendSpan1 = mockTracer.buildSpan(SEND_SPAN_NAME).start();
+            mockTracer.inject(sendSpan1.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected1));
+            assertFalse("Expected inject to add values", injected1.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations1 = new MessageAnnotationsDescribedType();
+            msgAnnotations1.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected1);
+
+            HeaderDescribedType header = new HeaderDescribedType();
+            header.setDeliveryCount(UnsignedInteger.valueOf(2));
+
+            String redeliveredMsgContent = "already-exceeded-redelivery-policy";
+
+            // Also prepare a message which has not exceeded the redelivery policy yet.
+            String liveMsgContent = "still-active";
+
+            Map<String,String> injected2 = new HashMap<>();
+            MockSpan sendSpan2 = mockTracer.buildSpan(SEND_SPAN_NAME).start();
+            mockTracer.inject(sendSpan2.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected2));
+            assertFalse("Expected inject to add values", injected2.isEmpty());
+
+            MessageAnnotationsDescribedType msgAnnotations2 = new MessageAnnotationsDescribedType();
+            msgAnnotations2.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected2);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(header, msgAnnotations1, null, null, new AmqpValueDescribedType(redeliveredMsgContent));
+
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, msgAnnotations2, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+
+            ModifiedMatcher modified = new ModifiedMatcher();
+            modified.withDeliveryFailed(equalTo(true));
+            modified.withUndeliverableHere(equalTo(true));
+
+            testPeer.expectDisposition(true, modified, 1, 1);
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+            AtomicReference<Span> activeSpanRef = new AtomicReference<>();
+            AtomicReference<Throwable> throwableRef = new AtomicReference<>();
+            CountDownLatch deliveryRun = new CountDownLatch(1);
+
+            MessageConsumer messageConsumer = session.createConsumer(topic);
+            messageConsumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        activeSpanRef.compareAndSet(null, mockTracer.activeSpan());
+
+                        deliveryRun.countDown();
+                    } catch (Throwable t) {
+                        throwableRef.set(t);
+                    }
+                }
+            });
+
+            assertTrue("onMessage did not run in timely fashion: " + throwableRef.get(), deliveryRun.await(3000, TimeUnit.MILLISECONDS));
+
+            boolean finishedSpansFound = Wait.waitFor(() -> (mockTracer.finishedSpans().size() == 2), 3000, 10);
+            assertTrue("Did not get finished spans after receive", finishedSpansFound);
+
+            Span deliverySpan = activeSpanRef.get();
+            assertNotNull("expected an active span during onMessage", deliverySpan);
+            assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
+            MockSpan deliveryMockSpan = (MockSpan) deliverySpan;
+
+            List<MockSpan> finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 2 finished spans: " + finishedSpans, 2, finishedSpans.size());
+
+            assertEquals("Expected span to be child of the second send span", sendSpan2.context().spanId(), deliveryMockSpan.parentId());
+            assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            Span redeliveredSpan = finishedSpans.get(0);
+            assertEquals("Unexpected span class", MockSpan.class, redeliveredSpan.getClass());
+            MockSpan redeliveredMockSpan = (MockSpan) redeliveredSpan;
+
+            assertEquals("Expected redelivered message span to be child of the first send span", sendSpan1.context().spanId(), redeliveredMockSpan.parentId());
+            assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, redeliveredMockSpan.operationName());
+
+            // Verify tags on the span for redelivered message
+            Map<String, Object> redeliveredSpanTags = redeliveredMockSpan.tags();
+            assertFalse("Expected some tags", redeliveredSpanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", redeliveredSpanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, redeliveredSpanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(topicName, redeliveredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, redeliveredSpanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify log on the span for redelivered message
+            List<LogEntry> redeliveredLogEntries = redeliveredMockSpan.logEntries();
+            assertEquals("Expected 1 log entry: " + redeliveredLogEntries, 1, redeliveredLogEntries.size());
+            Map<String, ?> entryFields = redeliveredLogEntries.get(0).fields();
+            assertFalse("Expected some log entry fields", entryFields.isEmpty());
+            assertEquals(REDELIVERIES_EXCEEDED, entryFields.get(Fields.EVENT));
+
+            assertEquals("Unexpected second finished span", deliveryMockSpan, finishedSpans.get(1));
+            assertEquals("Expected delivery span to be child of the second send span", sendSpan2.context().spanId(), deliveryMockSpan.parentId());
+            assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());
+
+            // Verify tags on the span for delivered message
+            Map<String, Object> deliveredSpanTags = deliveryMockSpan.tags();
+            assertFalse("Expected some tags", deliveredSpanTags.isEmpty());
+            assertFalse("Expected error tag not to be set", deliveredSpanTags.containsKey(Tags.ERROR.getKey()));
+            assertEquals(Tags.SPAN_KIND_CONSUMER, deliveredSpanTags.get(Tags.SPAN_KIND.getKey()));
+            assertEquals(topicName, deliveredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+            assertEquals(COMPONENT, deliveredSpanTags.get(Tags.COMPONENT.getKey()));
+
+            // Verify no log on the span for delivered message
+            List<LogEntry> deliveredLogEntries = deliveryMockSpan.logEntries();
+            assertTrue("Expected no log entry: " + deliveredLogEntries, deliveredLogEntries.isEmpty());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            sendSpan1.finish();
+            sendSpan2.finish();
+            finishedSpans = mockTracer.finishedSpans();
+            assertEquals("Expected 4 finished spans: " + finishedSpans, 4, finishedSpans.size());
+
+            assertNull("Unexpected error during onMessage", throwableRef.get());
+        }
+    }
+
+    private String createPeerURI(TestAmqpPeer peer) {
+        return createPeerURI(peer, null);
+    }
+
+    /**
+     * Builds the connection URI for the given test peer on localhost, appending the supplied
+     * option string as the query part when it is non-null.
+     */
+    private String createPeerURI(TestAmqpPeer peer, String params) {
+        String base = "amqp://localhost:" + peer.getServerPort();
+        if (params == null) {
+            return base;
+        }
+        return base + "?" + params;
+    }
+}
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracerFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracerFactoryTest.java
new file mode 100644
index 0000000..0b1f616
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracerFactoryTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing.opentracing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.tracing.JmsTracer;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import io.opentracing.Tracer;
+
+public class OpenTracingTracerFactoryTest {
+
+    @Test
+    public void testCreateWithProvidedTracer() {
+        // As used when setting a JmsTracer on the connection factory
+        Tracer mock = Mockito.mock(Tracer.class);
+        JmsTracer jmsTracer  = OpenTracingTracerFactory.create(mock);
+
+        assertEquals("Unexpected tracer instance type", OpenTracingTracer.class, jmsTracer.getClass());
+
+        //Check it doesn't close underlying tracer
+        Mockito.verifyZeroInteractions(mock);
+        jmsTracer.close();
+        Mockito.verifyZeroInteractions(mock);
+    }
+
+    @Test
+    public void testCreateWithProvidedTracerCloseProvider() {
+        // As used when setting a JmsTracer on the connection factory
+        Tracer mock = Mockito.mock(Tracer.class);
+
+        //Check it doesn't close underlying tracer if not asked
+        JmsTracer jmsTracerDontClose  = OpenTracingTracerFactory.create(mock, false);
+        Mockito.verifyZeroInteractions(mock);
+        jmsTracerDontClose.close();
+        Mockito.verifyZeroInteractions(mock);
+
+        //Check it does close underlying tracer when asked
+        JmsTracer jmsTracerClose  = OpenTracingTracerFactory.create(mock, true);
+        Mockito.verifyZeroInteractions(mock);
+        jmsTracerClose.close();
+        Mockito.verify(mock).close();
+        Mockito.verifyNoMoreInteractions(mock);
+    }
+
+    @Test
+    public void testCreateWithURIAndTypeName() throws Exception {
+        // As used when requesting tracing via URI option
+        JmsTracer jmsTracer  = OpenTracingTracerFactory.create(new URI("amqp://localhost:1234"), OpenTracingTracerFactory.TYPE_NAME);
+
+        assertEquals("Unexpected tracer instance type", OpenTracingTracer.class, jmsTracer.getClass());
+    }
+
+    @Test
+    public void testCreateWithURIAndTypeNameUnknown() throws Exception {
+        try {
+            OpenTracingTracerFactory.create(new URI("amqp://localhost:1234"), "unknown");
+            fail("Exception was not thrown");
+        } catch (Exception e) {
+            // Expected
+        }
+    }
+}
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracerTest.java
new file mode 100644
index 0000000..e08badf
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/tracing/opentracing/OpenTracingTracerTest.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.tracing.opentracing;
+
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.ANNOTATION_KEY;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.ARRIVING_SPAN_CTX_CONTEXT_KEY;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.COMPONENT;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.DELIVERY_SETTLED;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.DELIVERY_SPAN_CONTEXT_KEY;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.ONMESSAGE_SCOPE_CONTEXT_KEY;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.ONMESSAGE_SPAN_NAME;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.RECEIVE_SPAN_NAME;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.SEND_SPAN_CONTEXT_KEY;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.SEND_SPAN_NAME;
+import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.STATE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.tracing.JmsTracer;
+import org.apache.qpid.jms.tracing.JmsTracer.DeliveryOutcome;
+import org.apache.qpid.jms.tracing.TraceableMessage;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.SpanContext;
+import io.opentracing.Tracer;
+import io.opentracing.log.Fields;
+import io.opentracing.mock.MockSpan;
+import io.opentracing.mock.MockSpan.LogEntry;
+import io.opentracing.mock.MockSpan.MockContext;
+import io.opentracing.mock.MockTracer;
+import io.opentracing.noop.NoopTracerFactory;
+import io.opentracing.propagation.Format;
+import io.opentracing.propagation.TextMapAdapter;
+import io.opentracing.tag.Tags;
+import io.opentracing.util.ThreadLocalScope;
+
+public class OpenTracingTracerTest extends QpidJmsTestCase {
+
+    @Captor
+    private ArgumentCaptor<Map<String, String>> annotationMapCaptor;
+
+    @Before
+    public void setUp() {
+        // Lets Mockito process this test instance's annotations (the @Captor field above) before each test.
+        MockitoAnnotations.initMocks(this);
+    }
+
+    /**
+     * Closing the JmsTracer must not touch the wrapped Tracer when constructed with the flag set to false.
+     */
+    @Test
+    public void testCreateAndCloseOmitUnderlyingClose() {
+        // Configured NOT to close the underlying Tracer
+        final boolean closeUnderlying = false;
+        Tracer underlying = Mockito.mock(Tracer.class);
+        JmsTracer wrapper = new OpenTracingTracer(underlying, closeUnderlying);
+
+        Mockito.verifyZeroInteractions(underlying);
+        wrapper.close();
+        Mockito.verifyZeroInteractions(underlying);
+    }
+
+    /**
+     * Closing the JmsTracer must close the wrapped Tracer, and do nothing else to it, when constructed with the flag set to true.
+     */
+    @Test
+    public void testCreateAndClose() {
+        // Configured TO close the underlying Tracer
+        final boolean closeUnderlying = true;
+        Tracer underlying = Mockito.mock(Tracer.class);
+        JmsTracer wrapper = new OpenTracingTracer(underlying, closeUnderlying);
+
+        Mockito.verifyZeroInteractions(underlying);
+        wrapper.close();
+        Mockito.verify(underlying).close();
+        Mockito.verifyNoMoreInteractions(underlying);
+    }
+
+    /**
+     * Walks a send through initSend and completeSend against a MockTracer, checking the tracing
+     * context and annotation set on the message, then the finished span's log entry and tags.
+     */
+    @Test
+    public void testSendOperations() {
+        MockTracer openTracer = new MockTracer();
+        JmsTracer tracerUnderTest = new OpenTracingTracer(openTracer, true);
+        TraceableMessage msg = Mockito.mock(TraceableMessage.class);
+        String address = "myAddress";
+        String outcome = "myOutcomeDescription";
+
+        // Begin the send
+        tracerUnderTest.initSend(msg, address);
+
+        assertNull("Did not expect active span to be present", openTracer.activeSpan());
+
+        ArgumentCaptor<Span> spanCaptor = ArgumentCaptor.forClass(Span.class);
+        Mockito.verify(msg).setTracingContext(Mockito.eq(SEND_SPAN_CONTEXT_KEY), spanCaptor.capture());
+        Mockito.verify(msg).setTracingAnnotation(Mockito.eq(ANNOTATION_KEY), annotationMapCaptor.capture());
+        Mockito.verifyNoMoreInteractions(msg);
+
+        Span capturedSpan = spanCaptor.getValue();
+        assertNotNull("expected a span from send operation", capturedSpan);
+        // Hand the captured span back when it is later looked up on the message
+        Mockito.when(msg.getTracingContext(SEND_SPAN_CONTEXT_KEY)).thenReturn(capturedSpan);
+        assertEquals("Unexpected span class", MockSpan.class, capturedSpan.getClass());
+        MockSpan capturedMockSpan = (MockSpan) capturedSpan;
+
+        assertEquals("Unexpected span operation name", SEND_SPAN_NAME, capturedMockSpan.operationName());
+
+        Map<String, String> injectedAnnotation = annotationMapCaptor.getValue();
+        assertNotNull("expected an annotation from the send operation", injectedAnnotation);
+        Mockito.when(msg.getTracingAnnotation(ANNOTATION_KEY)).thenReturn(injectedAnnotation);
+
+        assertTrue("Expected no finished spans", openTracer.finishedSpans().isEmpty());
+
+        // Complete the send
+        tracerUnderTest.completeSend(msg, outcome);
+
+        Mockito.verify(msg).getTracingContext(Mockito.eq(SEND_SPAN_CONTEXT_KEY));
+        Mockito.verifyNoMoreInteractions(msg);
+
+        List<MockSpan> finished = openTracer.finishedSpans();
+        assertEquals("Expected 1 finished span: " + finished, 1, finished.size());
+        assertEquals("Unexpected finished span", capturedSpan, finished.get(0));
+
+        // The completed span carries one log entry holding the outcome and settled event
+        List<LogEntry> logs = capturedMockSpan.logEntries();
+        assertEquals("Expected 1 log entry: " + logs, 1, logs.size());
+
+        Map<String, ?> logFields = logs.get(0).fields();
+        assertFalse("Expected some log entry fields", logFields.isEmpty());
+        assertEquals(outcome, logFields.get(STATE));
+        assertEquals(DELIVERY_SETTLED, logFields.get(Fields.EVENT));
+
+        // Producer kind, destination and component tags
+        Map<String, Object> tags = capturedMockSpan.tags();
+        assertFalse("Expected some tags", tags.isEmpty());
+        assertEquals(Tags.SPAN_KIND_PRODUCER, tags.get(Tags.SPAN_KIND.getKey()));
+        assertEquals(address, tags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+        assertEquals(COMPONENT, tags.get(Tags.COMPONENT.getKey()));
+    }
+
+    /**
+     * Runs initSend and completeSend on top of the OpenTracing no-op Tracer and checks that the
+     * tracing annotation is removed from the message rather than set.
+     */
+    @Test
+    public void testSendOperationsWithoutTracingContextToSend() {
+        // The NoOp tracer guarantees there is no context to send
+        Tracer noop = NoopTracerFactory.create();
+        JmsTracer tracerUnderTest = new OpenTracingTracer(noop, true);
+        TraceableMessage msg = Mockito.mock(TraceableMessage.class);
+        String address = "myAddress";
+        String outcome = "myOutcomeDescription";
+
+        // Begin the send
+        tracerUnderTest.initSend(msg, address);
+
+        // With no trace context injected for this send, the annotation should have been cleared.
+        Mockito.verify(msg).removeTracingAnnotation(Mockito.eq(ANNOTATION_KEY));
+
+        ArgumentCaptor<Span> spanCaptor = ArgumentCaptor.forClass(Span.class);
+        Mockito.verify(msg).setTracingContext(Mockito.eq(SEND_SPAN_CONTEXT_KEY), spanCaptor.capture());
+        Mockito.verifyNoMoreInteractions(msg);
+
+        Span capturedSpan = spanCaptor.getValue();
+        assertNotNull("expected a span from send operation", capturedSpan);
+        Mockito.when(msg.getTracingContext(SEND_SPAN_CONTEXT_KEY)).thenReturn(capturedSpan);
+
+        // Complete the send
+        tracerUnderTest.completeSend(msg, outcome);
+
+        Mockito.verify(msg).getTracingContext(Mockito.eq(SEND_SPAN_CONTEXT_KEY));
+        Mockito.verifyNoMoreInteractions(msg);
+    }
+
+    /**
+     * Checks syncReceive for a message carrying injected trace context: the extracted context
+     * and the delivery span are stored on the message, and the span is finished as a child of
+     * the original send span with consumer tags.
+     */
+    @Test
+    public void testReceiveOperations() {
+        MockTracer openTracer = new MockTracer();
+        JmsTracer tracerUnderTest = new OpenTracingTracer(openTracer, true);
+        String address = "myAddress";
+
+        // Build an 'arriving' message whose annotation holds context injected from a send span
+        TraceableMessage msg = Mockito.mock(TraceableMessage.class);
+
+        Map<String,String> carrier = new HashMap<>();
+        MockSpan originSpan = openTracer.buildSpan(SEND_SPAN_NAME).start();
+        openTracer.inject(originSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(carrier));
+
+        assertFalse("Expected inject to add values", carrier.isEmpty());
+
+        Mockito.when(msg.getTracingAnnotation(ANNOTATION_KEY)).thenReturn(carrier);
+
+        assertTrue("Expected no finished spans", openTracer.finishedSpans().isEmpty());
+
+        // Perform the receive
+        tracerUnderTest.syncReceive(msg, address, DeliveryOutcome.DELIVERED);
+
+        ArgumentCaptor<SpanContext> contextCaptor = ArgumentCaptor.forClass(SpanContext.class);
+        Mockito.verify(msg).getTracingAnnotation(Mockito.eq(ANNOTATION_KEY));
+        Mockito.verify(msg).setTracingContext(Mockito.eq(ARRIVING_SPAN_CTX_CONTEXT_KEY), contextCaptor.capture());
+
+        SpanContext extracted = contextCaptor.getValue();
+        assertNotNull("expected a span context from extract operation", extracted);
+        assertEquals("Unexpected context class", MockContext.class, extracted.getClass());
+        assertEquals("Extracted context spanId did not match original", originSpan.context().spanId(), ((MockContext) extracted).spanId());
+
+        ArgumentCaptor<Span> spanCaptor = ArgumentCaptor.forClass(Span.class);
+        Mockito.verify(msg).setTracingContext(Mockito.eq(DELIVERY_SPAN_CONTEXT_KEY), spanCaptor.capture());
+        Mockito.verifyNoMoreInteractions(msg);
+
+        Span receiveSpan = spanCaptor.getValue();
+        assertNotNull("expected a span from receive operation", receiveSpan);
+        assertEquals("Unexpected span class", MockSpan.class, receiveSpan.getClass());
+        MockSpan receiveMockSpan = (MockSpan) receiveSpan;
+
+        assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, receiveMockSpan.operationName());
+
+        List<MockSpan> finished = openTracer.finishedSpans();
+        assertEquals("Expected 1 finished span: " + finished, 1, finished.size());
+        assertEquals("Unexpected finished span", receiveSpan, finished.get(0));
+
+        assertEquals("Expected span to be child of 'send' span", originSpan.context().spanId(), receiveMockSpan.parentId());
+
+        // Consumer kind, destination and component tags
+        Map<String, Object> tags = receiveMockSpan.tags();
+        assertFalse("Expected some tags", tags.isEmpty());
+        assertEquals(Tags.SPAN_KIND_CONSUMER, tags.get(Tags.SPAN_KIND.getKey()));
+        assertEquals(address, tags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+        assertEquals(COMPONENT, tags.get(Tags.COMPONENT.getKey()));
+    }
+
+    /**
+     * Checks syncReceive for a message with no tracing annotation: a delivery span is still
+     * created and finished, but it has no parent.
+     */
+    @Test
+    public void testReceiveWithoutTracingContext() {
+        MockTracer openTracer = new MockTracer();
+        JmsTracer tracerUnderTest = new OpenTracingTracer(openTracer, true);
+        String address = "myAddress";
+
+        // Build an 'arriving' message that has no tracing annotation
+        TraceableMessage msg = Mockito.mock(TraceableMessage.class);
+        Mockito.when(msg.getTracingAnnotation(ANNOTATION_KEY)).thenReturn(null);
+
+        assertTrue("Expected no finished spans", openTracer.finishedSpans().isEmpty());
+
+        // Perform the receive; the extract step has nothing to yield here.
+        tracerUnderTest.syncReceive(msg, address, DeliveryOutcome.DELIVERED);
+
+        ArgumentCaptor<Span> spanCaptor = ArgumentCaptor.forClass(Span.class);
+        Mockito.verify(msg).getTracingAnnotation(Mockito.eq(ANNOTATION_KEY));
+        Mockito.verify(msg).setTracingContext(Mockito.eq(DELIVERY_SPAN_CONTEXT_KEY), spanCaptor.capture());
+        Mockito.verifyNoMoreInteractions(msg);
+
+        Span receiveSpan = spanCaptor.getValue();
+        assertNotNull("expected a span from receive operation", receiveSpan);
+        assertEquals("Unexpected span class", MockSpan.class, receiveSpan.getClass());
+        MockSpan receiveMockSpan = (MockSpan) receiveSpan;
+
+        assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, receiveMockSpan.operationName());
+
+        List<MockSpan> finished = openTracer.finishedSpans();
+        assertEquals("Expected 1 finished span: " + finished, 1, finished.size());
+        assertEquals("Unexpected finished span", receiveSpan, finished.get(0));
+
+        assertEquals("Expected span to have no parent span", 0, receiveMockSpan.parentId());
+
+        // Consumer kind, destination and component tags
+        Map<String, Object> tags = receiveMockSpan.tags();
+        assertFalse("Expected some tags", tags.isEmpty());
+        assertEquals(Tags.SPAN_KIND_CONSUMER, tags.get(Tags.SPAN_KIND.getKey()));
+        assertEquals(address, tags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+        assertEquals(COMPONENT, tags.get(Tags.COMPONENT.getKey()));
+    }
+
+    /**
+     * Checks asyncDeliveryInit followed by asyncDeliveryComplete for a message carrying injected
+     * trace context: a span and scope are stored on the message and a span is active after init,
+     * and the span is finished as a child of the send span only after completion.
+     */
+    @Test
+    public void testOnMessageOperations() {
+        MockTracer openTracer = new MockTracer();
+        JmsTracer tracerUnderTest = new OpenTracingTracer(openTracer, true);
+        String address = "myAddress";
+
+        // Build an 'arriving' message whose annotation holds context injected from a send span
+        TraceableMessage msg = Mockito.mock(TraceableMessage.class);
+
+        Map<String,String> carrier = new HashMap<>();
+        MockSpan originSpan = openTracer.buildSpan(SEND_SPAN_NAME).start();
+        openTracer.inject(originSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(carrier));
+
+        assertFalse("Expected inject to add values", carrier.isEmpty());
+
+        Mockito.when(msg.getTracingAnnotation(ANNOTATION_KEY)).thenReturn(carrier);
+
+        assertTrue("Expected no finished spans", openTracer.finishedSpans().isEmpty());
+
+        // onMessage initialisation step
+        tracerUnderTest.asyncDeliveryInit(msg, address);
+
+        ArgumentCaptor<SpanContext> contextCaptor = ArgumentCaptor.forClass(SpanContext.class);
+        Mockito.verify(msg).getTracingAnnotation(Mockito.eq(ANNOTATION_KEY));
+        Mockito.verify(msg).setTracingContext(Mockito.eq(ARRIVING_SPAN_CTX_CONTEXT_KEY), contextCaptor.capture());
+
+        SpanContext extracted = contextCaptor.getValue();
+        assertNotNull("expected a span context from extract operation", extracted);
+        assertEquals("Unexpected context class", MockContext.class, extracted.getClass());
+        assertEquals("Extracted context did not match original", originSpan.context().spanId(), ((MockContext) extracted).spanId());
+
+        assertTrue("Expected no finished spans", openTracer.finishedSpans().isEmpty());
+
+        ArgumentCaptor<Span> spanCaptor = ArgumentCaptor.forClass(Span.class);
+        ArgumentCaptor<Scope> scopeCaptor = ArgumentCaptor.forClass(Scope.class);
+        Mockito.verify(msg).setTracingContext(Mockito.eq(DELIVERY_SPAN_CONTEXT_KEY), spanCaptor.capture());
+        Mockito.verify(msg).setTracingContext(Mockito.eq(ONMESSAGE_SCOPE_CONTEXT_KEY), scopeCaptor.capture());
+        Mockito.verifyNoMoreInteractions(msg);
+
+        assertNotNull("Expected active span to be present", openTracer.activeSpan());
+
+        Span onMessageSpan = spanCaptor.getValue();
+        assertNotNull("expected a span from onMessage operation", onMessageSpan);
+        assertEquals("Unexpected span class", MockSpan.class, onMessageSpan.getClass());
+        MockSpan onMessageMockSpan = (MockSpan) onMessageSpan;
+
+        assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, onMessageMockSpan.operationName());
+
+        Scope onMessageScope = scopeCaptor.getValue();
+        assertNotNull("expected a scope from onMessage operation", onMessageScope);
+        assertEquals("Unexpected scope class", ThreadLocalScope.class, onMessageScope.getClass());
+
+        // Hand the captured span and scope back when the completion step asks the message for them
+        Mockito.when(msg.getTracingContext(DELIVERY_SPAN_CONTEXT_KEY)).thenReturn(onMessageSpan);
+        Mockito.when(msg.removeTracingContext(ONMESSAGE_SCOPE_CONTEXT_KEY)).thenReturn(onMessageScope);
+
+        assertTrue("Expected no finished spans", openTracer.finishedSpans().isEmpty());
+
+        // onMessage completion step
+        tracerUnderTest.asyncDeliveryComplete(msg, DeliveryOutcome.DELIVERED, null);
+
+        Mockito.verify(msg).getTracingContext(Mockito.eq(DELIVERY_SPAN_CONTEXT_KEY));
+        Mockito.verify(msg).removeTracingContext(Mockito.eq(ONMESSAGE_SCOPE_CONTEXT_KEY));
+        Mockito.verifyNoMoreInteractions(msg);
+
+        List<MockSpan> finished = openTracer.finishedSpans();
+        assertEquals("Expected 1 finished span: " + finished, 1, finished.size());
+        assertEquals("Unexpected finished span", onMessageSpan, finished.get(0));
+
+        assertEquals("Expected span to be child of 'send' span", originSpan.context().spanId(), onMessageMockSpan.parentId());
+
+        // Consumer kind, destination and component tags
+        Map<String, Object> tags = onMessageMockSpan.tags();
+        assertFalse("Expected some tags", tags.isEmpty());
+        assertEquals(Tags.SPAN_KIND_CONSUMER, tags.get(Tags.SPAN_KIND.getKey()));
+        assertEquals(address, tags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+        assertEquals(COMPONENT, tags.get(Tags.COMPONENT.getKey()));
+    }
+
+    /**
+     * Uses mocked Span and Scope objects to check that asyncDeliveryComplete finishes the span
+     * and closes the scope held by the message, and does nothing else to either.
+     */
+    @Test
+    public void testOnMessageCompletionClosesScopeAndSpan() {
+        MockTracer openTracer = new MockTracer();
+        JmsTracer tracerUnderTest = new OpenTracingTracer(openTracer, true);
+
+        // Message stubbed to hand out a mock delivery span and a mock onMessage scope
+        TraceableMessage msg = Mockito.mock(TraceableMessage.class);
+        Span spanMock = Mockito.mock(Span.class);
+        Scope scopeMock = Mockito.mock(Scope.class);
+
+        Mockito.when(msg.getTracingContext(DELIVERY_SPAN_CONTEXT_KEY)).thenReturn(spanMock);
+        Mockito.when(msg.removeTracingContext(ONMESSAGE_SCOPE_CONTEXT_KEY)).thenReturn(scopeMock);
+
+        // onMessage completion step
+        tracerUnderTest.asyncDeliveryComplete(msg, DeliveryOutcome.DELIVERED, null);
+
+        Mockito.verify(msg).getTracingContext(Mockito.eq(DELIVERY_SPAN_CONTEXT_KEY));
+        Mockito.verify(msg).removeTracingContext(Mockito.eq(ONMESSAGE_SCOPE_CONTEXT_KEY));
+        Mockito.verifyNoMoreInteractions(msg);
+
+        // Span finished, and nothing more
+        Mockito.verify(spanMock).finish();
+        Mockito.verifyNoMoreInteractions(spanMock);
+
+        // Scope closed, and nothing more
+        Mockito.verify(scopeMock).close();
+        Mockito.verifyNoMoreInteractions(scopeMock);
+    }
+
+    /**
+     * Checks asyncDeliveryInit followed by asyncDeliveryComplete for a message with no tracing
+     * annotation: a span and scope are still created and the span finished, but with no parent.
+     */
+    @Test
+    public void testOnMessageOperationsWithoutTracingContext() {
+        MockTracer openTracer = new MockTracer();
+        JmsTracer tracerUnderTest = new OpenTracingTracer(openTracer, true);
+        String address = "myAddress";
+
+        // Build an 'arriving' message that has no tracing annotation
+        TraceableMessage msg = Mockito.mock(TraceableMessage.class);
+
+        Mockito.when(msg.getTracingAnnotation(ANNOTATION_KEY)).thenReturn(null);
+
+        assertTrue("Expected no finished spans", openTracer.finishedSpans().isEmpty());
+        assertNull("Did not expect active span to be present", openTracer.activeSpan());
+
+        // onMessage initialisation step; the extract step has nothing to yield here.
+        tracerUnderTest.asyncDeliveryInit(msg, address);
+
+        ArgumentCaptor<Span> spanCaptor = ArgumentCaptor.forClass(Span.class);
+        ArgumentCaptor<Scope> scopeCaptor = ArgumentCaptor.forClass(Scope.class);
+        Mockito.verify(msg).getTracingAnnotation(Mockito.eq(ANNOTATION_KEY));
+        Mockito.verify(msg).setTracingContext(Mockito.eq(DELIVERY_SPAN_CONTEXT_KEY), spanCaptor.capture());
+        Mockito.verify(msg).setTracingContext(Mockito.eq(ONMESSAGE_SCOPE_CONTEXT_KEY), scopeCaptor.capture());
+        Mockito.verifyNoMoreInteractions(msg);
+
+        assertNotNull("Expected active span to be present", openTracer.activeSpan());
+
+        Span onMessageSpan = spanCaptor.getValue();
+        assertNotNull("expected a span from onMessage operation", onMessageSpan);
+        assertEquals("Unexpected span class", MockSpan.class, onMessageSpan.getClass());
+        MockSpan onMessageMockSpan = (MockSpan) onMessageSpan;
+
+        assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, onMessageMockSpan.operationName());
+
+        Scope onMessageScope = scopeCaptor.getValue();
+        assertNotNull("expected a scope from onMessage operation", onMessageScope);
+        assertEquals("Unexpected scope class", ThreadLocalScope.class, onMessageScope.getClass());
+
+        // Hand the captured span and scope back when the completion step asks the message for them
+        Mockito.when(msg.getTracingContext(DELIVERY_SPAN_CONTEXT_KEY)).thenReturn(onMessageSpan);
+        Mockito.when(msg.removeTracingContext(ONMESSAGE_SCOPE_CONTEXT_KEY)).thenReturn(onMessageScope);
+
+        assertTrue("Expected no finished spans", openTracer.finishedSpans().isEmpty());
+
+        // onMessage completion step
+        tracerUnderTest.asyncDeliveryComplete(msg, DeliveryOutcome.DELIVERED, null);
+
+        Mockito.verify(msg).getTracingContext(Mockito.eq(DELIVERY_SPAN_CONTEXT_KEY));
+        Mockito.verify(msg).removeTracingContext(Mockito.eq(ONMESSAGE_SCOPE_CONTEXT_KEY));
+        Mockito.verifyNoMoreInteractions(msg);
+
+        List<MockSpan> finished = openTracer.finishedSpans();
+        assertEquals("Expected 1 finished span: " + finished, 1, finished.size());
+        assertEquals("Unexpected finished span", onMessageSpan, finished.get(0));
+
+        assertEquals("Expected span to have no parent span", 0, onMessageMockSpan.parentId());
+
+        // Consumer kind, destination and component tags
+        Map<String, Object> tags = onMessageMockSpan.tags();
+        assertFalse("Expected some tags", tags.isEmpty());
+        assertEquals(Tags.SPAN_KIND_CONSUMER, tags.get(Tags.SPAN_KIND.getKey()));
+        assertEquals(address, tags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
+        assertEquals(COMPONENT, tags.get(Tags.COMPONENT.getKey()));
+    }
+}
diff --git a/qpid-jms-docs/Configuration.md b/qpid-jms-docs/Configuration.md
index a08c264..87d95bd 100644
--- a/qpid-jms-docs/Configuration.md
+++ b/qpid-jms-docs/Configuration.md
@@ -104,6 +104,7 @@ The options apply to the behaviour of the JMS objects such as Connection, Sessio
 + **jms.populateJMSXUserID** Controls whether a MessageProducer will populate the JMSXUserID value for each sent message using the authenticated username from the connection.  This value defaults to false and the JMSXUserID for all sent messages will not be populated.
 + **jms.awaitClientID** Controls whether a Connection with no ClientID configured in the URI will wait for a ClientID being set programmatically (or the connection being used otherwise to signal none can be set) before sending the AMQP connection Open. Defaults to true.
 + **jms.useDaemonThread** Controls whether a Connection will use a daemon thread for its executor. Defaults to false to ensure a non-daemon thread is present by default.
++ **jms.tracing** Sets the type name of a tracing provider to use for the connection(s) created by the factory. Supported values are "opentracing" and "noop". Default is unset, effectively noop.
 
 The Prefetch Policy controls how many messages the remote peer can send to the client and be held in a prefetch buffer for each consumer instance.
 
@@ -289,6 +290,48 @@ When debugging some issues, it may sometimes be useful to enable additional prot
 + Set the environment variable (not Java system property) *PN_TRACE_FRM* to *true*, which will cause Proton to emit frame logging to stdout.
 + Add the option *amqp.traceFrames=true* to your connection URI to have the client add a protocol tracer to Proton, and configure the *org.apache.qpid.jms.provider.amqp.FRAMES* Logger to *TRACE* level to include the output in your logs.
 
+## Tracing
+
+The client can perform distributed tracing of message production and consumption using an [OpenTracing](https://opentracing.io/) implementation.
+
+When tracing is enabled, upon producing messages a Span is created for the AMQP delivery process, with the delivery outcome logged on it upon completion. When consuming messages, a Span is created and kept active for the duration of the call to the onMessage method of a MessageListener. For synchronous receive calls, a Span is created and finished internally while returning a Message. This is also the case for any messages that expire before delivery or that exceed the configured redelivery policy.
+
+There are two ways of enabling this support:
+
+1.  Configure the client via URI option to utilise the *GlobalTracer* independently set by the application.
+
+    The tracing implementation can be enabled using the *jms.tracing* URI option:
+
+        amqp://localhost:5672?jms.tracing=opentracing
+
+    The application must create a Tracer and register it as the GlobalTracer. An overview of doing this would be:
+
+        io.opentracing.Tracer tracer = ...;
+        io.opentracing.util.GlobalTracer.registerIfAbsent(tracer);
+
+    If no registration is performed, the GlobalTracer acts as a no-op Tracer.
+
+2.  Set a JmsTracer instance to use directly on the JmsConnectionFactory object.
+
+        io.opentracing.Tracer tracer = ...;
+        org.apache.qpid.jms.tracing.JmsTracer jmsTracer =
+                org.apache.qpid.jms.tracing.opentracing.OpenTracingTracerFactory.create(tracer);
+
+        JmsConnectionFactory connectionFactory = ...;
+        connectionFactory.setTracer(jmsTracer);
+
+    This method of enabling tracing overrides the URI configuration option if also set.
+
+In order to perform tracing, a suitable OpenTracing implementation must be provided by the application (along with the io.opentracing:opentracing-api and io.opentracing:opentracing-util dependencies, should the tracing implementation not provide them).
+
+An example distributed tracing system would be [Jaeger](https://www.jaegertracing.io/). The related application dependency to utilise it would be:
+
+    <dependency>
+      <groupId>io.jaegertracing</groupId>
+      <artifactId>jaeger-client</artifactId>
+      <version>${jaeger-version}</version>
+    </dependency>
+
 ## Extended Session Acknowledgement modes
 
 The client supports two additional session acknowledgement modes beyond the standard JMS specification modes.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org