You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/31 07:13:26 UTC

[incubator-zipkin-brave] branch messaging-refactor created (now 9c21af4)

This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a change to branch messaging-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin-brave.git.


      at 9c21af4  Refactors to reduce injector complexity and duplicate code

This branch includes the following new commits:

     new 9c21af4  Refactors to reduce injector complexity and duplicate code

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-zipkin-brave] 01/01: Refactors to reduce injector complexity and duplicate code

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch messaging-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin-brave.git

commit 9c21af4597f89ceef7d6158b7054a05213117680
Author: Adrian Cole <ac...@pivotal.io>
AuthorDate: Fri May 31 15:12:34 2019 +0800

    Refactors to reduce injector complexity and duplicate code
    
    this adds messaging processor code and some cleanups. there's
    more to do and tests don't pass.
---
 .../jms/src/main/java/brave/jms/JmsAdapter.java    | 121 +++++----------
 .../jms/src/main/java/brave/jms/JmsTracing.java    | 121 ++++++++++-----
 .../java/brave/jms/TracingCompletionListener.java  |  59 +++++--
 .../java/brave/jms/TracingExceptionListener.java   |   4 +-
 .../main/java/brave/jms/TracingJMSConsumer.java    |  21 ++-
 .../main/java/brave/jms/TracingJMSProducer.java    | 105 ++++++-------
 .../java/brave/jms/TracingMessageConsumer.java     |  29 ++--
 .../java/brave/jms/TracingMessageListener.java     |  62 ++------
 .../java/brave/jms/TracingMessageProducer.java     | 170 +++++++++++----------
 .../jms/src/test/java/brave/jms/JmsTest.java       |  11 --
 .../brave/jms/TracingCompletionListenerTest.java   |  47 +++---
 .../java/brave/kafka/clients/KafkaTracing.java     | 150 ++++++++----------
 .../java/brave/kafka/clients/TracingConsumer.java  |  99 +++++-------
 .../java/brave/kafka/clients/TracingProducer.java  | 153 +++++++++----------
 .../brave/kafka/clients/TracingCallbackTest.java   |  30 ++--
 instrumentation/messaging/pom.xml                  |  12 ++
 .../main/java/brave/messaging/ConsumerHandler.java | 143 +++++++++++++++++
 .../main/java/brave/messaging/MessageAdapter.java  |  36 -----
 .../brave/messaging/MessageConsumerAdapter.java    |  36 -----
 .../brave/messaging/MessageProducerAdapter.java    |  36 -----
 .../java/brave/messaging/MessagingAdapter.java     |  73 +++++++++
 .../brave/messaging/MessagingConsumerHandler.java  | 133 ----------------
 .../brave/messaging/MessagingConsumerParser.java   |  44 ------
 .../java/brave/messaging/MessagingHandler.java     |  45 ------
 ...ChannelAdapter.java => MessagingOperation.java} |  17 +--
 .../main/java/brave/messaging/MessagingParser.java |  79 ++++++----
 .../brave/messaging/MessagingProducerHandler.java  |  83 ----------
 .../brave/messaging/MessagingProducerParser.java   |  42 -----
 .../java/brave/messaging/MessagingTracing.java     |  51 ++-----
 .../java/brave/messaging/ProcessorHandler.java     |  71 +++++++++
 .../main/java/brave/messaging/ProducerHandler.java |  94 ++++++++++++
 31 files changed, 1013 insertions(+), 1164 deletions(-)

diff --git a/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java b/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java
index 5c87218..a5252be 100644
--- a/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java
+++ b/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java
@@ -16,116 +16,67 @@
  */
 package brave.jms;
 
-import brave.messaging.ChannelAdapter;
-import brave.messaging.MessageConsumerAdapter;
-import brave.messaging.MessageProducerAdapter;
+import brave.messaging.MessagingAdapter;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Queue;
 import javax.jms.Topic;
 
-import static brave.jms.JmsTracing.JMS_QUEUE;
-import static brave.jms.JmsTracing.JMS_TOPIC;
+abstract class JmsAdapter<T> extends MessagingAdapter<Destination, T, T> {
+  final String remoteServiceName;
 
-class JmsAdapter {
-
-  static class JmsMessageConsumerAdapter implements MessageConsumerAdapter<Message> {
-
-    final JmsTracing jmsTracing;
-
-    JmsMessageConsumerAdapter(JmsTracing jmsTracing) {
-      this.jmsTracing = jmsTracing;
-    }
-
-    static JmsMessageConsumerAdapter create(JmsTracing jmsTracing) {
-      return new JmsMessageConsumerAdapter(jmsTracing);
-    }
+  JmsAdapter(JmsTracing jmsTracing) {
+    remoteServiceName = jmsTracing.remoteServiceName;
+  }
 
-    @Override public String operation(Message message) {
-      return "receive";
+  static final class MessageAdapter extends JmsAdapter<Message> {
+    MessageAdapter(JmsTracing jmsTracing) {
+      super(jmsTracing);
     }
 
-    @Override public String identifier(Message message) {
+    @Override public String correlationId(Message message) {
       try {
         return message.getJMSMessageID();
-      } catch (JMSException e) {
+      } catch (JMSException ignored) {
         // don't crash on wonky exceptions!
       }
       return null;
     }
-
-    @Override public String identifierTagKey() {
-      return "jms.message_id";
-    }
   }
 
-  static class JmsMessageProducerAdapter implements MessageProducerAdapter<Message> {
-
-    final JmsTracing jmsTracing;
-
-    JmsMessageProducerAdapter(JmsTracing jmsTracing) {
-      this.jmsTracing = jmsTracing;
-    }
-
-    static JmsMessageProducerAdapter create(JmsTracing jmsTracing) {
-      return new JmsMessageProducerAdapter(jmsTracing);
-    }
-
-    @Override public String operation(Message message) {
-      return "send";
-    }
+  @Override public T carrier(T message) {
+    return message;
+  }
 
-    @Override public String identifier(Message message) {
-      try {
-        return message.getJMSMessageID();
-      } catch (JMSException e) {
-        // don't crash on wonky exceptions!
+  @Override public String channel(Destination channel) {
+    try {
+      if (channel instanceof Queue) {
+        return ((Queue) channel).getQueueName();
+      } else if (channel instanceof Topic) {
+        return ((Topic) channel).getTopicName();
       }
-      return null;
-    }
-
-    @Override public String identifierTagKey() {
-      return null;
+      // TODO: we could use toString here..
+    } catch (JMSException ignored) {
+      // don't crash on wonky exceptions!
     }
+    return null;
   }
 
-  static class JmsChannelAdapter implements ChannelAdapter<Destination> {
-
-    final JmsTracing jmsTracing;
-
-    JmsChannelAdapter(JmsTracing jmsTracing) {
-      this.jmsTracing = jmsTracing;
-    }
-
-    static JmsChannelAdapter create(JmsTracing jmsTracing) {
-      return new JmsChannelAdapter(jmsTracing);
-    }
-
-    @Override public String channel(Destination destination) {
-      try {
-        if (destination instanceof Queue) {
-          return ((Queue) destination).getQueueName();
-        } else if (destination instanceof Topic) {
-          return ((Topic) destination).getTopicName();
-        }
-      } catch (JMSException ignored) {
-        // don't crash on wonky exceptions!
-      }
-      return null;
+  @Override public String channelType(Destination channel) {
+    if (channel instanceof Queue) {
+      return "queue";
+    } else if (channel instanceof Topic) {
+      return "topic";
     }
+    return null;
+  }
 
-    @Override public String channelTagKey(Destination destination) {
-      if (destination instanceof Queue) {
-        return JMS_QUEUE;
-      } else if (destination instanceof Topic) {
-        return JMS_TOPIC;
-      }
-      return null;
-    }
+  @Override public String messageKey(T message) {
+    return null;
+  }
 
-    @Override public String remoteServiceName(Destination message) {
-      return jmsTracing.remoteServiceName;
-    }
+  @Override public String brokerName(Destination channel) {
+    return remoteServiceName;
   }
 }
diff --git a/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java b/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java
index bc0531a..383bbf2 100644
--- a/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java
+++ b/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java
@@ -17,9 +17,18 @@
 package brave.jms;
 
 import brave.Span;
+import brave.SpanCustomizer;
 import brave.Tracing;
+import brave.internal.Nullable;
+import brave.jms.JmsAdapter.MessageAdapter;
+import brave.messaging.ConsumerHandler;
+import brave.messaging.MessagingAdapter;
+import brave.messaging.MessagingParser;
 import brave.messaging.MessagingTracing;
+import brave.messaging.ProcessorHandler;
+import brave.messaging.ProducerHandler;
 import brave.propagation.Propagation.Getter;
+import brave.propagation.Propagation.Setter;
 import brave.propagation.TraceContext;
 import brave.propagation.TraceContext.Extractor;
 import brave.propagation.TraceContext.Injector;
@@ -31,7 +40,9 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+import javax.jms.Queue;
 import javax.jms.QueueConnection;
+import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.XAConnection;
 import javax.jms.XAConnectionFactory;
@@ -42,8 +53,19 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
 
 /** Use this class to decorate your Jms consumer / producer and enable Tracing. */
 public final class JmsTracing {
-  static final String JMS_QUEUE = "jms.queue";
-  static final String JMS_TOPIC = "jms.topic";
+  static final Setter<Message, String> SETTER = new Setter<Message, String>() {
+    @Override public void put(Message carrier, String key, String value) {
+      try {
+        carrier.setStringProperty(key, value);
+      } catch (JMSException e) {
+        // don't crash on wonky exceptions!
+      }
+    }
+
+    @Override public String toString() {
+      return "Message::setStringProperty";
+    }
+  };
 
   static final Getter<Message, String> GETTER = new Getter<Message, String>() {
     @Override public String get(Message carrier, String key) {
@@ -69,17 +91,18 @@ public final class JmsTracing {
   }
 
   public static final class Builder {
-    final MessagingTracing msgTracing;
+    final MessagingTracing messageTracing;
     String remoteServiceName = "jms";
 
     Builder(Tracing tracing) {
       if (tracing == null) throw new NullPointerException("tracing == null");
-      this.msgTracing = MessagingTracing.create(tracing);
+      this.messageTracing = MessagingTracing.newBuilder(tracing)
+        .parser(new LegacyMessagingParser()).build();
     }
 
-    Builder(MessagingTracing msgTracing) {
-      if (msgTracing == null) throw new NullPointerException("msgTracing == null");
-      this.msgTracing = msgTracing;
+    Builder(MessagingTracing messageTracing) {
+      if (messageTracing == null) throw new NullPointerException("messageTracing == null");
+      this.messageTracing = messageTracing;
     }
 
     /**
@@ -95,38 +118,28 @@ public final class JmsTracing {
     }
   }
 
-  final MessagingTracing msgTracing;
-  final Extractor<Message> extractor;
-  final Injector<Message> injector;
-  final JmsAdapter.JmsChannelAdapter channelAdapter;
-  final JmsAdapter.JmsMessageConsumerAdapter consumerMessageAdapter;
-  final JmsAdapter.JmsMessageProducerAdapter producerMessageAdapter;
+  final MessagingTracing messageTracing;
+  final ConsumerHandler<Destination, Message, Message> consumerHandler;
+  final ProcessorHandler<Destination, Message, Message> processorHandler;
+  final ProducerHandler<Destination, Message, Message> producerHandler;
+  final Extractor<Message> messageExtractor;
+  final Injector<Message> messageInjector;
+  final MessageAdapter messageAdapter;
   final String remoteServiceName;
   final Set<String> propagationKeys;
 
   JmsTracing(Builder builder) { // intentionally hidden constructor
-    this.msgTracing = builder.msgTracing;
-    this.extractor = msgTracing.tracing().propagation().extractor(GETTER);
+    this.messageTracing = builder.messageTracing;
+    this.messageExtractor = messageTracing.tracing().propagation().extractor(GETTER);
     this.remoteServiceName = builder.remoteServiceName;
-    this.propagationKeys = new LinkedHashSet<>(msgTracing.tracing().propagation().keys());
-    this.consumerMessageAdapter = JmsAdapter.JmsMessageConsumerAdapter.create(this);
-    this.channelAdapter = JmsAdapter.JmsChannelAdapter.create(this);
-    this.producerMessageAdapter = JmsAdapter.JmsMessageProducerAdapter.create(this);
-    this.injector = new Injector<Message>() {
-      @Override public void inject(TraceContext traceContext, Message carrier) {
-        try {
-          PropertyFilter.MESSAGE.filterProperties(carrier, propagationKeys);
-          carrier.setStringProperty("b3", writeB3SingleFormatWithoutParentId(traceContext));
-        } catch (JMSException e) {
-          // don't crash on wonky exceptions!
-        }
-      }
-
-      @Override
-      public String toString() {
-        return "Message::setStringProperty(\"b3\",singleHeaderFormatWithoutParent)";
-      }
-    };
+    this.propagationKeys = new LinkedHashSet<>(messageTracing.tracing().propagation().keys());
+    this.messageAdapter = new MessageAdapter(this);
+    this.messageInjector = new FilteringInjector<>(PropertyFilter.MESSAGE, propagationKeys, SETTER);
+    consumerHandler =
+      ConsumerHandler.create(messageTracing, messageAdapter, messageExtractor, messageInjector);
+    processorHandler = ProcessorHandler.create(messageTracing, consumerHandler);
+    producerHandler =
+      ProducerHandler.create(messageTracing, messageAdapter, messageExtractor, messageInjector);
   }
 
   public Connection connection(Connection connection) {
@@ -200,8 +213,7 @@ public final class JmsTracing {
    * one couldn't be extracted.
    */
   public Span nextSpan(Message message) {
-    return msgTracing.nextSpan(channelAdapter, consumerMessageAdapter, extractor, message,
-      destination(message));
+    return processorHandler.startProcessor(destination(message), message, false);
   }
 
   //TraceContextOrSamplingFlags extractAndClearMessage(Message message) {
@@ -212,7 +224,7 @@ public final class JmsTracing {
   //  return extracted;
   //}
 
-  Destination destination(Message message) {
+  @Nullable static Destination destination(Message message) {
     try {
       return message.getJMSDestination();
     } catch (JMSException e) {
@@ -220,4 +232,39 @@ public final class JmsTracing {
     }
     return null;
   }
+
+  static class FilteringInjector<C> implements TraceContext.Injector<C> {
+    final PropertyFilter filter;
+    final Set<String> namesToClear;
+    final Setter<C, String> setter;
+
+    FilteringInjector(PropertyFilter filter, Set<String> namesToClear, Setter<C, String> setter) {
+      this.filter = filter;
+      this.namesToClear = namesToClear;
+      this.setter = setter;
+    }
+
+    @Override public void inject(TraceContext traceContext, C carrier) {
+      filter.filterProperties(carrier, namesToClear);
+      setter.put(carrier, "b3", writeB3SingleFormatWithoutParentId(traceContext));
+    }
+
+    @Override public String toString() {
+      return setter + "(\"b3\",singleHeaderFormatWithoutParent)";
+    }
+  }
+
+  static class LegacyMessagingParser extends MessagingParser {
+    @Override
+    protected <Chan, Msg, C> void addMessageTags(MessagingAdapter<Chan, Msg, C> adapter,
+      Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+      String channelName = adapter.channel(channel);
+      if (channelName == null) return;
+      if (channel instanceof Queue) {
+        customizer.tag("jms.queue", channelName);
+      } else if (channel instanceof Topic) {
+        customizer.tag("jms.topic", channelName);
+      }
+    }
+  }
 }
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java b/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java
index 3a7d50a..3df209a 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java
@@ -17,28 +17,64 @@
 package brave.jms;
 
 import brave.Span;
+import brave.internal.Nullable;
+import brave.messaging.ProducerHandler;
 import brave.propagation.CurrentTraceContext;
 import brave.propagation.CurrentTraceContext.Scope;
 import javax.jms.CompletionListener;
+import javax.jms.Destination;
 import javax.jms.Message;
 
 /**
- * Decorates, then finishes a producer span. Allows tracing to record the duration between batching
+ * Decorates, then finishes a producer span. Allows tracing to record the duration between batching
  * for send and actual send.
  */
-@JMS2_0 final class TracingCompletionListener implements CompletionListener {
-  static CompletionListener create(CompletionListener delegate, Span span,
-      CurrentTraceContext current) {
-    if (span.isNoop()) return delegate; // save allocation overhead
-    return new TracingCompletionListener(delegate, span, current);
+@JMS2_0 class TracingCompletionListener<Msg> implements CompletionListener {
+  static <Msg> TracingCompletionListener<Msg> create(@Nullable CompletionListener delegate,
+    ProducerHandler<Destination, Msg, Msg> handler, CurrentTraceContext current,
+    Destination destination, Msg message, Span span) {
+    if (delegate == null) {
+      return new TracingCompletionListener<>(handler, destination, message, span);
+    }
+    return new TracingForwardingCompletionListener<>(delegate, handler, current, destination,
+      message, span);
   }
 
+  final ProducerHandler<Destination, Msg, Msg> handler;
+  final Destination destination;
+  final Msg message;
   final Span span;
+
+  TracingCompletionListener(ProducerHandler<Destination, Msg, Msg> handler, Destination destination,
+    Msg message, Span span) {
+    this.handler = handler;
+    this.destination = destination;
+    this.message = message;
+    this.span = span;
+  }
+
+  @Override public void onCompletion(Message message) {
+    finish(null);
+  }
+
+  @Override public void onException(Message message, Exception exception) {
+    finish(exception);
+  }
+
+  void finish(@Nullable Throwable error) {
+    if (error != null) span.error(error);
+    handler.finishSend(destination, message, span);
+  }
+}
+
+final class TracingForwardingCompletionListener<Msg> extends TracingCompletionListener<Msg> {
   final CompletionListener delegate;
   final CurrentTraceContext current;
 
-  TracingCompletionListener(CompletionListener delegate, Span span, CurrentTraceContext current) {
-    this.span = span;
+  TracingForwardingCompletionListener(CompletionListener delegate,
+    ProducerHandler<Destination, Msg, Msg> handler, CurrentTraceContext current,
+    Destination destination, Msg message, Span span) {
+    super(handler, destination, message, span);
     this.delegate = delegate;
     this.current = current;
   }
@@ -47,16 +83,15 @@ import javax.jms.Message;
     try (Scope ws = current.maybeScope(span.context())) {
       delegate.onCompletion(message);
     } finally {
-      span.finish();
+      finish(null);
     }
   }
 
   @Override public void onException(Message message, Exception exception) {
-    try {
+    try (Scope ws = current.maybeScope(span.context())) {
       delegate.onException(message, exception);
-      span.error(exception);
     } finally {
-      span.finish();
+      finish(exception);
     }
   }
 }
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java b/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java
index 9397a5f..8f801f0 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java
@@ -24,13 +24,13 @@ import javax.jms.JMSException;
 
 final class TracingExceptionListener {
   static ExceptionListener create(JmsTracing jmsTracing) {
-    return new TagError(jmsTracing.msgTracing.tracing().tracer());
+    return new TagError(jmsTracing.messageTracing.tracing().tracer());
   }
 
   static ExceptionListener create(ExceptionListener delegate, JmsTracing jmsTracing) {
     if (delegate == null) throw new NullPointerException("exceptionListener == null");
     if (delegate instanceof TagError) return delegate;
-    return new DelegateAndTagError(delegate, jmsTracing.msgTracing.tracing().tracer());
+    return new DelegateAndTagError(delegate, jmsTracing.messageTracing.tracing().tracer());
   }
 
   static class TagError implements ExceptionListener {
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java b/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java
index 4a4275b..939789e 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java
@@ -16,25 +16,22 @@
  */
 package brave.jms;
 
-import brave.messaging.MessagingConsumerHandler;
+import brave.messaging.ConsumerHandler;
 import javax.jms.Destination;
 import javax.jms.JMSConsumer;
 import javax.jms.JMSRuntimeException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 
-@JMS2_0 final class TracingJMSConsumer extends MessagingConsumerHandler<JMSConsumer, Destination, Message> implements JMSConsumer {
+@JMS2_0 final class TracingJMSConsumer implements JMSConsumer {
+  final JMSConsumer delegate;
   final JmsTracing jmsTracing;
   final Destination destination;
+  final ConsumerHandler<Destination, Message, Message> handler;
 
   TracingJMSConsumer(JMSConsumer delegate, Destination destination, JmsTracing jmsTracing) {
-    super(
-        delegate,
-        jmsTracing.msgTracing,
-        jmsTracing.channelAdapter,
-        jmsTracing.consumerMessageAdapter,
-        jmsTracing.extractor,
-        jmsTracing.injector);
+    this.delegate = delegate;
+    this.handler = jmsTracing.consumerHandler;
     this.destination = destination;
     this.jmsTracing = jmsTracing;
   }
@@ -53,19 +50,19 @@ import javax.jms.MessageListener;
 
   @Override public Message receive() {
     Message message = delegate.receive();
-    handleConsume(destination, message);
+    handler.handleReceive(destination, message);
     return message;
   }
 
   @Override public Message receive(long timeout) {
     Message message = delegate.receive(timeout);
-    handleConsume(destination, message);
+    handler.handleReceive(destination, message);
     return message;
   }
 
   @Override public Message receiveNoWait() {
     Message message = delegate.receiveNoWait();
-    handleConsume(destination, message);
+    handler.handleReceive(destination, message);
     return message;
   }
 
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java b/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java
index e380cf2..2482aea 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java
@@ -19,11 +19,10 @@ package brave.jms;
 import brave.Span;
 import brave.Tracer;
 import brave.Tracer.SpanInScope;
-import brave.messaging.MessageProducerAdapter;
-import brave.messaging.MessagingProducerHandler;
+import brave.messaging.ProducerHandler;
 import brave.propagation.CurrentTraceContext;
 import brave.propagation.Propagation.Getter;
-import brave.propagation.TraceContext;
+import brave.propagation.Propagation.Setter;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Set;
@@ -32,45 +31,24 @@ import javax.jms.Destination;
 import javax.jms.JMSProducer;
 import javax.jms.Message;
 
-import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentId;
-
-@JMS2_0 final class TracingJMSProducer
-    extends MessagingProducerHandler<JMSProducer, Destination, JMSProducer>
-    implements JMSProducer {
-
-  static final Getter<JMSProducer, String> GETTER = new Getter<JMSProducer, String>() {
-    @Override public String get(JMSProducer carrier, String key) {
-      return carrier.getStringProperty(key);
-    }
-
-    @Override public String toString() {
-      return "JMSProducer::getStringProperty";
-    }
-  };
+@JMS2_0 final class TracingJMSProducer implements JMSProducer {
 
+  final JMSProducer delegate;
+  final ProducerHandler<Destination, JMSProducer, JMSProducer> handler;
   final Tracer tracer;
   final CurrentTraceContext current;
 
   TracingJMSProducer(JMSProducer delegate, JmsTracing jmsTracing) {
-    super(
-        delegate,
-        jmsTracing.msgTracing,
-        jmsTracing.channelAdapter,
-        JmsProducerAdapter.create(jmsTracing),
-        jmsTracing.msgTracing.tracing().propagation().extractor(GETTER),
-        new TraceContext.Injector<JMSProducer>() {
-          @Override public void inject(TraceContext traceContext, JMSProducer carrier) {
-            PropertyFilter.JMS_PRODUCER.filterProperties(carrier, jmsTracing.propagationKeys);
-            carrier.setProperty("b3", writeB3SingleFormatWithoutParentId(traceContext));
-          }
-
-          @Override
-          public String toString() {
-            return "Message::setStringProperty(\"b3\",singleHeaderFormatWithoutParent)";
-          }
-        });
-    this.tracer = jmsTracing.msgTracing.tracing().tracer();
-    this.current = jmsTracing.msgTracing.tracing().currentTraceContext();
+    this.delegate = delegate;
+    this.handler = ProducerHandler.create(
+      jmsTracing.messageTracing,
+      new JMSProducerAdapter(jmsTracing),
+      jmsTracing.messageTracing.tracing().propagation().extractor(GETTER),
+      new JmsTracing.FilteringInjector<>(PropertyFilter.JMS_PRODUCER, jmsTracing.propagationKeys,
+        SETTER)
+    );
+    tracer = jmsTracing.messageTracing.tracing().tracer();
+    current = jmsTracing.messageTracing.tracing().currentTraceContext();
   }
 
   // Partial function pattern as this needs to work before java 8 method references
@@ -130,24 +108,30 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
   }
 
   void send(Send send, Destination destination, Object message) {
-    Span span = handleProduce(destination, this);
+    Span span = handler.startSend(destination, this);
+    // TODO: document what's going on with the getAsync dance here as it makes the lifecycle complex
+    // it isn't clear why a synchronous method would imply an async call, for example.
     final CompletionListener oldCompletionListener = getAsync();
+    boolean shouldFinish = true;
     if (oldCompletionListener != null) {
-      delegate.setAsync(TracingCompletionListener.create(oldCompletionListener, span, current));
+      shouldFinish = false;
+      delegate.setAsync(TracingCompletionListener.create(oldCompletionListener, handler, current,
+        destination, delegate, span));
     }
     SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
     try {
       send.apply(delegate, destination, message);
     } catch (RuntimeException | Error e) {
       span.error(e);
-      span.finish();
+      shouldFinish = true;
       throw e;
     } finally {
       ws.close();
-      if (oldCompletionListener != null) {
+      if (oldCompletionListener != null) { // TODO: why are we doing this
         delegate.setAsync(oldCompletionListener);
-      } else {
-        span.finish();
+      }
+      if (shouldFinish) {
+        handler.finishSend(destination, this, span);
       }
     }
   }
@@ -345,31 +329,34 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
     return delegate.getJMSReplyTo();
   }
 
-  static class JmsProducerAdapter implements MessageProducerAdapter<JMSProducer> {
-    final JmsTracing jmsTracing;
-
-    JmsProducerAdapter(JmsTracing jmsTracing) {
-      this.jmsTracing = jmsTracing;
+  // helper types inlined here to avoid having JmsTracing access JMS 2.0 types
+  static final Getter<JMSProducer, String> GETTER = new Getter<JMSProducer, String>() {
+    @Override public String get(JMSProducer carrier, String key) {
+      return carrier.getStringProperty(key);
     }
 
-    static JmsProducerAdapter create(JmsTracing jmsTracing) {
-      return new JmsProducerAdapter(jmsTracing);
+    @Override public String toString() {
+      return "JMSProducer::getStringProperty";
     }
+  };
 
-    @Override public String operation(JMSProducer message) {
-      return "send";
+  static final Setter<JMSProducer, String> SETTER = new Setter<JMSProducer, String>() {
+    @Override public void put(JMSProducer carrier, String key, String value) {
+      carrier.setProperty(key, value);
     }
 
-    @Override public String identifier(JMSProducer message) {
-      return message.getJMSCorrelationID();
+    @Override public String toString() {
+      return "JMSProducer::setProperty";
     }
+  };
 
-    //@Override public void clearPropagation(JMSProducer message) {
-    //  PropertyFilter.JMS_PRODUCER.filterProperties(message, jmsTracing.propagationKeys);
-    //}
+  static final class JMSProducerAdapter extends JmsAdapter<JMSProducer> {
+    JMSProducerAdapter(JmsTracing jmsTracing) {
+      super(jmsTracing);
+    }
 
-    @Override public String identifierTagKey() {
-      return "jms.correlation_id";
+    @Override public String correlationId(JMSProducer message) {
+      return message.getJMSCorrelationID();
     }
   }
 }
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java b/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java
index b489cc0..8b33646 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java
@@ -16,7 +16,7 @@
  */
 package brave.jms;
 
-import brave.messaging.MessagingConsumerHandler;
+import brave.messaging.ConsumerHandler;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -29,30 +29,31 @@ import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSubscriber;
 
+import static brave.jms.JmsTracing.destination;
 import static brave.jms.TracingConnection.TYPE_QUEUE;
 import static brave.jms.TracingConnection.TYPE_TOPIC;
 
 /** Implements all interfaces as according to ActiveMQ, this is typical of JMS 1.1. */
-final class TracingMessageConsumer
-    extends MessagingConsumerHandler<MessageConsumer, Destination, Message>
-    implements QueueReceiver, TopicSubscriber {
+final class TracingMessageConsumer implements QueueReceiver, TopicSubscriber {
 
   static TracingMessageConsumer create(MessageConsumer delegate, JmsTracing jmsTracing) {
     if (delegate instanceof TracingMessageConsumer) return (TracingMessageConsumer) delegate;
     return new TracingMessageConsumer(delegate, jmsTracing);
   }
 
+  final MessageConsumer delegate;
   final JmsTracing jmsTracing;
+  final ConsumerHandler<Destination, Message, Message> handler;
   final int types;
 
   TracingMessageConsumer(MessageConsumer delegate, JmsTracing jmsTracing) {
-    super(
-        delegate,
-        jmsTracing.msgTracing,
-        jmsTracing.channelAdapter,
-        jmsTracing.consumerMessageAdapter,
-        jmsTracing.extractor,
-        jmsTracing.injector);
+    this.delegate = delegate;
+    this.handler = ConsumerHandler.create(
+      jmsTracing.messageTracing,
+      jmsTracing.messageAdapter,
+      jmsTracing.messageExtractor,
+      jmsTracing.messageInjector
+    );
     this.jmsTracing = jmsTracing;
     int types = 0;
     if (delegate instanceof QueueSender) types |= TYPE_QUEUE;
@@ -74,19 +75,19 @@ final class TracingMessageConsumer
 
   @Override public Message receive() throws JMSException {
     Message message = delegate.receive();
-    handleConsume(message.getJMSDestination(), message);
+    if (message != null) handler.handleReceive(destination(message), message); // null if closed
     return message;
   }
 
   @Override public Message receive(long timeout) throws JMSException {
     Message message = delegate.receive(timeout);
-    handleConsume(message.getJMSDestination(), message);
+    if (message != null) handler.handleReceive(destination(message), message); // null on timeout
     return message;
   }
 
   @Override public Message receiveNoWait() throws JMSException {
     Message message = delegate.receiveNoWait();
-    handleConsume(message.getJMSDestination(), message);
+    if (message != null) handler.handleReceive(destination(message), message); // null when empty
     return message;
   }
 
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java b/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java
index 3f733cc..93b6bd3 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java
@@ -17,28 +17,25 @@
 package brave.jms;
 
 import brave.Span;
-import brave.Tracer;
-import brave.Tracer.SpanInScope;
-import brave.Tracing;
-import brave.messaging.ChannelAdapter;
-import brave.propagation.TraceContextOrSamplingFlags;
+import brave.messaging.ProcessorHandler;
+import brave.propagation.CurrentTraceContext;
+import brave.propagation.CurrentTraceContext.Scope;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 
-import static brave.Span.Kind.CONSUMER;
+import static brave.jms.JmsTracing.destination;
 
 /**
  * When {@link #addConsumerSpan} this creates 2 spans:
  * <ol>
- *   <li>A duration 1 {@link Span.Kind#CONSUMER} span to represent receipt from the destination</li>
- *   <li>A child span with the duration of the delegated listener</li>
+ * <li>A duration 1 {@link Span.Kind#CONSUMER} span to represent receipt from the destination</li>
+ * <li>A child span with the duration of the delegated listener</li>
  * </ol>
  *
  * <p>{@link #addConsumerSpan} should only be set when the message consumer is not traced.
  */
 final class TracingMessageListener implements MessageListener {
-
   /** Creates a message listener which also adds a consumer span. */
   static MessageListener create(MessageListener delegate, JmsTracing jmsTracing) {
     if (delegate instanceof TracingMessageListener) return delegate;
@@ -46,54 +43,27 @@ final class TracingMessageListener implements MessageListener {
   }
 
   final MessageListener delegate;
-  final JmsTracing jmsTracing;
-  final Tracing tracing;
-  final Tracer tracer;
-  final String remoteServiceName;
+  final CurrentTraceContext current;
+  final ProcessorHandler<Destination, Message, Message> handler;
   final boolean addConsumerSpan;
-  final ChannelAdapter<Destination> channelAdapter;
 
   TracingMessageListener(MessageListener delegate, JmsTracing jmsTracing, boolean addConsumerSpan) {
     this.delegate = delegate;
-    this.jmsTracing = jmsTracing;
-    this.tracing = jmsTracing.msgTracing.tracing();
-    this.tracer = jmsTracing.msgTracing.tracing().tracer();
-    this.remoteServiceName = jmsTracing.remoteServiceName;
+    this.current = jmsTracing.messageTracing.tracing().currentTraceContext();
+    this.handler = jmsTracing.processorHandler;
     this.addConsumerSpan = addConsumerSpan;
-    channelAdapter = JmsAdapter.JmsChannelAdapter.create(jmsTracing);
   }
 
   @Override public void onMessage(Message message) {
-    Span listenerSpan = startMessageListenerSpan(message);
-    try (SpanInScope ws = tracer.withSpanInScope(listenerSpan)) {
+    Span listenerSpan =
+      handler.startProcessor(destination(message), message, addConsumerSpan).name("on-message");
+    try (Scope ws = current.newScope(listenerSpan.context())) {
       delegate.onMessage(message);
-    } catch (Throwable t) {
-      listenerSpan.error(t);
-      throw t;
+    } catch (RuntimeException | Error e) {
+      listenerSpan.error(e);
+      throw e;
     } finally {
       listenerSpan.finish();
     }
   }
-
-  Span startMessageListenerSpan(Message message) {
-    if (!addConsumerSpan) return jmsTracing.nextSpan(message).name("on-message").start();
-    TraceContextOrSamplingFlags extracted = jmsTracing.extractor.extract(message);
-
-    // JMS has no visibility of the incoming message, which incidentally could be local!
-    Span consumerSpan = tracer.nextSpan(extracted).kind(CONSUMER).name("receive");
-    Span listenerSpan = tracer.newChild(consumerSpan.context());
-
-    if (!consumerSpan.isNoop()) {
-      long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
-      consumerSpan.start(timestamp);
-      if (remoteServiceName != null) consumerSpan.remoteServiceName(remoteServiceName);
-      jmsTracing.msgTracing.consumerParser().channel(channelAdapter, jmsTracing.destination(message), consumerSpan);
-      long consumerFinish = timestamp + 1L; // save a clock reading
-      consumerSpan.finish(consumerFinish);
-
-      // not using scoped span as we want to start late
-      listenerSpan.name("on-message").start(consumerFinish);
-    }
-    return listenerSpan;
-  }
 }
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java b/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java
index f79b8b3..8aaf910 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java
@@ -18,9 +18,9 @@ package brave.jms;
 
 import brave.Span;
 import brave.Tracer;
-import brave.Tracer.SpanInScope;
-import brave.messaging.MessagingProducerHandler;
+import brave.messaging.ProducerHandler;
 import brave.propagation.CurrentTraceContext;
+import brave.propagation.CurrentTraceContext.Scope;
 import javax.jms.CompletionListener;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -31,46 +31,32 @@ import javax.jms.QueueSender;
 import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 
+import static brave.jms.JmsTracing.destination;
 import static brave.jms.TracingConnection.TYPE_QUEUE;
 import static brave.jms.TracingConnection.TYPE_TOPIC;
 
 /** Implements all interfaces as according to ActiveMQ, this is typical of JMS 1.1. */
-final class TracingMessageProducer extends MessagingProducerHandler<MessageProducer, Destination, Message>
-    implements QueueSender, TopicPublisher {
-
+final class TracingMessageProducer implements QueueSender, TopicPublisher {
   static TracingMessageProducer create(MessageProducer delegate, JmsTracing jmsTracing) {
     if (delegate instanceof TracingMessageProducer) return (TracingMessageProducer) delegate;
     return new TracingMessageProducer(delegate, jmsTracing);
   }
 
+  final MessageProducer delegate;
   final int types;
+  final ProducerHandler<Destination, Message, Message> handler;
   final CurrentTraceContext current;
   final Tracer tracer;
 
   TracingMessageProducer(MessageProducer delegate, JmsTracing jmsTracing) {
-    super(delegate,
-        jmsTracing.msgTracing,
-        jmsTracing.channelAdapter,
-        jmsTracing.producerMessageAdapter,
-        jmsTracing.extractor,
-        jmsTracing.injector);
+    this.delegate = delegate;
     int types = 0;
     if (delegate instanceof QueueSender) types |= TYPE_QUEUE;
     if (delegate instanceof TopicPublisher) types |= TYPE_TOPIC;
     this.types = types;
-    this.current = jmsTracing.msgTracing.tracing().currentTraceContext();
-    this.tracer = jmsTracing.msgTracing.tracing().tracer();
-  }
-
-  Destination destination(Message message) {
-    try {
-      Destination result = message.getJMSDestination();
-      if (result != null) return result;
-      return delegate.getDestination();
-    } catch (JMSException ignored) {
-      // don't crash on wonky exceptions!
-    }
-    return null;
+    this.handler = jmsTracing.producerHandler;
+    this.current = jmsTracing.messageTracing.tracing().currentTraceContext();
+    this.tracer = jmsTracing.messageTracing.tracing().tracer();
   }
 
   @Override public void setDisableMessageID(boolean value) throws JMSException {
@@ -132,8 +118,10 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
   }
 
   @Override public void send(Message message) throws JMSException {
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Destination destination = destination(message);
+
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(message);
     } catch (RuntimeException | JMSException | Error e) {
@@ -141,14 +129,16 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
   @Override public void send(Message message, int deliveryMode, int priority, long timeToLive)
-      throws JMSException {
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    throws JMSException {
+    Destination destination = destination(message);
+
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(message, deliveryMode, priority, timeToLive);
     } catch (RuntimeException | JMSException | Error e) {
@@ -156,32 +146,32 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
   enum SendDestination {
     DESTINATION {
       @Override void apply(MessageProducer producer, Destination destination, Message message)
-          throws JMSException {
+        throws JMSException {
         producer.send(destination, message);
       }
     },
     QUEUE {
       @Override void apply(MessageProducer producer, Destination destination, Message message)
-          throws JMSException {
+        throws JMSException {
         ((QueueSender) producer).send((Queue) destination, message);
       }
     },
     TOPIC {
       @Override void apply(MessageProducer producer, Destination destination, Message message)
-          throws JMSException {
+        throws JMSException {
         ((TopicPublisher) producer).publish((Topic) destination, message);
       }
     };
 
     abstract void apply(MessageProducer producer, Destination destination, Message message)
-        throws JMSException;
+      throws JMSException;
   }
 
   @Override public void send(Destination destination, Message message) throws JMSException {
@@ -189,9 +179,9 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
   }
 
   void send(SendDestination sendDestination, Destination destination, Message message)
-      throws JMSException {
-    Span span = handleProduce(destination, message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    throws JMSException {
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       sendDestination.apply(delegate, destination, message);
     } catch (RuntimeException | JMSException | Error e) {
@@ -199,15 +189,15 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
   @Override
   public void send(Destination destination, Message message, int deliveryMode, int priority,
-      long timeToLive) throws JMSException {
-    Span span = handleProduce(destination, message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    long timeToLive) throws JMSException {
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(destination, message, deliveryMode, priority, timeToLive);
     } catch (RuntimeException | JMSException | Error e) {
@@ -215,20 +205,24 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
   /* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
   @JMS2_0
   public void send(Message message, CompletionListener completionListener) throws JMSException {
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Destination destination = destination(message);
+
+    Span span = handler.startSend(destination, message);
+    completionListener =
+      tracingCompletionListener(message, completionListener, span, destination);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
-      delegate.send(message, TracingCompletionListener.create(completionListener, span, current));
+      delegate.send(message, completionListener);
     } catch (RuntimeException | JMSException | Error e) {
       span.error(e);
-      span.finish();
+      handler.finishSend(destination, message, span);
       throw e;
     } finally {
       ws.close();
@@ -237,32 +231,44 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
 
   /* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
   @JMS2_0 public void send(Message message, int deliveryMode, int priority, long timeToLive,
-      CompletionListener completionListener) throws JMSException {
-    Span span = handleProduce(destination(message), message);
-    completionListener = TracingCompletionListener.create(completionListener, span, current);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    CompletionListener completionListener) throws JMSException {
+    Destination destination = destination(message);
+
+    Span span = handler.startSend(destination, message);
+    completionListener =
+      tracingCompletionListener(message, completionListener, span, destination);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(message, deliveryMode, priority, timeToLive, completionListener);
     } catch (RuntimeException | JMSException | Error e) {
       span.error(e);
-      span.finish();
+      handler.finishSend(destination, message, span);
       throw e;
     } finally {
       ws.close();
     }
   }
 
+  /**
+   * Wraps {@code completionListener} with tracing for {@code span}; see TracingCompletionListener.
+   */
+  private CompletionListener tracingCompletionListener(Message message,
+    CompletionListener completionListener, Span span, Destination destination) {
+    return TracingCompletionListener.create(
+      completionListener, handler, current, destination, message, span);
+  }
+
   /* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
   @JMS2_0 public void send(Destination destination, Message message,
-      CompletionListener completionListener) throws JMSException {
-    Span span = handleProduce(destination, message);
-    completionListener = TracingCompletionListener.create(completionListener, span, current);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    CompletionListener completionListener) throws JMSException {
+    Span span = handler.startSend(destination, message);
+    completionListener = tracingCompletionListener(message, completionListener, span, destination);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(destination, message, completionListener);
     } catch (RuntimeException | JMSException | Error e) {
       span.error(e);
-      span.finish();
+      handler.finishSend(destination, message, span);
       throw e;
     } finally {
       ws.close();
@@ -271,15 +277,15 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
 
   /* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
   @JMS2_0 public void send(Destination destination, Message message, int deliveryMode, int priority,
-      long timeToLive, CompletionListener completionListener) throws JMSException {
-    Span span = handleProduce(destination, message);
-    completionListener = TracingCompletionListener.create(completionListener, span, current);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    long timeToLive, CompletionListener completionListener) throws JMSException {
+    Span span = handler.startSend(destination, message);
+    completionListener = tracingCompletionListener(message, completionListener, span, destination);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(destination, message, deliveryMode, priority, timeToLive, completionListener);
     } catch (RuntimeException | JMSException | Error e) {
       span.error(e);
-      span.finish();
+      handler.finishSend(destination, message, span);
       throw e;
     } finally {
       ws.close();
@@ -300,11 +306,14 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
 
   @Override
   public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive)
-      throws JMSException {
+    throws JMSException {
     checkQueueSender();
     QueueSender qs = (QueueSender) delegate;
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Destination destination = destination(message);
+    if (destination == null) destination = qs.getDestination();
+
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       qs.send(queue, message, deliveryMode, priority, timeToLive);
     } catch (RuntimeException | JMSException | Error e) {
@@ -312,7 +321,7 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
@@ -332,9 +341,11 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
   @Override public void publish(Message message) throws JMSException {
     checkTopicPublisher();
     TopicPublisher tp = (TopicPublisher) delegate;
+    Destination destination = destination(message);
+    if (destination == null) destination = tp.getDestination();
 
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       tp.publish(message);
     } catch (RuntimeException | JMSException | Error e) {
@@ -342,17 +353,19 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
   @Override public void publish(Message message, int deliveryMode, int priority, long timeToLive)
-      throws JMSException {
+    throws JMSException {
     checkTopicPublisher();
     TopicPublisher tp = (TopicPublisher) delegate;
+    Destination destination = destination(message);
+    if (destination == null) destination = tp.getDestination();
 
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       tp.publish(message, deliveryMode, priority, timeToLive);
     } catch (RuntimeException | JMSException | Error e) {
@@ -360,7 +373,7 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
@@ -371,12 +384,14 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
 
   @Override
   public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive)
-      throws JMSException {
+    throws JMSException {
     checkTopicPublisher();
     TopicPublisher tp = (TopicPublisher) delegate;
+    Destination destination = destination(message);
+    if (destination == null) destination = tp.getDestination();
 
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       tp.publish(topic, message, deliveryMode, priority, timeToLive);
     } catch (RuntimeException | JMSException | Error e) {
@@ -384,7 +399,7 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
@@ -393,5 +408,4 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw new IllegalStateException(delegate + " is not a TopicPublisher");
     }
   }
-
 }
diff --git a/instrumentation/jms/src/test/java/brave/jms/JmsTest.java b/instrumentation/jms/src/test/java/brave/jms/JmsTest.java
index 67637ce..9acbc4c 100644
--- a/instrumentation/jms/src/test/java/brave/jms/JmsTest.java
+++ b/instrumentation/jms/src/test/java/brave/jms/JmsTest.java
@@ -39,17 +39,6 @@ import zipkin2.Span;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public abstract class JmsTest {
-  static final Propagation.Setter<Message, String> SETTER =
-      new Propagation.Setter<Message, String>() {
-        @Override public void put(Message carrier, String key, String value) {
-          try {
-            carrier.setStringProperty(key, value);
-          } catch (JMSException e) {
-            throw new AssertionError(e);
-          }
-        }
-      };
-
   @After public void tearDown() {
     tracing.close();
   }
diff --git a/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java b/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java
index a5f2d2d..bf35264 100644
--- a/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java
+++ b/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java
@@ -17,8 +17,8 @@
 package brave.jms;
 
 import brave.Span;
-import brave.sampler.Sampler;
 import javax.jms.CompletionListener;
+import javax.jms.Destination;
 import javax.jms.Message;
 import org.junit.Test;
 
@@ -27,46 +27,36 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 public class TracingCompletionListenerTest extends JmsTest {
-  @Test public void create_returns_input_on_noop() {
-    Span span = tracing.tracer().withSampler(Sampler.NEVER_SAMPLE).nextSpan();
-
-    CompletionListener delegate = mock(CompletionListener.class);
-    CompletionListener tracingCompletionListener =
-        TracingCompletionListener.create(delegate, span, current);
-
-    assertThat(tracingCompletionListener).isSameAs(delegate);
-  }
+  Destination destination = mock(Destination.class);
+  Message message = mock(Message.class);
+  CompletionListener delegate = mock(CompletionListener.class);
 
   @Test public void on_completion_should_finish_span() throws Exception {
-    Message message = mock(Message.class);
     Span span = tracing.tracer().nextSpan().start();
 
-    CompletionListener tracingCompletionListener =
-        TracingCompletionListener.create(mock(CompletionListener.class), span, current);
+    CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+      delegate, jmsTracing.producerHandler, current, destination, message, span);
     tracingCompletionListener.onCompletion(message);
 
     assertThat(takeSpan()).isNotNull();
   }
 
   @Test public void on_exception_should_tag_if_exception() throws Exception {
-    Message message = mock(Message.class);
     Span span = tracing.tracer().nextSpan().start();
 
-    CompletionListener tracingCompletionListener =
-        TracingCompletionListener.create(mock(CompletionListener.class), span, current);
+    CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+      delegate, jmsTracing.producerHandler, current, destination, message, span);
     tracingCompletionListener.onException(message, new Exception("Test exception"));
 
     assertThat(takeSpan().tags())
-        .containsEntry("error", "Test exception");
+      .containsEntry("error", "Test exception");
   }
 
   @Test public void on_completion_should_forward_then_finish_span() throws Exception {
-    Message message = mock(Message.class);
     Span span = tracing.tracer().nextSpan().start();
 
-    CompletionListener delegate = mock(CompletionListener.class);
-    CompletionListener tracingCompletionListener =
-        TracingCompletionListener.create(delegate, span, current);
+    CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+      delegate, jmsTracing.producerHandler, current, destination, message, span);
     tracingCompletionListener.onCompletion(message);
 
     verify(delegate).onCompletion(message);
@@ -74,7 +64,6 @@ public class TracingCompletionListenerTest extends JmsTest {
   }
 
   @Test public void on_completion_should_have_span_in_scope() throws Exception {
-    Message message = mock(Message.class);
     Span span = tracing.tracer().nextSpan().start();
 
     CompletionListener delegate = new CompletionListener() {
@@ -87,23 +76,25 @@ public class TracingCompletionListenerTest extends JmsTest {
       }
     };
 
-    TracingCompletionListener.create(delegate, span, current).onCompletion(message);
+    CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+      delegate, jmsTracing.producerHandler, current, destination, message, span);
+
+    tracingCompletionListener.onCompletion(message);
 
     takeSpan(); // consumer reported span
   }
 
   @Test public void on_exception_should_forward_then_tag() throws Exception {
-    Message message = mock(Message.class);
     Span span = tracing.tracer().nextSpan().start();
 
-    CompletionListener delegate = mock(CompletionListener.class);
-    CompletionListener tracingCompletionListener =
-        TracingCompletionListener.create(delegate, span, current);
+    CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+      delegate, jmsTracing.producerHandler, current, destination, message, span);
+
     Exception e = new Exception("Test exception");
     tracingCompletionListener.onException(message, e);
 
     verify(delegate).onException(message, e);
     assertThat(takeSpan().tags())
-        .containsEntry("error", "Test exception");
+      .containsEntry("error", "Test exception");
   }
 }
diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java
index ae1359b..46159ac 100644
--- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java
+++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java
@@ -17,21 +17,27 @@
 package brave.kafka.clients;
 
 import brave.Span;
+import brave.SpanCustomizer;
 import brave.Tracing;
+import brave.internal.Nullable;
+import brave.kafka.clients.TracingConsumer.ConsumerRecordAdapter;
+import brave.kafka.clients.TracingProducer.ProducerRecordAdapter;
+import brave.messaging.ConsumerHandler;
+import brave.messaging.MessagingAdapter;
+import brave.messaging.MessagingParser;
 import brave.messaging.MessagingTracing;
+import brave.messaging.ProcessorHandler;
+import brave.messaging.ProducerHandler;
 import brave.propagation.B3SingleFormat;
 import brave.propagation.Propagation;
 import brave.propagation.TraceContext;
 import brave.propagation.TraceContext.Extractor;
 import brave.propagation.TraceContext.Injector;
-import brave.propagation.TraceContextOrSamplingFlags;
-import java.util.Iterator;
 import java.util.List;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 
 import static brave.kafka.clients.KafkaPropagation.B3_SINGLE_TEST_HEADERS;
@@ -42,11 +48,6 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
 
 /** Use this class to decorate your Kafka consumer / producer and enable Tracing. */
 public final class KafkaTracing {
-
-  static final String PROTOCOL = "kafka";
-  static final String PRODUCER_OPERATION = "send";
-  static final String CONSUMER_OPERATION = "poll";
-
   public static KafkaTracing create(Tracing tracing) {
     return new Builder(tracing).build();
   }
@@ -56,18 +57,19 @@ public final class KafkaTracing {
   }
 
   public static final class Builder {
-    final MessagingTracing msgTracing;
+    final MessagingTracing messagingTracing;
     String remoteServiceName = "kafka";
     boolean writeB3SingleFormat;
 
     Builder(Tracing tracing) {
       if (tracing == null) throw new NullPointerException("tracing == null");
-      this.msgTracing = MessagingTracing.create(tracing);
+      this.messagingTracing =
+        MessagingTracing.newBuilder(tracing).parser(new LegacyMessagingParser()).build();
     }
 
-    Builder(MessagingTracing msgTracing) {
-      if (msgTracing == null) throw new NullPointerException("msgTracing == null");
-      this.msgTracing = msgTracing;
+    Builder(MessagingTracing messagingTracing) {
+      if (messagingTracing == null) throw new NullPointerException("messagingTracing == null");
+      this.messagingTracing = messagingTracing;
     }
 
     /**
@@ -95,72 +97,40 @@ public final class KafkaTracing {
     }
   }
 
-  final MessagingTracing msgTracing;
-  final String remoteServiceName;
-  final List<String> propagationKeys;
-
-  boolean singleFormat;
+  final MessagingTracing messagingTracing;
+  final ConsumerHandler<String, ConsumerRecord, Headers> consumerHandler;
+  final ProcessorHandler<String, ConsumerRecord, Headers> processorHandler;
+  final ProducerHandler<String, ProducerRecord, Headers> producerHandler;
 
   KafkaTracing(Builder builder) { // intentionally hidden constructor
-    this.msgTracing = builder.msgTracing;
-    this.remoteServiceName = builder.remoteServiceName;
-    this.propagationKeys = msgTracing.tracing().propagation().keys();
-    final Extractor<Headers> extractor =
-        msgTracing.tracing().propagation().extractor(HEADERS_GETTER);
-    List<String> keyList = msgTracing.tracing().propagation().keys();
-    singleFormat = false;
+    this.messagingTracing = builder.messagingTracing;
+    Extractor<Headers> extractor =
+      messagingTracing.tracing().propagation().extractor(HEADERS_GETTER);
+    Injector<Headers> injector;
+    List<String> keyList = messagingTracing.tracing().propagation().keys();
     if (builder.writeB3SingleFormat || keyList.equals(Propagation.B3_SINGLE_STRING.keys())) {
       TraceContext testExtraction = extractor.extract(B3_SINGLE_TEST_HEADERS).context();
       if (!TEST_CONTEXT.equals(testExtraction)) {
         throw new IllegalArgumentException(
-            "KafkaTracing.Builder.writeB3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!");
+          "KafkaTracing.Builder.writeB3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!");
       }
-      singleFormat = true;
-    }
-  }
-
-  <K, V> Extractor<ProducerRecord<K, V>> producerRecordExtractor() {
-    return msgTracing.tracing()
-        .propagation()
-        .extractor((record, key) -> HEADERS_GETTER.get(record.headers(), key));
-  }
-
-  <K, V> Injector<ProducerRecord<K, V>> producerRecordInjector() {
-    return singleFormat ?
-        new Injector<ProducerRecord<K, V>>() {
-          @Override public void inject(TraceContext traceContext, ProducerRecord<K, V> carrier) {
-            carrier.headers().add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext));
-          }
-
-          @Override public String toString() {
-            return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)";
-          }
+      injector = new Injector<Headers>() {
+        @Override public void inject(TraceContext traceContext, Headers headers) {
+          headers.add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext));
         }
-        : msgTracing.tracing().propagation().injector((record, key, value) -> {
-          HEADERS_SETTER.put(record.headers(), key, value);
-        });
-  }
 
-  <K, V> Extractor<ConsumerRecord<K, V>> consumerRecordExtractor() {
-    return msgTracing.tracing()
-        .propagation()
-        .extractor((record, key) -> HEADERS_GETTER.get(record.headers(), key));
-  }
-
-  <K, V> Injector<ConsumerRecord<K, V>> consumerRecordInjector() {
-    return singleFormat ?
-        new Injector<ConsumerRecord<K, V>>() {
-          @Override public void inject(TraceContext traceContext, ConsumerRecord<K, V> carrier) {
-            carrier.headers().add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext));
-          }
-
-          @Override public String toString() {
-            return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)";
-          }
+        @Override public String toString() {
+          return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)";
         }
-        : msgTracing.tracing().propagation().injector((record, key, value) -> {
-          HEADERS_SETTER.put(record.headers(), key, value);
-        });
+      };
+    } else {
+      injector = messagingTracing.tracing().propagation().injector(HEADERS_SETTER);
+    }
+    consumerHandler = ConsumerHandler.create(
+      messagingTracing, new ConsumerRecordAdapter(builder.remoteServiceName), extractor, injector);
+    processorHandler = ProcessorHandler.create(messagingTracing, consumerHandler);
+    producerHandler = ProducerHandler.create(
+      messagingTracing, new ProducerRecordAdapter(builder.remoteServiceName), extractor, injector);
   }
 
   /**
@@ -185,35 +155,37 @@ public final class KafkaTracing {
    * one couldn't be extracted.
    */
   public <K, V> Span nextSpan(ConsumerRecord<K, V> record) {
-    final TracingConsumer.KafkaConsumerAdapter<K, V> adapter =
-        TracingConsumer.KafkaConsumerAdapter.create(this);
-    return msgTracing.nextSpan(adapter, adapter, consumerRecordExtractor(), record, record);
-  }
-
-  <Record> String channelTagKey(Record record) {
-    return String.format("%s.topic", PROTOCOL);
+    return processorHandler.startProcessor(record.topic(), record, false);
   }
 
-  String recordKey(Object key) {
+  @Nullable static String recordKey(Object key) {
     if (key instanceof String && !"".equals(key)) {
       return key.toString();
     }
     return null;
   }
 
-  String identifierTagKey() {
-    return String.format("%s.key", PROTOCOL);
-  }
+  static class LegacyMessagingParser extends MessagingParser {
+    @Override
+    protected <Chan, Msg, C> void addMessageTags(MessagingAdapter<Chan, Msg, C> adapter,
+      Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+      String channelName = adapter.channel(channel);
+      if (channelName != null) customizer.tag("kafka.topic", channelName);
+      if (msg != null && context.parentId() == null) { // is a root span
+        String messageKey = adapter.messageKey(msg);
+        if (messageKey != null) customizer.tag("kafka.key", messageKey);
+      }
+    }
 
-  // BRAVE6: consider a messaging variant of extraction which clears headers as they are read.
-  // this could prevent having to go back and clear them later. Another option is to encourage,
-  // then special-case single header propagation. When there's only 1 propagation key, you don't
-  // need to do a loop!
-  void clearHeaders(Headers headers) {
-    // Headers::remove creates and consumes an iterator each time. This does one loop instead.
-    for (Iterator<Header> i = headers.iterator(); i.hasNext(); ) {
-      Header next = i.next();
-      if (propagationKeys.contains(next.key())) i.remove();
+    /** Returns the span name of a message operation. Defaults to the operation name. */
+    @Override protected <Chan, Msg, C> String spanName(String operation,
+      MessagingAdapter<Chan, Msg, C> adapter, Chan channel, @Nullable Msg msg) {
+      switch (operation) {
+        case "receive":
+        case "receive-batch":
+          return "poll";
+      }
+      return super.spanName(operation, adapter, channel, msg);
     }
   }
 }
diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java
index 7f17d30..de2b06d 100644
--- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java
+++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java
@@ -18,9 +18,8 @@ package brave.kafka.clients;
 
 import brave.Span;
 import brave.Tracing;
-import brave.messaging.ChannelAdapter;
-import brave.messaging.MessageConsumerAdapter;
-import brave.messaging.MessagingConsumerHandler;
+import brave.messaging.ConsumerHandler;
+import brave.messaging.MessagingAdapter;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -40,39 +39,31 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 
 /**
  * Kafka Consumer decorator. Read records headers to create and complete a child of the incoming
  * producers span if possible.
  */
-final class TracingConsumer<K, V>
-    extends MessagingConsumerHandler<Consumer<K, V>, ConsumerRecord<K, V>, ConsumerRecord<K, V>>
-    implements Consumer<K, V> {
-
-  final KafkaTracing kafkaTracing;
-  final Tracing tracing;
-  final String remoteServiceName;
-
+final class TracingConsumer<K, V> implements Consumer<K, V> {
   // replicate org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener behaviour
   static final ConsumerRebalanceListener NO_OP_CONSUMER_REBALANCE_LISTENER =
-      new ConsumerRebalanceListener() {
-        @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-        }
+    new ConsumerRebalanceListener() {
+      @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+      }
 
-        @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-        }
-      };
+      @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+      }
+    };
+
+  final Consumer<K, V> delegate;
+  final Tracing tracing;
+  final ConsumerHandler<String, ConsumerRecord, Headers> handler;
 
   TracingConsumer(Consumer<K, V> delegate, KafkaTracing kafkaTracing) {
-    super(delegate,
-        kafkaTracing.msgTracing,
-        KafkaConsumerAdapter.create(kafkaTracing),
-        KafkaConsumerAdapter.create(kafkaTracing),
-        kafkaTracing.consumerRecordExtractor(),
-        kafkaTracing.consumerRecordInjector());
-    this.kafkaTracing = kafkaTracing;
-    this.tracing = kafkaTracing.msgTracing.tracing();
-    this.remoteServiceName = kafkaTracing.remoteServiceName;
+    this.delegate = delegate;
+    this.handler = kafkaTracing.consumerHandler;
+    this.tracing = kafkaTracing.messagingTracing.tracing();
   }
 
   // Do not use @Override annotation to avoid compatibility issue version < 2.0
@@ -84,15 +75,13 @@ final class TracingConsumer<K, V>
   @Override public ConsumerRecords<K, V> poll(long timeout) {
     ConsumerRecords<K, V> records = delegate.poll(timeout);
     if (records.isEmpty() || tracing.isNoop()) return records;
-    long timestamp = 0L;
     Map<String, Span> consumerSpansForTopic = new LinkedHashMap<>();
     for (TopicPartition partition : records.partitions()) {
-      List<ConsumerRecord<K, V>> recordsInPartition = records.records(partition);
+      List<? extends ConsumerRecord> recordsInPartition = records.records(partition);
       consumerSpansForTopic =
-          handleConsume(recordsInPartition.size() > 0 ? recordsInPartition.get(0) : null,
-              recordsInPartition, consumerSpansForTopic);
+        handler.startBulkReceive(partition.topic(), recordsInPartition, consumerSpansForTopic);
     }
-    for (Span span : consumerSpansForTopic.values()) span.finish(timestamp);
+    handler.finishBulkReceive(consumerSpansForTopic);
     return records;
   }
 
@@ -156,7 +145,7 @@ final class TracingConsumer<K, V>
   }
 
   @Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
-      OffsetCommitCallback callback) {
+    OffsetCommitCallback callback) {
     delegate.commitAsync(offsets, callback);
   }
 
@@ -228,13 +217,13 @@ final class TracingConsumer<K, V>
   }
 
   @Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
-      Map<TopicPartition, Long> timestampsToSearch) {
+    Map<TopicPartition, Long> timestampsToSearch) {
     return delegate.offsetsForTimes(timestampsToSearch);
   }
 
   // Do not use @Override annotation to avoid compatibility issue version < 2.0
   public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
-      Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
+    Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
     return delegate.offsetsForTimes(timestampsToSearch, timeout);
   }
 
@@ -245,7 +234,7 @@ final class TracingConsumer<K, V>
 
   // Do not use @Override annotation to avoid compatibility issue version < 2.0
   public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions,
-      Duration timeout) {
+    Duration timeout) {
     return delegate.beginningOffsets(partitions, timeout);
   }
 
@@ -255,7 +244,7 @@ final class TracingConsumer<K, V>
 
   // Do not use @Override annotation to avoid compatibility issue version < 2.0
   public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions,
-      Duration timeout) {
+    Duration timeout) {
     return delegate.endOffsets(partitions, timeout);
   }
 
@@ -276,40 +265,36 @@ final class TracingConsumer<K, V>
     delegate.wakeup();
   }
 
-  static class KafkaConsumerAdapter<K, V> implements MessageConsumerAdapter<ConsumerRecord<K, V>>,
-      ChannelAdapter<ConsumerRecord<K, V>> {
-    final KafkaTracing kafkaTracing;
-
-    KafkaConsumerAdapter(KafkaTracing kafkaTracing) {
-      this.kafkaTracing = kafkaTracing;
-    }
+  static final class ConsumerRecordAdapter
+    extends MessagingAdapter<String, ConsumerRecord, Headers> {
+    final String remoteServiceName;
 
-    static <K, V> KafkaConsumerAdapter<K, V> create(KafkaTracing kafkaTracing) {
-      return new KafkaConsumerAdapter<>(kafkaTracing);
+    ConsumerRecordAdapter(String remoteServiceName) {
+      this.remoteServiceName = remoteServiceName;
     }
 
-    @Override public String channel(ConsumerRecord message) {
-      return message.topic();
+    @Override public Headers carrier(ConsumerRecord message) {
+      return message.headers();
     }
 
-    @Override public String operation(ConsumerRecord message) {
-      return KafkaTracing.CONSUMER_OPERATION;
+    @Override public String channel(String topic) {
+      return topic;
     }
 
-    @Override public String identifier(ConsumerRecord message) {
-      return kafkaTracing.recordKey(message.key());
+    @Override public String channelType(String channel) {
+      return "topic";
     }
 
-    @Override public String remoteServiceName(ConsumerRecord message) {
-      return kafkaTracing.remoteServiceName;
+    @Override public String messageKey(ConsumerRecord message) {
+      return KafkaTracing.recordKey(message.key());
     }
 
-    @Override public String channelTagKey(ConsumerRecord<K, V> message) {
-      return kafkaTracing.channelTagKey(message);
+    @Override public String correlationId(ConsumerRecord message) {
+      return null;
     }
 
-    @Override public String identifierTagKey() {
-      return kafkaTracing.identifierTagKey();
+    @Override public String brokerName(String topic) {
+      return remoteServiceName;
     }
   }
 }
diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java
index 9ce4092..6186e3e 100644
--- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java
+++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java
@@ -18,11 +18,12 @@ package brave.kafka.clients;
 
 import brave.Span;
 import brave.Tracer;
+import brave.Tracing;
 import brave.internal.Nullable;
-import brave.messaging.ChannelAdapter;
-import brave.messaging.MessageProducerAdapter;
-import brave.messaging.MessagingProducerHandler;
-import brave.propagation.CurrentTraceContext;
+import brave.messaging.MessagingAdapter;
+import brave.messaging.MessagingParser;
+import brave.messaging.ProducerHandler;
+import brave.propagation.CurrentTraceContext.Scope;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -37,28 +38,22 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 
-final class TracingProducer<K, V>
-    extends MessagingProducerHandler<Producer<K, V>, ProducerRecord<K, V>, ProducerRecord<K, V>>
-    implements Producer<K, V> {
+final class TracingProducer<K, V> implements Producer<K, V> {
 
+  final Producer<K, V> delegate;
   final KafkaTracing kafkaTracing;
-  final CurrentTraceContext current;
-  final Tracer tracer;
-  @Nullable final String remoteServiceName;
+  final Tracing tracing;
+  final ProducerHandler<String, ProducerRecord, Headers> handler;
+  final MessagingParser parser;
 
   TracingProducer(Producer<K, V> delegate, KafkaTracing kafkaTracing) {
-    super(
-        delegate,
-        kafkaTracing.msgTracing,
-        KafkaProducerAdapter.create(kafkaTracing),
-        KafkaProducerAdapter.create(kafkaTracing),
-        kafkaTracing.producerRecordExtractor(),
-        kafkaTracing.producerRecordInjector());
+    this.delegate = delegate;
+    this.handler = kafkaTracing.producerHandler;
     this.kafkaTracing = kafkaTracing;
-    this.current = kafkaTracing.msgTracing.tracing().currentTraceContext();
-    this.tracer = kafkaTracing.msgTracing.tracing().tracer();
-    this.remoteServiceName = kafkaTracing.remoteServiceName;
+    this.tracing = kafkaTracing.messagingTracing.tracing();
+    this.parser = kafkaTracing.messagingTracing.parser();
   }
 
   @Override public void initTransactions() {
@@ -82,25 +77,27 @@ final class TracingProducer<K, V>
    * tracing.
    */
   @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
-    return this.send(record, null);
+    return send(record, null);
   }
 
   /**
    * This wraps the send method to add tracing.
    *
    * <p>When there is no current span, this attempts to extract one from headers. This is possible
-   * when a call to produce a message happens directly after a tracing consumer received a span. One
+   * when a call to produce a message happens directly after a tracing consumer received a span. One
    * example scenario is Kafka Streams instrumentation.
    */
   // TODO: make b3single an option and then note how using this minimizes overhead
   @Override
   public Future<RecordMetadata> send(ProducerRecord<K, V> record, @Nullable Callback callback) {
-    Span span = handleProduce(record, record);
+    Span span = handler.startSend(record.topic(), record);
+    if (span.isNoop()) return delegate.send(record, callback);
 
-    try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
-      return delegate.send(record, TracingCallback.create(callback, span, current));
+    FinishSpanCallback finishSpanCallback = tracingCallback(record, callback, span);
+    try (Tracer.SpanInScope ws = tracing.tracer().withSpanInScope(span)) {
+      return delegate.send(record, finishSpanCallback);
     } catch (RuntimeException | Error e) {
-      span.error(e).finish(); // finish as an exception means the callback won't finish the span
+      finishSpanCallback.finish(e); // an exception might imply the callback was not invoked
       throw e;
     }
   }
@@ -132,89 +129,87 @@ final class TracingProducer<K, V>
 
   @Override
   public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
-      String consumerGroupId) {
-    delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
+    String producerGroupId) {
+    delegate.sendOffsetsToTransaction(offsets, producerGroupId);
   }
 
   /**
    * Decorates, then finishes a producer span. Allows tracing to record the duration between
    * batching for send and actual send.
    */
-  static final class TracingCallback {
-    static Callback create(@Nullable Callback delegate, Span span, CurrentTraceContext current) {
-      if (span.isNoop()) return delegate; // save allocation overhead
-      if (delegate == null) return new FinishSpan(span);
-      return new DelegateAndFinishSpan(delegate, span, current);
-    }
+  FinishSpanCallback tracingCallback(ProducerRecord<K, V> record, @Nullable Callback delegate,
+    Span span) {
+    if (delegate == null) return new FinishSpanCallback(record, span);
+    return new DelegateAndFinishSpanCallback(record, delegate, span);
+  }
 
-    static class FinishSpan implements Callback {
-      final Span span;
+  final class DelegateAndFinishSpanCallback extends FinishSpanCallback {
+    final Callback delegate;
 
-      FinishSpan(Span span) {
-        this.span = span;
-      }
+    DelegateAndFinishSpanCallback(ProducerRecord<K, V> record, Callback delegate, Span span) {
+      super(record, span);
+      this.delegate = delegate;
+    }
 
-      @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
-        if (exception != null) span.error(exception);
-        span.finish();
+    @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
+      try (Scope ws = tracing.currentTraceContext().maybeScope(span.context())) {
+        delegate.onCompletion(metadata, exception);
+      } finally {
+        finish(exception);
       }
     }
+  }
 
-    static final class DelegateAndFinishSpan extends FinishSpan {
-      final Callback delegate;
-      final CurrentTraceContext current;
-
-      DelegateAndFinishSpan(Callback delegate, Span span, CurrentTraceContext current) {
-        super(span);
-        this.delegate = delegate;
-        this.current = current;
-      }
+  class FinishSpanCallback implements Callback {
+    final ProducerRecord<K, V> record;
+    final Span span;
 
-      @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
-        try (CurrentTraceContext.Scope ws = current.maybeScope(span.context())) {
-          delegate.onCompletion(metadata, exception);
-        } finally {
-          super.onCompletion(metadata, exception);
-        }
-      }
+    FinishSpanCallback(ProducerRecord<K, V> record, Span span) {
+      this.record = record;
+      this.span = span;
     }
-  }
 
-  static final class KafkaProducerAdapter<K, V> implements
-      MessageProducerAdapter<ProducerRecord<K, V>>,
-      ChannelAdapter<ProducerRecord<K, V>> {
-    final KafkaTracing kafkaTracing;
+    @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
+      finish(exception);
+    }
 
-    KafkaProducerAdapter(KafkaTracing kafkaTracing) {
-      this.kafkaTracing = kafkaTracing;
+    void finish(@Nullable Throwable error) {
+      if (error != null) span.error(error);
+      handler.finishSend(record.topic(), record, span);
+      span.finish();
     }
+  }
+
+  static final class ProducerRecordAdapter
+    extends MessagingAdapter<String, ProducerRecord, Headers> {
+    final String remoteServiceName;
 
-    static <K, V> KafkaProducerAdapter<K, V> create(KafkaTracing kafkaTracing) {
-      return new KafkaProducerAdapter<>(kafkaTracing);
+    ProducerRecordAdapter(String remoteServiceName) {
+      this.remoteServiceName = remoteServiceName;
     }
 
-    @Override public String channel(ProducerRecord message) {
-      return message.topic();
+    @Override public Headers carrier(ProducerRecord message) {
+      return message.headers();
     }
 
-    @Override public String operation(ProducerRecord message) {
-      return KafkaTracing.PRODUCER_OPERATION;
+    @Override public String channel(String topic) {
+      return topic;
     }
 
-    @Override public String identifier(ProducerRecord message) {
-      return kafkaTracing.recordKey(message.key());
+    @Override public String channelType(String channel) {
+      return "topic";
     }
 
-    @Override public String remoteServiceName(ProducerRecord message) {
-      return kafkaTracing.remoteServiceName;
+    @Override public String messageKey(ProducerRecord message) {
+      return KafkaTracing.recordKey(message.key());
     }
 
-    @Override public String channelTagKey(ProducerRecord<K, V> message) {
-      return kafkaTracing.channelTagKey(message);
+    @Override public String correlationId(ProducerRecord message) {
+      return null;
     }
 
-    @Override public String identifierTagKey() {
-      return kafkaTracing.identifierTagKey();
+    @Override public String brokerName(String topic) {
+      return remoteServiceName;
     }
   }
 }
diff --git a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java
index 7cf6caa..e1a0dab 100644
--- a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java
+++ b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java
@@ -17,8 +17,9 @@
 package brave.kafka.clients;
 
 import brave.Span;
-import brave.sampler.Sampler;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
@@ -28,19 +29,14 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 public class TracingCallbackTest extends BaseTracingTest {
-  @Test public void create_returns_input_on_noop() {
-    Span span = tracing.tracer().withSampler(Sampler.NEVER_SAMPLE).nextSpan();
-
-    Callback delegate = mock(Callback.class);
-    Callback tracingCallback = TracingProducer.TracingCallback.create(delegate, span, current);
-
-    assertThat(tracingCallback).isSameAs(delegate);
-  }
+  Producer<String, String> producer = mock(Producer.class);
+  ProducerRecord<String, String> record = mock(ProducerRecord.class);
+  TracingProducer<String, String> tracingProducer = new TracingProducer<>(producer, kafkaTracing);
 
   @Test public void on_completion_should_finish_span() {
     Span span = tracing.tracer().nextSpan().start();
 
-    Callback tracingCallback = TracingProducer.TracingCallback.create(null, span, current);
+    Callback tracingCallback = tracingProducer.tracingCallback(record, null, span);
     tracingCallback.onCompletion(createRecordMetadata(), null);
 
     assertThat(spans.getFirst()).isNotNull();
@@ -49,18 +45,18 @@ public class TracingCallbackTest extends BaseTracingTest {
   @Test public void on_completion_should_tag_if_exception() {
     Span span = tracing.tracer().nextSpan().start();
 
-    Callback tracingCallback = TracingProducer.TracingCallback.create(null, span, current);
+    Callback tracingCallback = tracingProducer.tracingCallback(record, null, span);
     tracingCallback.onCompletion(null, new Exception("Test exception"));
 
     assertThat(spans.getFirst().tags())
-        .containsEntry("error", "Test exception");
+      .containsEntry("error", "Test exception");
   }
 
   @Test public void on_completion_should_forward_then_finish_span() {
     Span span = tracing.tracer().nextSpan().start();
 
     Callback delegate = mock(Callback.class);
-    Callback tracingCallback = TracingProducer.TracingCallback.create(delegate, span, current);
+    Callback tracingCallback = tracingProducer.tracingCallback(record, delegate, span);
     RecordMetadata md = createRecordMetadata();
     tracingCallback.onCompletion(md, null);
 
@@ -73,14 +69,16 @@ public class TracingCallbackTest extends BaseTracingTest {
 
     Callback delegate = (metadata, exception) -> assertThat(current.get()).isSameAs(span.context());
 
-    TracingProducer.TracingCallback.create(delegate, span, current).onCompletion(createRecordMetadata(), null);
+    Callback tracingCallback = tracingProducer.tracingCallback(record, delegate, span);
+
+    tracingCallback.onCompletion(createRecordMetadata(), null);
   }
 
   @Test public void on_completion_should_forward_then_tag_if_exception() {
     Span span = tracing.tracer().nextSpan().start();
 
     Callback delegate = mock(Callback.class);
-    Callback tracingCallback = TracingProducer.TracingCallback.create(delegate, span, current);
+    Callback tracingCallback = tracingProducer.tracingCallback(record, delegate, span);
     RecordMetadata md = createRecordMetadata();
     Exception e = new Exception("Test exception");
     tracingCallback.onCompletion(md, e);
@@ -88,7 +86,7 @@ public class TracingCallbackTest extends BaseTracingTest {
     verify(delegate).onCompletion(md, e);
 
     assertThat(spans.getFirst().tags())
-        .containsEntry("error", "Test exception");
+      .containsEntry("error", "Test exception");
   }
 
   RecordMetadata createRecordMetadata() {
diff --git a/instrumentation/messaging/pom.xml b/instrumentation/messaging/pom.xml
index 310ac2b..1f6ed5b 100644
--- a/instrumentation/messaging/pom.xml
+++ b/instrumentation/messaging/pom.xml
@@ -38,6 +38,18 @@
     <errorprone.args>-Xep:MissingOverride:OFF</errorprone.args>
   </properties>
 
+  <dependencies>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sqs</artifactId>
+      <version>2.5.52</version>
+    </dependency>
+    <dependency>
+      <groupId>com.rabbitmq</groupId>
+      <artifactId>amqp-client</artifactId>
+      <version>5.7.1</version>
+    </dependency>
+  </dependencies>
   <build>
     <plugins>
       <plugin>
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ConsumerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/ConsumerHandler.java
new file mode 100644
index 0000000..7456b16
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/ConsumerHandler.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.Tracing;
+import brave.internal.Nullable;
+import brave.propagation.TraceContext;
+import brave.propagation.TraceContextOrSamplingFlags;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carries the trace context, usually headers
+ */
+public final class ConsumerHandler<Chan, Msg, C> {
+
+  public static <Chan, Msg, C> ConsumerHandler<Chan, Msg, C> create(
+    MessagingTracing messagingTracing,
+    MessagingAdapter<Chan, Msg, C> adapter,
+    TraceContext.Extractor<C> extractor,
+    TraceContext.Injector<C> injector
+  ) {
+    return new ConsumerHandler<>(messagingTracing, adapter, extractor, injector);
+  }
+
+  final Tracing tracing;
+  final TraceContext.Extractor<C> extractor;
+  final TraceContext.Injector<C> injector;
+  final MessagingParser parser;
+  final MessagingAdapter<Chan, Msg, C> adapter;
+
+  ConsumerHandler(MessagingTracing messagingTracing,
+    MessagingAdapter<Chan, Msg, C> adapter,
+    TraceContext.Extractor<C> extractor,
+    TraceContext.Injector<C> injector
+  ) {
+    this.tracing = messagingTracing.tracing;
+    this.extractor = extractor;
+    this.injector = injector;
+    this.parser = messagingTracing.parser;
+    this.adapter = adapter;
+  }
+
+  public void handleReceive(Chan channel, Msg message) {
+    if (message == null || tracing.isNoop()) return;
+    C carrier = adapter.carrier(message);
+    TraceContextOrSamplingFlags extracted = extractor.extract(carrier);
+    handleReceive(channel, message, carrier, adapter.brokerName(channel), extracted, false);
+  }
+
+  /** Returns a started processor span if {@code createProcessor} is true */
+  @Nullable Span handleReceive(Chan channel, Msg message, C carrier, String remoteServiceName,
+    TraceContextOrSamplingFlags extracted, boolean createProcessor) {
+    Span span = tracing.tracer().nextSpan(extracted);
+    // Creating the processor while the consumer is not finished ensures clocks are the same. This
+    // allows the processor to start later, but not be subject to clock drift relative to the parent.
+    Span processorSpan = createProcessor ? tracing.tracer().newChild(span.context()) : null;
+    if (!span.isNoop()) {
+      span.kind(Span.Kind.CONSUMER);
+      parser.start("receive", adapter, channel, message, span.context(), span.customizer());
+      if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
+
+      // incur timestamp overhead only once
+      long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
+      span.start(timestamp);
+      parser.finish("receive", adapter, channel, message, span.context(), span.customizer());
+      span.finish(timestamp + 1);
+
+      // Even though we set the timestamp here, the start timestamp is allowed to be overwritten
+      // later as needed. Doing so here avoids the overhead of a tick reading
+      if (processorSpan != null) processorSpan.start(timestamp + 1);
+    }
+    injector.inject(createProcessor ? processorSpan.context() : span.context(), carrier);
+    return processorSpan;
+  }
+
+  public Map<Chan, Span> startBulkReceive(Chan channel, List<? extends Msg> messages,
+    Map<Chan, Span> spanForChannel) {
+    long timestamp = 0L;
+    for (int i = 0, length = messages.size(); i < length; i++) {
+      Msg message = messages.get(i);
+      C carrier = adapter.carrier(message);
+      TraceContextOrSamplingFlags extracted = extractor.extract(carrier);
+      String remoteServiceName = adapter.brokerName(channel);
+
+      // If we extracted neither a trace context, nor request-scoped data (extra),
+      // make or reuse a span for this channel
+      if (extracted.samplingFlags() != null && extracted.extra().isEmpty()) {
+        Span span = spanForChannel.get(channel);
+        if (span == null) {
+          span = tracing.tracer().nextSpan(extracted);
+          if (!span.isNoop()) {
+            span.kind(Span.Kind.CONSUMER);
+            parser.start("receive-batch", adapter, channel, null, span.context(),
+              span.customizer());
+            if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
+            // incur timestamp overhead only once
+            if (timestamp == 0L) {
+              timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
+            }
+            span.start(timestamp);
+          }
+          spanForChannel.put(channel, span);
+        }
+        injector.inject(span.context(), carrier);
+      } else { // we extracted request-scoped data, so cannot share a consumer span.
+        handleReceive(channel, message, carrier, remoteServiceName, extracted, false);
+      }
+    }
+    return spanForChannel;
+  }
+
+  public void finishBulkReceive(Map<Chan, Span> spanForChannel) {
+    long timestamp = 0L;
+    for (Map.Entry<Chan, Span> entry : spanForChannel.entrySet()) {
+      Span span = entry.getValue();
+      // incur timestamp overhead only once
+      if (timestamp == 0L) {
+        timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
+      }
+      parser.finish("receive-batch", adapter, entry.getKey(), null, span.context(),
+        span.customizer());
+      span.finish(timestamp);
+    }
+  }
+}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessageAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessageAdapter.java
deleted file mode 100644
index 502890b..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessageAdapter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-interface MessageAdapter<Msg> {
-  /**
-   * Messaging operation semantics, e.g. pull, push, send, receive, etc.
-   */
-  String operation(Msg message);
-
-  /**
-   * Message identifier, e.g. kafka record key, jms message correlation id.
-   */
-  String identifier(Msg message);
-
-  /**
-   * Removes propagation context from Message context carrier.
-   */
-  //void clearPropagation(Msg message);
-
-  String identifierTagKey();
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessageConsumerAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessageConsumerAdapter.java
deleted file mode 100644
index f31e327..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessageConsumerAdapter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-public interface MessageConsumerAdapter<Msg> extends MessageAdapter<Msg> {
-  /**
-   * Messaging operation semantics, e.g. pull, push, send, receive, etc.
-   */
-  String operation(Msg message);
-
-  /**
-   * Message identifier, e.g. kafka record key, jms message correlation id.
-   */
-  String identifier(Msg message);
-
-  /**
-   * Removes propagation context from Message context carrier.
-   */
-  //void clearPropagation(Msg message);
-
-  String identifierTagKey();
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessageProducerAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessageProducerAdapter.java
deleted file mode 100644
index 2bfbde6..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessageProducerAdapter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-public interface MessageProducerAdapter<Msg> extends MessageAdapter<Msg> {
-  /**
-   * Messaging operation semantics, e.g. pull, push, send, receive, etc.
-   */
-  String operation(Msg message);
-
-  /**
-   * Message identifier, e.g. kafka record key, jms message correlation id.
-   */
-  String identifier(Msg message);
-
-  /**
-   * Removes propagation context from Message context carrier.
-   */
-  //void clearPropagation(Msg message);
-
-  String identifierTagKey();
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingAdapter.java
new file mode 100644
index 0000000..11a4e6c
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.internal.Nullable;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carries the trace context, usually headers
+ */
+// abstract class instead of interface to allow method adds before Java 1.8
+public abstract class MessagingAdapter<Chan, Msg, C> {
+  // TODO: make some of these methods not abstract as they don't have meaning for all impls
+
+  /** Returns the trace context carrier from the message. Usually, this is headers. */
+  public abstract C carrier(Msg message);
+
+  /**
+   * Messaging channel, e.g. kafka queue or JMS topic name. {@code null} if unreadable.
+   *
+   * <p>Conventionally associated with the key "message.channel"
+   */
+  @Nullable public abstract String channel(Chan channel);
+
+  /**
+   * Type of channel, e.g. queue or topic. {@code null} if unreadable.
+   *
+   * <p>Conventionally associated with the key "message.channel_type"
+   */
+  // TODO: naming is arbitrary.. we once used "kind" for Span but only because stackdriver did..
+  // Not sure we should re-use kind for parity with that or not..
+  @Nullable public abstract String channelType(Chan channel);
+
+  /**
+   * Key used to identify or partition messages. {@code null} if unreadable.
+   *
+   * <p>Conventionally associated with the key "message.key"
+   */
+  @Nullable public abstract String messageKey(Msg message);
+
+  /**
+   * Identifier used to correlate logs. {@code null} if unreadable.
+   *
+   * <p>Conventionally associated with the key "message.correlation_id"
+   */
+  @Nullable public abstract String correlationId(Msg message);
+
+  /**
+   * Message broker name. {@code null} if unreadable.
+   *
+   * <p>Conventionally associated with {@link Span#remoteServiceName(String)}
+   */
+  @Nullable public abstract String brokerName(Chan channel);
+
+  protected MessagingAdapter() {
+  }
+}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerHandler.java
deleted file mode 100644
index 629e3b9..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerHandler.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.Span;
-import brave.SpanCustomizer;
-import brave.Tracing;
-import brave.propagation.TraceContext;
-import brave.propagation.TraceContextOrSamplingFlags;
-import java.util.List;
-import java.util.Map;
-
-public class MessagingConsumerHandler<C, Chan, Msg>
-    extends MessagingHandler<Chan, Msg, ChannelAdapter<Chan>, MessageAdapter<Msg>> {
-
-  static public <C, Chan, Msg> MessagingConsumerHandler<C, Chan, Msg> create(
-      C delegate,
-      MessagingTracing tracing,
-      ChannelAdapter<Chan> channelAdapter,
-      MessageConsumerAdapter<Msg> messageAdapter,
-      TraceContext.Extractor<Msg> extractor,
-      TraceContext.Injector<Msg> injector) {
-    return new MessagingConsumerHandler<>(delegate, tracing, channelAdapter, messageAdapter,
-        extractor, injector);
-  }
-
-  public final C delegate;
-  final Tracing tracing;
-
-  public MessagingConsumerHandler(
-      C delegate,
-      MessagingTracing messagingTracing,
-      ChannelAdapter<Chan> channelAdapter,
-      MessageConsumerAdapter<Msg> messageAdapter,
-      TraceContext.Extractor<Msg> extractor,
-      TraceContext.Injector<Msg> injector) {
-    super(messagingTracing.tracing.currentTraceContext(), channelAdapter, messageAdapter,
-        messagingTracing.consumerParser, extractor, injector);
-    this.delegate = delegate;
-    this.tracing = messagingTracing.tracing;
-  }
-
-  public Span nextSpan(Chan channel, Msg message) {
-    TraceContextOrSamplingFlags extracted = extractor.extract(message);
-    Span result = tracing.tracer().nextSpan(extracted);
-    if (extracted.context() == null && !result.isNoop()) {
-      addTags(channel, result);
-    }
-    return result;
-  }
-
-  /** When an upstream context was not present, lookup keys are unlikely added */
-  void addTags(Chan channel, SpanCustomizer result) {
-    parser.channel(channelAdapter, channel, result);
-    //parser.identifier(messageAdapter, message, result);
-  }
-
-  public void handleConsume(Chan channel, Msg message) {
-    if (message == null || tracing.isNoop()) return;
-    // remove prior propagation headers from the message
-    Span span = nextSpan(channel, message);
-    if (!span.isNoop()) {
-      span.kind(Span.Kind.CONSUMER);
-      parser.message(channelAdapter, messageAdapter, channel, message, span);
-
-      // incur timestamp overhead only once
-      long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
-      span.start(timestamp).finish(timestamp);
-    }
-    injector.inject(span.context(), message);
-  }
-
-  public Map<String, Span> handleConsume(Chan chan, List<Msg> messages,
-      Map<String, Span> spanForChannel) {
-    long timestamp = 0L;
-    for (int i = 0, length = messages.size(); i < length; i++) {
-      Msg message = messages.get(i);
-      TraceContextOrSamplingFlags extracted = extractor.extract(message);
-
-      // If we extracted neither a trace context, nor request-scoped data (extra),
-      // make or reuse a span for this topic
-      if (extracted.samplingFlags() != null && extracted.extra().isEmpty()) {
-        String channel = channelAdapter.channel(chan);
-        Span span = spanForChannel.get(channel);
-        if (span == null) {
-          span = tracing.tracer().nextSpan(extracted);
-          if (!span.isNoop()) {
-            span.name(messageAdapter.operation(message)).kind(Span.Kind.CONSUMER);
-            parser.message(channelAdapter, messageAdapter, chan, message, span);
-            String remoteServiceName = channelAdapter.remoteServiceName(chan);
-            if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
-            // incur timestamp overhead only once
-            if (timestamp == 0L) {
-              timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
-            }
-            span.start(timestamp);
-          }
-          spanForChannel.put(channel, span);
-        }
-        injector.inject(span.context(), message);
-      } else { // we extracted request-scoped data, so cannot share a consumer span.
-        Span span = tracing.tracer().nextSpan(extracted);
-        if (!span.isNoop()) {
-          span.kind(Span.Kind.CONSUMER);
-          parser.message(channelAdapter, messageAdapter, chan, message, span);
-          String remoteServiceName = channelAdapter.remoteServiceName(chan);
-          if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
-          // incur timestamp overhead only once
-          if (timestamp == 0L) {
-            timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
-          }
-          span.start(timestamp).finish(timestamp); // span won't be shared by other records
-        }
-        injector.inject(span.context(), message);
-      }
-    }
-    return spanForChannel;
-  }
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerParser.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerParser.java
deleted file mode 100644
index c463860..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerParser.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.SpanCustomizer;
-import brave.propagation.TraceContext;
-import brave.propagation.TraceContextOrSamplingFlags;
-
-public class MessagingConsumerParser extends MessagingParser {
-
-  public <Chan, Msg> void message(ChannelAdapter<Chan> channelAdapter,
-      MessageAdapter<Msg> messageAdapter,
-      Chan channel, Msg message, SpanCustomizer customizer) {
-    customizer.name(messageAdapter.operation(message));
-    channel(channelAdapter, channel, customizer);
-    //identifier(messageAdapter, message, customizer);
-  }
-
-  //public <Msg> TraceContextOrSamplingFlags extractContextAndClearMessage(
-  //    MessageAdapter<Msg> adapter,
-  //    TraceContext.Extractor<Msg> extractor,
-  //    Msg message) {
-  //  // clear propagation headers if we were able to extract a span
-  //  //TODO check if correct to not filter on empty flags. Diff between kafka and jms instrumentation
-  //  //if (!extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
-  //  //adapter.clearPropagation(message);
-  //  //}
-  //  return extractor.extract(message);
-  //}
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingHandler.java
deleted file mode 100644
index 0d22086..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingHandler.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.propagation.CurrentTraceContext;
-import brave.propagation.TraceContext;
-
-abstract class MessagingHandler<Chan, Msg, CA extends ChannelAdapter<Chan>, MA extends MessageAdapter<Msg>> {
-
-  final CurrentTraceContext currentTraceContext;
-  final CA channelAdapter;
-  final MA messageAdapter;
-  final MessagingParser parser;
-  final TraceContext.Extractor<Msg> extractor;
-  final TraceContext.Injector<Msg> injector;
-
-  MessagingHandler(
-      CurrentTraceContext currentTraceContext,
-      CA channelAdapter,
-      MA adapter,
-      MessagingParser parser,
-      TraceContext.Extractor<Msg> extractor,
-      TraceContext.Injector<Msg> injector) {
-    this.currentTraceContext = currentTraceContext;
-    this.channelAdapter = channelAdapter;
-    this.messageAdapter = adapter;
-    this.parser = parser;
-    this.extractor = extractor;
-    this.injector = injector;
-  }
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ChannelAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingOperation.java
similarity index 72%
rename from instrumentation/messaging/src/main/java/brave/messaging/ChannelAdapter.java
rename to instrumentation/messaging/src/main/java/brave/messaging/MessagingOperation.java
index 601410e..8194791 100644
--- a/instrumentation/messaging/src/main/java/brave/messaging/ChannelAdapter.java
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingOperation.java
@@ -16,16 +16,9 @@
  */
 package brave.messaging;
 
-public interface ChannelAdapter<Channel> {
-  /**
-   * Messaging channel, e.g. kafka topic, jms queue, jms topic, etc.
-   */
-  String channel(Channel channel);
-
-  String channelTagKey(Channel channel);
-
-  /**
-   * Messaging broker service, e.g. kafka-cluster, jms-server.
-   */
-  String remoteServiceName(Channel channel);
+public enum MessagingOperation {
+  SEND,
+  BULK_SEND,
+  RECEIVE,
+  BULK_RECEIVE;
 }
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java
index 73ae644..87ccbed 100644
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,42 +17,61 @@
 package brave.messaging;
 
 import brave.SpanCustomizer;
+import brave.Tracing;
+import brave.internal.Nullable;
+import brave.propagation.ExtraFieldPropagation;
+import brave.propagation.TraceContext;
 
+/**
+ * <p>Methods will not be invoked with a span in scope. Please use the explicit {@link
+ * TraceContext} if you need to create tags based on propagated data like {@link
+ * ExtraFieldPropagation}.
+ */
+// there are no producer/consumer subtypes because it is simpler than multiple inheritance when the
+// same library handles producer and consumer relationships.
 public class MessagingParser {
 
-  public <Chan, Msg> void message(ChannelAdapter<Chan> channelAdapter,
-      MessageAdapter<Msg> messageAdapter,
-      Chan channel, Msg message, SpanCustomizer customizer) {
-    customizer.name(messageAdapter.operation(message));
-    channel(channelAdapter, channel, customizer);
-    identifier(messageAdapter, message, customizer);
+  /**
+   * Override to change what data to add to the span when a message operation starts. By default,
+   * this names the span after the operation and adds the tag "messaging.channel" if available.
+   *
+   * <p>If you only want to change the span name, you can override {@link
+   * #spanName(String, MessagingAdapter, Object, Object)} instead.
+   *
+   * @param msg null when a bulk operation
+   * @see #spanName(String, MessagingAdapter, Object, Object)
+   * @see #finish(String, MessagingAdapter, Object, Object, TraceContext, SpanCustomizer)
+   */
+  // Context is here so that people can decide to add tags based on local root etc.
+  public <Chan, Msg, C> void start(String operation, MessagingAdapter<Chan, Msg, C> adapter,
+    Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+    customizer.name(spanName(operation, adapter, channel, msg));
+    addMessageTags(adapter, channel, msg, context, customizer);
   }
 
-  public <Chan> void channel(ChannelAdapter<Chan> adapter, Chan chan,
-      SpanCustomizer customizer) {
-    String channel = adapter.channel(chan);
-    if (chan != null) customizer.tag(adapter.channelTagKey(chan), channel);
+  // channel is nullable as JMS could have an exception getting it from the message
+  protected <Chan, Msg, C> void addMessageTags(MessagingAdapter<Chan, Msg, C> adapter,
+    @Nullable Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+    String channelName = adapter.channel(channel);
+    if (channelName != null) customizer.tag("messaging.channel", channelName);
   }
 
-  public <Msg> void identifier(MessageAdapter<Msg> adapter, Msg message,
-      SpanCustomizer customizer) {
-    String identifier = adapter.identifier(message);
-    if (identifier != null) {
-      customizer.tag(adapter.identifierTagKey(), identifier);
-    }
+  /**
+   * Override to change what data to add to the span when a message operation completes.
+   *
+   * <p>This adds no tags by default. Error tagging is delegated to {@link Tracing#errorParser()}.
+   *
+   * @param msg null when a bulk operation
+   * @see #start(String, MessagingAdapter, Object, Object, TraceContext, SpanCustomizer)
+   */
+  // Context is here so that people can add tags based on extra fields without the cost of scoping
+  public <Chan, Msg, C> void finish(String operation, MessagingAdapter<Chan, Msg, C> adapter,
+    Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
   }
 
-  //public <Msg> TraceContextOrSamplingFlags extractContextAndClearMessage(
-  //    MessageAdapter<Msg> adapter,
-  //    TraceContext.Extractor<Msg> extractor,
-  //    Msg message) {
-  //  TraceContextOrSamplingFlags extracted = extractor.extract(message);
-  //  // clear propagation headers if we were able to extract a span
-  //  //TODO check if correct to not filter on empty flags. Diff between kafka and jms instrumentation
-  //  //if (!extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
-  //  //TODO check we dont need this
-  //  //  adapter.clearPropagation(message);
-  //  //}
-  //  return extracted;
-  //}
+  /** Returns the span name of a message operation. Defaults to the operation name. */
+  protected <Chan, Msg, C> String spanName(String operation,
+    MessagingAdapter<Chan, Msg, C> adapter, Chan channel, @Nullable Msg msg) {
+    return operation;
+  }
 }
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerHandler.java
deleted file mode 100644
index d330fc3..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerHandler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.Span;
-import brave.Tracer;
-import brave.propagation.TraceContext;
-
-public class MessagingProducerHandler<P, Chan, Msg>
-    extends MessagingHandler<Chan, Msg, ChannelAdapter<Chan>, MessageProducerAdapter<Msg>> {
-
-  public static <P, Chan, Msg> MessagingProducerHandler<P, Chan, Msg> create(
-      P delegate,
-      MessagingTracing tracing,
-      ChannelAdapter<Chan> channelAdapter,
-      MessageProducerAdapter<Msg> messageAdapter,
-      TraceContext.Extractor<Msg> extractor,
-      TraceContext.Injector<Msg> injector) {
-    return new MessagingProducerHandler<>(delegate, tracing, channelAdapter, messageAdapter,
-        extractor, injector);
-  }
-
-  public final P delegate;
-  final Tracer tracer;
-
-  public MessagingProducerHandler(
-      P delegate,
-      MessagingTracing messagingTracing,
-      ChannelAdapter<Chan> channelAdapter,
-      MessageProducerAdapter<Msg> messageAdapter,
-      TraceContext.Extractor<Msg> extractor,
-      TraceContext.Injector<Msg> injector) {
-    super(messagingTracing.tracing.currentTraceContext(), channelAdapter, messageAdapter,
-        messagingTracing.producerParser, extractor, injector);
-    this.delegate = delegate;
-    this.tracer = messagingTracing.tracing.tracer();
-  }
-
-  public Span handleProduce(Chan channel, Msg message) {
-    TraceContext maybeParent = currentTraceContext.get();
-    // Unlike message consumers, we try current span before trying extraction. This is the proper
-    // order because the span in scope should take precedence over a potentially stale header entry.
-    //
-    // NOTE: Brave instrumentation used properly does not result in stale header entries, as we
-    // always clear message headers after reading.
-    Span span;
-    if (maybeParent == null) {
-      span = tracer.nextSpan(extractor.extract(message));
-    } else {
-      // As JMS is sensitive about write access to headers, we  defensively clear even if it seems
-      // upstream would have cleared (because there is a span in scope!).
-      span = tracer.newChild(maybeParent);
-      //TODO check we dont need this
-      // messageAdapter.clearPropagation(message);
-    }
-
-    if (!span.isNoop()) {
-      span.kind(Span.Kind.PRODUCER).name(messageAdapter.operation(message));
-      parser.message(channelAdapter, messageAdapter, channel, message, span);
-      String remoteServiceName = channelAdapter.remoteServiceName(channel);
-      if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
-      span.start();
-    }
-
-    injector.inject(span.context(), message);
-
-    return span;
-  }
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerParser.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerParser.java
deleted file mode 100644
index 84723b5..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerParser.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.SpanCustomizer;
-
-public class MessagingProducerParser extends MessagingParser {
-
-  public <Chan, Msg> void message(ChannelAdapter<Chan> channelAdapter,
-      MessageAdapter<Msg> messageAdapter,
-      Chan channel, Msg message, SpanCustomizer customizer) {
-    customizer.name(messageAdapter.operation(message));
-    channel(channelAdapter, channel, customizer);
-    identifier(messageAdapter, message, customizer);
-  }
-
-  //public <Msg> TraceContextOrSamplingFlags extractContextAndClearMessage(
-  //    MessageAdapter<Msg> adapter,
-  //    TraceContext.Extractor<Msg> extractor,
-  //    Msg message) {
-  //  // clear propagation headers if we were able to extract a span
-  //  //TODO check if correct to not filter on empty flags. Diff between kafka and jms instrumentation
-  //  //if (!extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
-  //  //adapter.clearPropagation(message);
-  //  //}
-  //  return extractor.extract(message);
-  //}
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java
index a2d6e66..109ddaa 100644
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java
@@ -16,13 +16,9 @@
  */
 package brave.messaging;
 
-import brave.Span;
 import brave.Tracing;
-import brave.propagation.TraceContext;
-import brave.propagation.TraceContextOrSamplingFlags;
 
 public class MessagingTracing {
-
   public static MessagingTracing create(Tracing tracing) {
     return newBuilder(tracing).build();
   }
@@ -32,66 +28,37 @@ public class MessagingTracing {
   }
 
   final Tracing tracing;
-  final MessagingConsumerParser consumerParser;
-  final MessagingProducerParser producerParser;
+  final MessagingParser parser;
 
   MessagingTracing(Builder builder) {
     this.tracing = builder.tracing;
-    this.consumerParser = builder.consumerParser;
-    this.producerParser = builder.producerParser;
+    this.parser = builder.parser;
   }
 
   public Tracing tracing() {
     return tracing;
   }
 
-  public MessagingProducerParser producerParser() {
-    return producerParser;
-  }
-
-  public MessagingConsumerParser consumerParser() {
-    return consumerParser;
-  }
-
-  public <Chan, Msg> Span nextSpan(ChannelAdapter<Chan> channelAdapter,
-      MessageAdapter<Msg> messageAdapter,
-      TraceContext.Extractor<Msg> extractor,
-      Msg message,
-      Chan channel) {
-    TraceContextOrSamplingFlags extracted = extractor.extract(message);
-    Span result = tracing.tracer().nextSpan(extracted);
-
-    // When an upstream context was not present, lookup keys are unlikely added
-    if (extracted.context() == null && !result.isNoop()) {
-      consumerParser.channel(channelAdapter, channel, result);
-      consumerParser.identifier(messageAdapter, message, result);
-    }
-    return result;
+  public MessagingParser parser() {
+    return parser;
   }
 
   public static class Builder {
     final Tracing tracing;
-    MessagingConsumerParser consumerParser = new MessagingConsumerParser();
-    MessagingProducerParser producerParser = new MessagingProducerParser();
+    MessagingParser parser = new MessagingParser();
 
     Builder(Tracing tracing) {
       if (tracing == null) throw new NullPointerException("tracing == null");
       this.tracing = tracing;
     }
 
-    public Builder consumerParser(MessagingConsumerParser consumerParser) {
-      if (producerParser == null) throw new NullPointerException("consumerParser == null");
-      this.consumerParser = consumerParser;
-      return this;
-    }
-
-    public Builder producerParser(MessagingProducerParser producerParser) {
-      if (producerParser == null) throw new NullPointerException("producerParser == null");
-      this.producerParser = producerParser;
+    public Builder parser(MessagingParser parser) {
+      if (parser == null) throw new NullPointerException("parser == null");
+      this.parser = parser;
       return this;
     }
 
-    MessagingTracing build() {
+    public MessagingTracing build() {
       return new MessagingTracing(this);
     }
   }
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ProcessorHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/ProcessorHandler.java
new file mode 100644
index 0000000..fa5da4c
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/ProcessorHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.Tracer;
+import brave.internal.Nullable;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carries the trace context, usually headers
+ */
+public final class ProcessorHandler<Chan, Msg, C> {
+  public static <Chan, Msg, C> ProcessorHandler<Chan, Msg, C> create(
+    MessagingTracing messagingTracing,
+    ConsumerHandler<Chan, Msg, C> consumerHandler
+  ) {
+    return new ProcessorHandler<>(messagingTracing, consumerHandler);
+  }
+
+  final Tracer tracer;
+  final ConsumerHandler<Chan, Msg, C> consumerHandler;
+  final MessagingAdapter<Chan, Msg, C> adapter;
+
+  ProcessorHandler(MessagingTracing messagingTracing,
+    ConsumerHandler<Chan, Msg, C> consumerHandler) {
+    this.tracer = messagingTracing.tracing.tracer();
+    this.consumerHandler = consumerHandler;
+    this.adapter = consumerHandler.adapter;
+  }
+
+  /**
+   * When {@code addConsumerSpan} is true, this creates 2 spans:
+   * <ol>
+   * <li>A duration 1 {@link Span.Kind#CONSUMER} span to represent receipt from the
+   * destination</li>
+   * <li>A child span with the duration of the delegated listener</li>
+   * </ol>
+   *
+   * <p>{@code addConsumerSpan} should only be set when the message consumer is not traced.
+   */
+  // channel is nullable as JMS could have an exception getting it from the message
+  public Span startProcessor(@Nullable Chan channel, Msg message, boolean addConsumerSpan) {
+    if (!addConsumerSpan) {
+      Span result = tracer.nextSpan().start();
+      if (!result.isNoop()) {
+        consumerHandler.parser.addMessageTags(adapter, channel, message,
+          result.context(), result.customizer());
+      }
+      return result;
+    }
+    C carrier = adapter.carrier(message);
+    return consumerHandler.handleReceive(channel, message, carrier,
+      adapter.brokerName(channel), consumerHandler.extractor.extract(carrier), true);
+  }
+}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ProducerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/ProducerHandler.java
new file mode 100644
index 0000000..81d9c72
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/ProducerHandler.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.Tracing;
+import brave.propagation.TraceContext;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carries the trace context, usually headers
+ */
+public final class ProducerHandler<Chan, Msg, C> {
+
+  public static <Chan, Msg, C> ProducerHandler<Chan, Msg, C> create(
+    MessagingTracing messagingTracing,
+    MessagingAdapter<Chan, Msg, C> adapter,
+    TraceContext.Extractor<C> extractor,
+    TraceContext.Injector<C> injector
+  ) {
+    return new ProducerHandler<>(
+      messagingTracing.tracing, messagingTracing.parser(), adapter, extractor, injector);
+  }
+
+  final Tracing tracing;
+  final TraceContext.Extractor<C> extractor;
+  final TraceContext.Injector<C> injector;
+  final MessagingParser parser;
+  final MessagingAdapter<Chan, Msg, C> adapter;
+
+  ProducerHandler(Tracing tracing, MessagingParser parser,
+    MessagingAdapter<Chan, Msg, C> adapter, TraceContext.Extractor<C> extractor,
+    TraceContext.Injector<C> injector) {
+    this.tracing = tracing;
+    this.extractor = extractor;
+    this.injector = injector;
+    this.parser = parser;
+    this.adapter = adapter;
+  }
+
+  /**
+   * Attempts to resume a trace from the current span, falling back to extracting context from the
+   * carrier. Tags are added before the span is started.
+   *
+   * <p>This is typically called before the send is processed by the actual library.
+   */
+  public Span startSend(Chan channel, Msg message) {
+    C carrier = adapter.carrier(message);
+    TraceContext maybeParent = tracing.currentTraceContext().get();
+    // Unlike message consumers, we try current span before trying extraction. This is the proper
+    // order because the span in scope should take precedence over a potentially stale header entry.
+    //
+    // NOTE: Brave instrumentation used properly does not result in stale header entries, as we use
+    // propagation formats that can always be overwritten.
+    Span span;
+    if (maybeParent == null) {
+      span = tracing.tracer().nextSpan(extractor.extract(carrier));
+    } else {
+      span = tracing.tracer().newChild(maybeParent);
+    }
+
+    if (!span.isNoop()) {
+      span.kind(Span.Kind.PRODUCER);
+      parser.start("send", adapter, channel, message, span.context(), span.customizer());
+      String remoteServiceName = adapter.brokerName(channel);
+      if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
+      span.start();
+    }
+
+    injector.inject(span.context(), carrier);
+
+    return span;
+  }
+
+  public void finishSend(Chan channel, Msg message, Span span) {
+    parser.finish("send", adapter, channel, message, span.context(), span.customizer());
+    span.finish();
+  }
+}