You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/31 07:13:27 UTC
[incubator-zipkin-brave] 01/01: Refactors to reduce injector
complexity and duplicate code
This is an automated email from the ASF dual-hosted git repository.
adriancole pushed a commit to branch messaging-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin-brave.git
commit 9c21af4597f89ceef7d6158b7054a05213117680
Author: Adrian Cole <ac...@pivotal.io>
AuthorDate: Fri May 31 15:12:34 2019 +0800
Refactors to reduce injector complexity and duplicate code
This adds messaging processor code and some cleanups. There's
more to do, and tests don't pass yet.
---
.../jms/src/main/java/brave/jms/JmsAdapter.java | 121 +++++----------
.../jms/src/main/java/brave/jms/JmsTracing.java | 121 ++++++++++-----
.../java/brave/jms/TracingCompletionListener.java | 59 +++++--
.../java/brave/jms/TracingExceptionListener.java | 4 +-
.../main/java/brave/jms/TracingJMSConsumer.java | 21 ++-
.../main/java/brave/jms/TracingJMSProducer.java | 105 ++++++-------
.../java/brave/jms/TracingMessageConsumer.java | 29 ++--
.../java/brave/jms/TracingMessageListener.java | 62 ++------
.../java/brave/jms/TracingMessageProducer.java | 170 +++++++++++----------
.../jms/src/test/java/brave/jms/JmsTest.java | 11 --
.../brave/jms/TracingCompletionListenerTest.java | 47 +++---
.../java/brave/kafka/clients/KafkaTracing.java | 150 ++++++++----------
.../java/brave/kafka/clients/TracingConsumer.java | 99 +++++-------
.../java/brave/kafka/clients/TracingProducer.java | 153 +++++++++----------
.../brave/kafka/clients/TracingCallbackTest.java | 30 ++--
instrumentation/messaging/pom.xml | 12 ++
.../main/java/brave/messaging/ConsumerHandler.java | 143 +++++++++++++++++
.../main/java/brave/messaging/MessageAdapter.java | 36 -----
.../brave/messaging/MessageConsumerAdapter.java | 36 -----
.../brave/messaging/MessageProducerAdapter.java | 36 -----
.../java/brave/messaging/MessagingAdapter.java | 73 +++++++++
.../brave/messaging/MessagingConsumerHandler.java | 133 ----------------
.../brave/messaging/MessagingConsumerParser.java | 44 ------
.../java/brave/messaging/MessagingHandler.java | 45 ------
...ChannelAdapter.java => MessagingOperation.java} | 17 +--
.../main/java/brave/messaging/MessagingParser.java | 79 ++++++----
.../brave/messaging/MessagingProducerHandler.java | 83 ----------
.../brave/messaging/MessagingProducerParser.java | 42 -----
.../java/brave/messaging/MessagingTracing.java | 51 ++-----
.../java/brave/messaging/ProcessorHandler.java | 71 +++++++++
.../main/java/brave/messaging/ProducerHandler.java | 94 ++++++++++++
31 files changed, 1013 insertions(+), 1164 deletions(-)
diff --git a/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java b/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java
index 5c87218..a5252be 100644
--- a/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java
+++ b/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java
@@ -16,116 +16,67 @@
*/
package brave.jms;
-import brave.messaging.ChannelAdapter;
-import brave.messaging.MessageConsumerAdapter;
-import brave.messaging.MessageProducerAdapter;
+import brave.messaging.MessagingAdapter;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
-import static brave.jms.JmsTracing.JMS_QUEUE;
-import static brave.jms.JmsTracing.JMS_TOPIC;
+abstract class JmsAdapter<T> extends MessagingAdapter<Destination, T, T> {
+ final String remoteServiceName;
-class JmsAdapter {
-
- static class JmsMessageConsumerAdapter implements MessageConsumerAdapter<Message> {
-
- final JmsTracing jmsTracing;
-
- JmsMessageConsumerAdapter(JmsTracing jmsTracing) {
- this.jmsTracing = jmsTracing;
- }
-
- static JmsMessageConsumerAdapter create(JmsTracing jmsTracing) {
- return new JmsMessageConsumerAdapter(jmsTracing);
- }
+ JmsAdapter(JmsTracing jmsTracing) {
+ remoteServiceName = jmsTracing.remoteServiceName;
+ }
- @Override public String operation(Message message) {
- return "receive";
+ static final class MessageAdapter extends JmsAdapter<Message> {
+ MessageAdapter(JmsTracing jmsTracing) {
+ super(jmsTracing);
}
- @Override public String identifier(Message message) {
+ @Override public String correlationId(Message message) {
try {
return message.getJMSMessageID();
- } catch (JMSException e) {
+ } catch (JMSException ignored) {
// don't crash on wonky exceptions!
}
return null;
}
-
- @Override public String identifierTagKey() {
- return "jms.message_id";
- }
}
- static class JmsMessageProducerAdapter implements MessageProducerAdapter<Message> {
-
- final JmsTracing jmsTracing;
-
- JmsMessageProducerAdapter(JmsTracing jmsTracing) {
- this.jmsTracing = jmsTracing;
- }
-
- static JmsMessageProducerAdapter create(JmsTracing jmsTracing) {
- return new JmsMessageProducerAdapter(jmsTracing);
- }
-
- @Override public String operation(Message message) {
- return "send";
- }
+ @Override public T carrier(T message) {
+ return message;
+ }
- @Override public String identifier(Message message) {
- try {
- return message.getJMSMessageID();
- } catch (JMSException e) {
- // don't crash on wonky exceptions!
+ @Override public String channel(Destination channel) {
+ try {
+ if (channel instanceof Queue) {
+ return ((Queue) channel).getQueueName();
+ } else if (channel instanceof Topic) {
+ return ((Topic) channel).getTopicName();
}
- return null;
- }
-
- @Override public String identifierTagKey() {
- return null;
+ // TODO: we could use toString here..
+ } catch (JMSException ignored) {
+ // don't crash on wonky exceptions!
}
+ return null;
}
- static class JmsChannelAdapter implements ChannelAdapter<Destination> {
-
- final JmsTracing jmsTracing;
-
- JmsChannelAdapter(JmsTracing jmsTracing) {
- this.jmsTracing = jmsTracing;
- }
-
- static JmsChannelAdapter create(JmsTracing jmsTracing) {
- return new JmsChannelAdapter(jmsTracing);
- }
-
- @Override public String channel(Destination destination) {
- try {
- if (destination instanceof Queue) {
- return ((Queue) destination).getQueueName();
- } else if (destination instanceof Topic) {
- return ((Topic) destination).getTopicName();
- }
- } catch (JMSException ignored) {
- // don't crash on wonky exceptions!
- }
- return null;
+ @Override public String channelType(Destination channel) {
+ if (channel instanceof Queue) {
+ return "queue";
+ } else if (channel instanceof Topic) {
+ return "topic";
}
+ return null;
+ }
- @Override public String channelTagKey(Destination destination) {
- if (destination instanceof Queue) {
- return JMS_QUEUE;
- } else if (destination instanceof Topic) {
- return JMS_TOPIC;
- }
- return null;
- }
+ @Override public String messageKey(T message) {
+ return null;
+ }
- @Override public String remoteServiceName(Destination message) {
- return jmsTracing.remoteServiceName;
- }
+ @Override public String brokerName(Destination channel) {
+ return remoteServiceName;
}
}
diff --git a/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java b/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java
index bc0531a..383bbf2 100644
--- a/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java
+++ b/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java
@@ -17,9 +17,18 @@
package brave.jms;
import brave.Span;
+import brave.SpanCustomizer;
import brave.Tracing;
+import brave.internal.Nullable;
+import brave.jms.JmsAdapter.MessageAdapter;
+import brave.messaging.ConsumerHandler;
+import brave.messaging.MessagingAdapter;
+import brave.messaging.MessagingParser;
import brave.messaging.MessagingTracing;
+import brave.messaging.ProcessorHandler;
+import brave.messaging.ProducerHandler;
import brave.propagation.Propagation.Getter;
+import brave.propagation.Propagation.Setter;
import brave.propagation.TraceContext;
import brave.propagation.TraceContext.Extractor;
import brave.propagation.TraceContext.Injector;
@@ -31,7 +40,9 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+import javax.jms.Queue;
import javax.jms.QueueConnection;
+import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
@@ -42,8 +53,19 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
/** Use this class to decorate your Jms consumer / producer and enable Tracing. */
public final class JmsTracing {
- static final String JMS_QUEUE = "jms.queue";
- static final String JMS_TOPIC = "jms.topic";
+ static final Setter<Message, String> SETTER = new Setter<Message, String>() {
+ @Override public void put(Message carrier, String key, String value) {
+ try {
+ carrier.setStringProperty(key, value);
+ } catch (JMSException e) {
+ // don't crash on wonky exceptions!
+ }
+ }
+
+ @Override public String toString() {
+ return "Message::setStringProperty";
+ }
+ };
static final Getter<Message, String> GETTER = new Getter<Message, String>() {
@Override public String get(Message carrier, String key) {
@@ -69,17 +91,18 @@ public final class JmsTracing {
}
public static final class Builder {
- final MessagingTracing msgTracing;
+ final MessagingTracing messageTracing;
String remoteServiceName = "jms";
Builder(Tracing tracing) {
if (tracing == null) throw new NullPointerException("tracing == null");
- this.msgTracing = MessagingTracing.create(tracing);
+ this.messageTracing = MessagingTracing.newBuilder(tracing)
+ .parser(new LegacyMessagingParser()).build();
}
- Builder(MessagingTracing msgTracing) {
- if (msgTracing == null) throw new NullPointerException("msgTracing == null");
- this.msgTracing = msgTracing;
+ Builder(MessagingTracing messageTracing) {
+ if (messageTracing == null) throw new NullPointerException("messageTracing == null");
+ this.messageTracing = messageTracing;
}
/**
@@ -95,38 +118,28 @@ public final class JmsTracing {
}
}
- final MessagingTracing msgTracing;
- final Extractor<Message> extractor;
- final Injector<Message> injector;
- final JmsAdapter.JmsChannelAdapter channelAdapter;
- final JmsAdapter.JmsMessageConsumerAdapter consumerMessageAdapter;
- final JmsAdapter.JmsMessageProducerAdapter producerMessageAdapter;
+ final MessagingTracing messageTracing;
+ final ConsumerHandler<Destination, Message, Message> consumerHandler;
+ final ProcessorHandler<Destination, Message, Message> processorHandler;
+ final ProducerHandler<Destination, Message, Message> producerHandler;
+ final Extractor<Message> messageExtractor;
+ final Injector<Message> messageInjector;
+ final MessageAdapter messageAdapter;
final String remoteServiceName;
final Set<String> propagationKeys;
JmsTracing(Builder builder) { // intentionally hidden constructor
- this.msgTracing = builder.msgTracing;
- this.extractor = msgTracing.tracing().propagation().extractor(GETTER);
+ this.messageTracing = builder.messageTracing;
+ this.messageExtractor = messageTracing.tracing().propagation().extractor(GETTER);
this.remoteServiceName = builder.remoteServiceName;
- this.propagationKeys = new LinkedHashSet<>(msgTracing.tracing().propagation().keys());
- this.consumerMessageAdapter = JmsAdapter.JmsMessageConsumerAdapter.create(this);
- this.channelAdapter = JmsAdapter.JmsChannelAdapter.create(this);
- this.producerMessageAdapter = JmsAdapter.JmsMessageProducerAdapter.create(this);
- this.injector = new Injector<Message>() {
- @Override public void inject(TraceContext traceContext, Message carrier) {
- try {
- PropertyFilter.MESSAGE.filterProperties(carrier, propagationKeys);
- carrier.setStringProperty("b3", writeB3SingleFormatWithoutParentId(traceContext));
- } catch (JMSException e) {
- // don't crash on wonky exceptions!
- }
- }
-
- @Override
- public String toString() {
- return "Message::setStringProperty(\"b3\",singleHeaderFormatWithoutParent)";
- }
- };
+ this.propagationKeys = new LinkedHashSet<>(messageTracing.tracing().propagation().keys());
+ this.messageAdapter = new MessageAdapter(this);
+ this.messageInjector = new FilteringInjector<>(PropertyFilter.MESSAGE, propagationKeys, SETTER);
+ consumerHandler =
+ ConsumerHandler.create(messageTracing, messageAdapter, messageExtractor, messageInjector);
+ processorHandler = ProcessorHandler.create(messageTracing, consumerHandler);
+ producerHandler =
+ ProducerHandler.create(messageTracing, messageAdapter, messageExtractor, messageInjector);
}
public Connection connection(Connection connection) {
@@ -200,8 +213,7 @@ public final class JmsTracing {
* one couldn't be extracted.
*/
public Span nextSpan(Message message) {
- return msgTracing.nextSpan(channelAdapter, consumerMessageAdapter, extractor, message,
- destination(message));
+ return processorHandler.startProcessor(destination(message), message, false);
}
//TraceContextOrSamplingFlags extractAndClearMessage(Message message) {
@@ -212,7 +224,7 @@ public final class JmsTracing {
// return extracted;
//}
- Destination destination(Message message) {
+ @Nullable static Destination destination(Message message) {
try {
return message.getJMSDestination();
} catch (JMSException e) {
@@ -220,4 +232,39 @@ public final class JmsTracing {
}
return null;
}
+
+ static class FilteringInjector<C> implements TraceContext.Injector<C> {
+ final PropertyFilter filter;
+ final Set<String> namesToClear;
+ final Setter<C, String> setter;
+
+ FilteringInjector(PropertyFilter filter, Set<String> namesToClear, Setter<C, String> setter) {
+ this.filter = filter;
+ this.namesToClear = namesToClear;
+ this.setter = setter;
+ }
+
+ @Override public void inject(TraceContext traceContext, C carrier) {
+ filter.filterProperties(carrier, namesToClear);
+ setter.put(carrier, "b3", writeB3SingleFormatWithoutParentId(traceContext));
+ }
+
+ @Override public String toString() {
+ return setter + "(\"b3\",singleHeaderFormatWithoutParent)";
+ }
+ }
+
+ static class LegacyMessagingParser extends MessagingParser {
+ @Override
+ protected <Chan, Msg, C> void addMessageTags(MessagingAdapter<Chan, Msg, C> adapter,
+ Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+ String channelName = adapter.channel(channel);
+ if (channelName == null) return;
+ if (channel instanceof Queue) {
+ customizer.tag("jms.queue", channelName);
+ } else if (channel instanceof Topic) {
+ customizer.tag("jms.topic", channelName);
+ }
+ }
+ }
}
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java b/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java
index 3a7d50a..3df209a 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java
@@ -17,28 +17,64 @@
package brave.jms;
import brave.Span;
+import brave.internal.Nullable;
+import brave.messaging.ProducerHandler;
import brave.propagation.CurrentTraceContext;
import brave.propagation.CurrentTraceContext.Scope;
import javax.jms.CompletionListener;
+import javax.jms.Destination;
import javax.jms.Message;
/**
- * Decorates, then finishes a producer span. Allows tracing to record the duration between batching
+ * Decorates, then finishes a producer span. Allows tracing to record the duration between batching
* for send and actual send.
*/
-@JMS2_0 final class TracingCompletionListener implements CompletionListener {
- static CompletionListener create(CompletionListener delegate, Span span,
- CurrentTraceContext current) {
- if (span.isNoop()) return delegate; // save allocation overhead
- return new TracingCompletionListener(delegate, span, current);
+@JMS2_0 class TracingCompletionListener<Msg> implements CompletionListener {
+ static <Msg> TracingCompletionListener<Msg> create(@Nullable CompletionListener delegate,
+ ProducerHandler<Destination, Msg, Msg> handler, CurrentTraceContext current,
+ Destination destination, Msg message, Span span) {
+ if (delegate == null) {
+ return new TracingCompletionListener<>(handler, destination, message, span);
+ }
+ return new TracingForwardingCompletionListener<>(delegate, handler, current, destination,
+ message, span);
}
+ final ProducerHandler<Destination, Msg, Msg> handler;
+ final Destination destination;
+ final Msg message;
final Span span;
+
+ TracingCompletionListener(ProducerHandler<Destination, Msg, Msg> handler, Destination destination,
+ Msg message, Span span) {
+ this.handler = handler;
+ this.destination = destination;
+ this.message = message;
+ this.span = span;
+ }
+
+ @Override public void onCompletion(Message message) {
+ finish(null);
+ }
+
+ @Override public void onException(Message message, Exception exception) {
+ finish(exception);
+ }
+
+ void finish(@Nullable Throwable error) {
+ if (error != null) span.error(error);
+ handler.finishSend(destination, message, span);
+ }
+}
+
+final class TracingForwardingCompletionListener<Msg> extends TracingCompletionListener<Msg> {
final CompletionListener delegate;
final CurrentTraceContext current;
- TracingCompletionListener(CompletionListener delegate, Span span, CurrentTraceContext current) {
- this.span = span;
+ TracingForwardingCompletionListener(CompletionListener delegate,
+ ProducerHandler<Destination, Msg, Msg> handler, CurrentTraceContext current,
+ Destination destination, Msg message, Span span) {
+ super(handler, destination, message, span);
this.delegate = delegate;
this.current = current;
}
@@ -47,16 +83,15 @@ import javax.jms.Message;
try (Scope ws = current.maybeScope(span.context())) {
delegate.onCompletion(message);
} finally {
- span.finish();
+ finish(null);
}
}
@Override public void onException(Message message, Exception exception) {
- try {
+ try (Scope ws = current.maybeScope(span.context())) {
delegate.onException(message, exception);
- span.error(exception);
} finally {
- span.finish();
+ finish(exception);
}
}
}
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java b/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java
index 9397a5f..8f801f0 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java
@@ -24,13 +24,13 @@ import javax.jms.JMSException;
final class TracingExceptionListener {
static ExceptionListener create(JmsTracing jmsTracing) {
- return new TagError(jmsTracing.msgTracing.tracing().tracer());
+ return new TagError(jmsTracing.messageTracing.tracing().tracer());
}
static ExceptionListener create(ExceptionListener delegate, JmsTracing jmsTracing) {
if (delegate == null) throw new NullPointerException("exceptionListener == null");
if (delegate instanceof TagError) return delegate;
- return new DelegateAndTagError(delegate, jmsTracing.msgTracing.tracing().tracer());
+ return new DelegateAndTagError(delegate, jmsTracing.messageTracing.tracing().tracer());
}
static class TagError implements ExceptionListener {
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java b/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java
index 4a4275b..939789e 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java
@@ -16,25 +16,22 @@
*/
package brave.jms;
-import brave.messaging.MessagingConsumerHandler;
+import brave.messaging.ConsumerHandler;
import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import javax.jms.MessageListener;
-@JMS2_0 final class TracingJMSConsumer extends MessagingConsumerHandler<JMSConsumer, Destination, Message> implements JMSConsumer {
+@JMS2_0 final class TracingJMSConsumer implements JMSConsumer {
+ final JMSConsumer delegate;
final JmsTracing jmsTracing;
final Destination destination;
+ final ConsumerHandler<Destination, Message, Message> handler;
TracingJMSConsumer(JMSConsumer delegate, Destination destination, JmsTracing jmsTracing) {
- super(
- delegate,
- jmsTracing.msgTracing,
- jmsTracing.channelAdapter,
- jmsTracing.consumerMessageAdapter,
- jmsTracing.extractor,
- jmsTracing.injector);
+ this.delegate = delegate;
+ this.handler = jmsTracing.consumerHandler;
this.destination = destination;
this.jmsTracing = jmsTracing;
}
@@ -53,19 +50,19 @@ import javax.jms.MessageListener;
@Override public Message receive() {
Message message = delegate.receive();
- handleConsume(destination, message);
+ handler.handleReceive(destination, message);
return message;
}
@Override public Message receive(long timeout) {
Message message = delegate.receive(timeout);
- handleConsume(destination, message);
+ handler.handleReceive(destination, message);
return message;
}
@Override public Message receiveNoWait() {
Message message = delegate.receiveNoWait();
- handleConsume(destination, message);
+ handler.handleReceive(destination, message);
return message;
}
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java b/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java
index e380cf2..2482aea 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java
@@ -19,11 +19,10 @@ package brave.jms;
import brave.Span;
import brave.Tracer;
import brave.Tracer.SpanInScope;
-import brave.messaging.MessageProducerAdapter;
-import brave.messaging.MessagingProducerHandler;
+import brave.messaging.ProducerHandler;
import brave.propagation.CurrentTraceContext;
import brave.propagation.Propagation.Getter;
-import brave.propagation.TraceContext;
+import brave.propagation.Propagation.Setter;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
@@ -32,45 +31,24 @@ import javax.jms.Destination;
import javax.jms.JMSProducer;
import javax.jms.Message;
-import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentId;
-
-@JMS2_0 final class TracingJMSProducer
- extends MessagingProducerHandler<JMSProducer, Destination, JMSProducer>
- implements JMSProducer {
-
- static final Getter<JMSProducer, String> GETTER = new Getter<JMSProducer, String>() {
- @Override public String get(JMSProducer carrier, String key) {
- return carrier.getStringProperty(key);
- }
-
- @Override public String toString() {
- return "JMSProducer::getStringProperty";
- }
- };
+@JMS2_0 final class TracingJMSProducer implements JMSProducer {
+ final JMSProducer delegate;
+ final ProducerHandler<Destination, JMSProducer, JMSProducer> handler;
final Tracer tracer;
final CurrentTraceContext current;
TracingJMSProducer(JMSProducer delegate, JmsTracing jmsTracing) {
- super(
- delegate,
- jmsTracing.msgTracing,
- jmsTracing.channelAdapter,
- JmsProducerAdapter.create(jmsTracing),
- jmsTracing.msgTracing.tracing().propagation().extractor(GETTER),
- new TraceContext.Injector<JMSProducer>() {
- @Override public void inject(TraceContext traceContext, JMSProducer carrier) {
- PropertyFilter.JMS_PRODUCER.filterProperties(carrier, jmsTracing.propagationKeys);
- carrier.setProperty("b3", writeB3SingleFormatWithoutParentId(traceContext));
- }
-
- @Override
- public String toString() {
- return "Message::setStringProperty(\"b3\",singleHeaderFormatWithoutParent)";
- }
- });
- this.tracer = jmsTracing.msgTracing.tracing().tracer();
- this.current = jmsTracing.msgTracing.tracing().currentTraceContext();
+ this.delegate = delegate;
+ this.handler = ProducerHandler.create(
+ jmsTracing.messageTracing,
+ new JMSProducerAdapter(jmsTracing),
+ jmsTracing.messageTracing.tracing().propagation().extractor(GETTER),
+ new JmsTracing.FilteringInjector<>(PropertyFilter.JMS_PRODUCER, jmsTracing.propagationKeys,
+ SETTER)
+ );
+ tracer = jmsTracing.messageTracing.tracing().tracer();
+ current = jmsTracing.messageTracing.tracing().currentTraceContext();
}
// Partial function pattern as this needs to work before java 8 method references
@@ -130,24 +108,30 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
}
void send(Send send, Destination destination, Object message) {
- Span span = handleProduce(destination, this);
+ Span span = handler.startSend(destination, this);
+ // TODO: document the getAsync dance here, as it makes the lifecycle complex. For example,
+ // it isn't clear why a synchronous method would imply an async call.
final CompletionListener oldCompletionListener = getAsync();
+ boolean shouldFinish = true;
if (oldCompletionListener != null) {
- delegate.setAsync(TracingCompletionListener.create(oldCompletionListener, span, current));
+ shouldFinish = false;
+ delegate.setAsync(TracingCompletionListener.create(oldCompletionListener, handler, current,
+ destination, delegate, span));
}
SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
try {
send.apply(delegate, destination, message);
} catch (RuntimeException | Error e) {
span.error(e);
- span.finish();
+ shouldFinish = true;
throw e;
} finally {
ws.close();
- if (oldCompletionListener != null) {
+ if (oldCompletionListener != null) { // TODO: why are we doing this
delegate.setAsync(oldCompletionListener);
- } else {
- span.finish();
+ }
+ if (shouldFinish) {
+ handler.finishSend(destination, this, span);
}
}
}
@@ -345,31 +329,34 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
return delegate.getJMSReplyTo();
}
- static class JmsProducerAdapter implements MessageProducerAdapter<JMSProducer> {
- final JmsTracing jmsTracing;
-
- JmsProducerAdapter(JmsTracing jmsTracing) {
- this.jmsTracing = jmsTracing;
+ // helper types inlined here to avoid having JmsTracing access JMS 2.0 types
+ static final Getter<JMSProducer, String> GETTER = new Getter<JMSProducer, String>() {
+ @Override public String get(JMSProducer carrier, String key) {
+ return carrier.getStringProperty(key);
}
- static JmsProducerAdapter create(JmsTracing jmsTracing) {
- return new JmsProducerAdapter(jmsTracing);
+ @Override public String toString() {
+ return "JMSProducer::getStringProperty";
}
+ };
- @Override public String operation(JMSProducer message) {
- return "send";
+ static final Setter<JMSProducer, String> SETTER = new Setter<JMSProducer, String>() {
+ @Override public void put(JMSProducer carrier, String key, String value) {
+ carrier.setProperty(key, value);
}
- @Override public String identifier(JMSProducer message) {
- return message.getJMSCorrelationID();
+ @Override public String toString() {
+ return "JMSProducer::setProperty";
}
+ };
- //@Override public void clearPropagation(JMSProducer message) {
- // PropertyFilter.JMS_PRODUCER.filterProperties(message, jmsTracing.propagationKeys);
- //}
+ static final class JMSProducerAdapter extends JmsAdapter<JMSProducer> {
+ JMSProducerAdapter(JmsTracing jmsTracing) {
+ super(jmsTracing);
+ }
- @Override public String identifierTagKey() {
- return "jms.correlation_id";
+ @Override public String correlationId(JMSProducer message) {
+ return message.getJMSCorrelationID();
}
}
}
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java b/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java
index b489cc0..8b33646 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java
@@ -16,7 +16,7 @@
*/
package brave.jms;
-import brave.messaging.MessagingConsumerHandler;
+import brave.messaging.ConsumerHandler;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -29,30 +29,31 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
+import static brave.jms.JmsTracing.destination;
import static brave.jms.TracingConnection.TYPE_QUEUE;
import static brave.jms.TracingConnection.TYPE_TOPIC;
/** Implements all interfaces as according to ActiveMQ, this is typical of JMS 1.1. */
-final class TracingMessageConsumer
- extends MessagingConsumerHandler<MessageConsumer, Destination, Message>
- implements QueueReceiver, TopicSubscriber {
+final class TracingMessageConsumer implements QueueReceiver, TopicSubscriber {
static TracingMessageConsumer create(MessageConsumer delegate, JmsTracing jmsTracing) {
if (delegate instanceof TracingMessageConsumer) return (TracingMessageConsumer) delegate;
return new TracingMessageConsumer(delegate, jmsTracing);
}
+ final MessageConsumer delegate;
final JmsTracing jmsTracing;
+ final ConsumerHandler<Destination, Message, Message> handler;
final int types;
TracingMessageConsumer(MessageConsumer delegate, JmsTracing jmsTracing) {
- super(
- delegate,
- jmsTracing.msgTracing,
- jmsTracing.channelAdapter,
- jmsTracing.consumerMessageAdapter,
- jmsTracing.extractor,
- jmsTracing.injector);
+ this.delegate = delegate;
+ this.handler = ConsumerHandler.create(
+ jmsTracing.messageTracing,
+ jmsTracing.messageAdapter,
+ jmsTracing.messageExtractor,
+ jmsTracing.messageInjector
+ );
this.jmsTracing = jmsTracing;
int types = 0;
if (delegate instanceof QueueSender) types |= TYPE_QUEUE;
@@ -74,19 +75,19 @@ final class TracingMessageConsumer
@Override public Message receive() throws JMSException {
Message message = delegate.receive();
- handleConsume(message.getJMSDestination(), message);
+ handler.handleReceive(destination(message), message);
return message;
}
@Override public Message receive(long timeout) throws JMSException {
Message message = delegate.receive(timeout);
- handleConsume(message.getJMSDestination(), message);
+ handler.handleReceive(destination(message), message);
return message;
}
@Override public Message receiveNoWait() throws JMSException {
Message message = delegate.receiveNoWait();
- handleConsume(message.getJMSDestination(), message);
+ handler.handleReceive(destination(message), message);
return message;
}
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java b/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java
index 3f733cc..93b6bd3 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java
@@ -17,28 +17,25 @@
package brave.jms;
import brave.Span;
-import brave.Tracer;
-import brave.Tracer.SpanInScope;
-import brave.Tracing;
-import brave.messaging.ChannelAdapter;
-import brave.propagation.TraceContextOrSamplingFlags;
+import brave.messaging.ProcessorHandler;
+import brave.propagation.CurrentTraceContext;
+import brave.propagation.CurrentTraceContext.Scope;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
-import static brave.Span.Kind.CONSUMER;
+import static brave.jms.JmsTracing.destination;
/**
* When {@link #addConsumerSpan} this creates 2 spans:
* <ol>
- * <li>A duration 1 {@link Span.Kind#CONSUMER} span to represent receipt from the destination</li>
- * <li>A child span with the duration of the delegated listener</li>
+ * <li>A duration 1 {@link Span.Kind#CONSUMER} span to represent receipt from the destination</li>
+ * <li>A child span with the duration of the delegated listener</li>
* </ol>
*
* <p>{@link #addConsumerSpan} should only be set when the message consumer is not traced.
*/
final class TracingMessageListener implements MessageListener {
-
/** Creates a message listener which also adds a consumer span. */
static MessageListener create(MessageListener delegate, JmsTracing jmsTracing) {
if (delegate instanceof TracingMessageListener) return delegate;
@@ -46,54 +43,27 @@ final class TracingMessageListener implements MessageListener {
}
final MessageListener delegate;
- final JmsTracing jmsTracing;
- final Tracing tracing;
- final Tracer tracer;
- final String remoteServiceName;
+ final CurrentTraceContext current;
+ final ProcessorHandler<Destination, Message, Message> handler;
final boolean addConsumerSpan;
- final ChannelAdapter<Destination> channelAdapter;
TracingMessageListener(MessageListener delegate, JmsTracing jmsTracing, boolean addConsumerSpan) {
this.delegate = delegate;
- this.jmsTracing = jmsTracing;
- this.tracing = jmsTracing.msgTracing.tracing();
- this.tracer = jmsTracing.msgTracing.tracing().tracer();
- this.remoteServiceName = jmsTracing.remoteServiceName;
+ this.current = jmsTracing.messageTracing.tracing().currentTraceContext();
+ this.handler = jmsTracing.processorHandler;
this.addConsumerSpan = addConsumerSpan;
- channelAdapter = JmsAdapter.JmsChannelAdapter.create(jmsTracing);
}
@Override public void onMessage(Message message) {
- Span listenerSpan = startMessageListenerSpan(message);
- try (SpanInScope ws = tracer.withSpanInScope(listenerSpan)) {
+ Span listenerSpan =
+ handler.startProcessor(destination(message), message, addConsumerSpan).name("on-message");
+ try (Scope ws = current.newScope(listenerSpan.context())) {
delegate.onMessage(message);
- } catch (Throwable t) {
- listenerSpan.error(t);
- throw t;
+ } catch (RuntimeException | Error e) {
+ listenerSpan.error(e);
+ throw e;
} finally {
listenerSpan.finish();
}
}
-
- Span startMessageListenerSpan(Message message) {
- if (!addConsumerSpan) return jmsTracing.nextSpan(message).name("on-message").start();
- TraceContextOrSamplingFlags extracted = jmsTracing.extractor.extract(message);
-
- // JMS has no visibility of the incoming message, which incidentally could be local!
- Span consumerSpan = tracer.nextSpan(extracted).kind(CONSUMER).name("receive");
- Span listenerSpan = tracer.newChild(consumerSpan.context());
-
- if (!consumerSpan.isNoop()) {
- long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
- consumerSpan.start(timestamp);
- if (remoteServiceName != null) consumerSpan.remoteServiceName(remoteServiceName);
- jmsTracing.msgTracing.consumerParser().channel(channelAdapter, jmsTracing.destination(message), consumerSpan);
- long consumerFinish = timestamp + 1L; // save a clock reading
- consumerSpan.finish(consumerFinish);
-
- // not using scoped span as we want to start late
- listenerSpan.name("on-message").start(consumerFinish);
- }
- return listenerSpan;
- }
}
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java b/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java
index f79b8b3..8aaf910 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java
@@ -18,9 +18,9 @@ package brave.jms;
import brave.Span;
import brave.Tracer;
-import brave.Tracer.SpanInScope;
-import brave.messaging.MessagingProducerHandler;
+import brave.messaging.ProducerHandler;
import brave.propagation.CurrentTraceContext;
+import brave.propagation.CurrentTraceContext.Scope;
import javax.jms.CompletionListener;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -31,46 +31,32 @@ import javax.jms.QueueSender;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
+import static brave.jms.JmsTracing.destination;
import static brave.jms.TracingConnection.TYPE_QUEUE;
import static brave.jms.TracingConnection.TYPE_TOPIC;
/** Implements all interfaces as according to ActiveMQ, this is typical of JMS 1.1. */
-final class TracingMessageProducer extends MessagingProducerHandler<MessageProducer, Destination, Message>
- implements QueueSender, TopicPublisher {
-
+final class TracingMessageProducer implements QueueSender, TopicPublisher {
static TracingMessageProducer create(MessageProducer delegate, JmsTracing jmsTracing) {
if (delegate instanceof TracingMessageProducer) return (TracingMessageProducer) delegate;
return new TracingMessageProducer(delegate, jmsTracing);
}
+ final MessageProducer delegate;
final int types;
+ final ProducerHandler<Destination, Message, Message> handler;
final CurrentTraceContext current;
final Tracer tracer;
TracingMessageProducer(MessageProducer delegate, JmsTracing jmsTracing) {
- super(delegate,
- jmsTracing.msgTracing,
- jmsTracing.channelAdapter,
- jmsTracing.producerMessageAdapter,
- jmsTracing.extractor,
- jmsTracing.injector);
+ this.delegate = delegate;
int types = 0;
if (delegate instanceof QueueSender) types |= TYPE_QUEUE;
if (delegate instanceof TopicPublisher) types |= TYPE_TOPIC;
this.types = types;
- this.current = jmsTracing.msgTracing.tracing().currentTraceContext();
- this.tracer = jmsTracing.msgTracing.tracing().tracer();
- }
-
- Destination destination(Message message) {
- try {
- Destination result = message.getJMSDestination();
- if (result != null) return result;
- return delegate.getDestination();
- } catch (JMSException ignored) {
- // don't crash on wonky exceptions!
- }
- return null;
+ this.handler = jmsTracing.producerHandler;
+ this.current = jmsTracing.messageTracing.tracing().currentTraceContext();
+ this.tracer = jmsTracing.messageTracing.tracing().tracer();
}
@Override public void setDisableMessageID(boolean value) throws JMSException {
@@ -132,8 +118,10 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
}
@Override public void send(Message message) throws JMSException {
- Span span = handleProduce(destination(message), message);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ Destination destination = destination(message);
+
+ Span span = handler.startSend(destination, message);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
delegate.send(message);
} catch (RuntimeException | JMSException | Error e) {
@@ -141,14 +129,16 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
throw e;
} finally {
ws.close();
- span.finish();
+ handler.finishSend(destination, message, span);
}
}
@Override public void send(Message message, int deliveryMode, int priority, long timeToLive)
- throws JMSException {
- Span span = handleProduce(destination(message), message);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ throws JMSException {
+ Destination destination = destination(message);
+
+ Span span = handler.startSend(destination, message);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
delegate.send(message, deliveryMode, priority, timeToLive);
} catch (RuntimeException | JMSException | Error e) {
@@ -156,32 +146,32 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
throw e;
} finally {
ws.close();
- span.finish();
+ handler.finishSend(destination, message, span);
}
}
enum SendDestination {
DESTINATION {
@Override void apply(MessageProducer producer, Destination destination, Message message)
- throws JMSException {
+ throws JMSException {
producer.send(destination, message);
}
},
QUEUE {
@Override void apply(MessageProducer producer, Destination destination, Message message)
- throws JMSException {
+ throws JMSException {
((QueueSender) producer).send((Queue) destination, message);
}
},
TOPIC {
@Override void apply(MessageProducer producer, Destination destination, Message message)
- throws JMSException {
+ throws JMSException {
((TopicPublisher) producer).publish((Topic) destination, message);
}
};
abstract void apply(MessageProducer producer, Destination destination, Message message)
- throws JMSException;
+ throws JMSException;
}
@Override public void send(Destination destination, Message message) throws JMSException {
@@ -189,9 +179,9 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
}
void send(SendDestination sendDestination, Destination destination, Message message)
- throws JMSException {
- Span span = handleProduce(destination, message);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ throws JMSException {
+ Span span = handler.startSend(destination, message);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
sendDestination.apply(delegate, destination, message);
} catch (RuntimeException | JMSException | Error e) {
@@ -199,15 +189,15 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
throw e;
} finally {
ws.close();
- span.finish();
+ handler.finishSend(destination, message, span);
}
}
@Override
public void send(Destination destination, Message message, int deliveryMode, int priority,
- long timeToLive) throws JMSException {
- Span span = handleProduce(destination, message);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ long timeToLive) throws JMSException {
+ Span span = handler.startSend(destination, message);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
delegate.send(destination, message, deliveryMode, priority, timeToLive);
} catch (RuntimeException | JMSException | Error e) {
@@ -215,20 +205,24 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
throw e;
} finally {
ws.close();
- span.finish();
+ handler.finishSend(destination, message, span);
}
}
/* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
@JMS2_0
public void send(Message message, CompletionListener completionListener) throws JMSException {
- Span span = handleProduce(destination(message), message);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ Destination destination = destination(message);
+
+ Span span = handler.startSend(destination, message);
+ completionListener =
+ tracingCompletionListener(message, completionListener, span, destination);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
- delegate.send(message, TracingCompletionListener.create(completionListener, span, current));
+ delegate.send(message, completionListener);
} catch (RuntimeException | JMSException | Error e) {
span.error(e);
- span.finish();
+ handler.finishSend(destination, message, span);
throw e;
} finally {
ws.close();
@@ -237,32 +231,44 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
/* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
@JMS2_0 public void send(Message message, int deliveryMode, int priority, long timeToLive,
- CompletionListener completionListener) throws JMSException {
- Span span = handleProduce(destination(message), message);
- completionListener = TracingCompletionListener.create(completionListener, span, current);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ CompletionListener completionListener) throws JMSException {
+ Destination destination = destination(message);
+
+ Span span = handler.startSend(destination, message);
+ completionListener =
+ tracingCompletionListener(message, completionListener, span, destination);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
delegate.send(message, deliveryMode, priority, timeToLive, completionListener);
} catch (RuntimeException | JMSException | Error e) {
span.error(e);
- span.finish();
+ handler.finishSend(destination, message, span);
throw e;
} finally {
ws.close();
}
}
+ private CompletionListener tracingCompletionListener(Message message,
+ CompletionListener completionListener,
+ Span span, Destination destination) {
+ completionListener =
+ TracingCompletionListener.create(completionListener, handler, current, destination, message,
+ span);
+ return completionListener;
+ }
+
/* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
@JMS2_0 public void send(Destination destination, Message message,
- CompletionListener completionListener) throws JMSException {
- Span span = handleProduce(destination, message);
- completionListener = TracingCompletionListener.create(completionListener, span, current);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ CompletionListener completionListener) throws JMSException {
+ Span span = handler.startSend(destination, message);
+ completionListener = tracingCompletionListener(message, completionListener, span, destination);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
delegate.send(destination, message, completionListener);
} catch (RuntimeException | JMSException | Error e) {
span.error(e);
- span.finish();
+ handler.finishSend(destination, message, span);
throw e;
} finally {
ws.close();
@@ -271,15 +277,15 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
/* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
@JMS2_0 public void send(Destination destination, Message message, int deliveryMode, int priority,
- long timeToLive, CompletionListener completionListener) throws JMSException {
- Span span = handleProduce(destination, message);
- completionListener = TracingCompletionListener.create(completionListener, span, current);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ long timeToLive, CompletionListener completionListener) throws JMSException {
+ Span span = handler.startSend(destination, message);
+ completionListener = tracingCompletionListener(message, completionListener, span, destination);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
delegate.send(destination, message, deliveryMode, priority, timeToLive, completionListener);
} catch (RuntimeException | JMSException | Error e) {
span.error(e);
- span.finish();
+ handler.finishSend(destination, message, span);
throw e;
} finally {
ws.close();
@@ -300,11 +306,14 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
@Override
public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive)
- throws JMSException {
+ throws JMSException {
checkQueueSender();
QueueSender qs = (QueueSender) delegate;
- Span span = handleProduce(destination(message), message);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ Destination destination = destination(message);
+ if (destination == null) destination = qs.getDestination();
+
+ Span span = handler.startSend(destination, message);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
qs.send(queue, message, deliveryMode, priority, timeToLive);
} catch (RuntimeException | JMSException | Error e) {
@@ -312,7 +321,7 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
throw e;
} finally {
ws.close();
- span.finish();
+ handler.finishSend(destination, message, span);
}
}
@@ -332,9 +341,11 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
@Override public void publish(Message message) throws JMSException {
checkTopicPublisher();
TopicPublisher tp = (TopicPublisher) delegate;
+ Destination destination = destination(message);
+ if (destination == null) destination = tp.getDestination();
- Span span = handleProduce(destination(message), message);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ Span span = handler.startSend(destination, message);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
tp.publish(message);
} catch (RuntimeException | JMSException | Error e) {
@@ -342,17 +353,19 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
throw e;
} finally {
ws.close();
- span.finish();
+ handler.finishSend(destination, message, span);
}
}
@Override public void publish(Message message, int deliveryMode, int priority, long timeToLive)
- throws JMSException {
+ throws JMSException {
checkTopicPublisher();
TopicPublisher tp = (TopicPublisher) delegate;
+ Destination destination = destination(message);
+ if (destination == null) destination = tp.getDestination();
- Span span = handleProduce(destination(message), message);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ Span span = handler.startSend(destination, message);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
tp.publish(message, deliveryMode, priority, timeToLive);
} catch (RuntimeException | JMSException | Error e) {
@@ -360,7 +373,7 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
throw e;
} finally {
ws.close();
- span.finish();
+ handler.finishSend(destination, message, span);
}
}
@@ -371,12 +384,14 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
@Override
public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive)
- throws JMSException {
+ throws JMSException {
checkTopicPublisher();
TopicPublisher tp = (TopicPublisher) delegate;
+ Destination destination = destination(message);
+ if (destination == null) destination = tp.getDestination();
- Span span = handleProduce(destination(message), message);
- SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+ Span span = handler.startSend(destination, message);
+ Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
try {
tp.publish(topic, message, deliveryMode, priority, timeToLive);
} catch (RuntimeException | JMSException | Error e) {
@@ -384,7 +399,7 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
throw e;
} finally {
ws.close();
- span.finish();
+ handler.finishSend(destination, message, span);
}
}
@@ -393,5 +408,4 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
throw new IllegalStateException(delegate + " is not a TopicPublisher");
}
}
-
}
diff --git a/instrumentation/jms/src/test/java/brave/jms/JmsTest.java b/instrumentation/jms/src/test/java/brave/jms/JmsTest.java
index 67637ce..9acbc4c 100644
--- a/instrumentation/jms/src/test/java/brave/jms/JmsTest.java
+++ b/instrumentation/jms/src/test/java/brave/jms/JmsTest.java
@@ -39,17 +39,6 @@ import zipkin2.Span;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class JmsTest {
- static final Propagation.Setter<Message, String> SETTER =
- new Propagation.Setter<Message, String>() {
- @Override public void put(Message carrier, String key, String value) {
- try {
- carrier.setStringProperty(key, value);
- } catch (JMSException e) {
- throw new AssertionError(e);
- }
- }
- };
-
@After public void tearDown() {
tracing.close();
}
diff --git a/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java b/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java
index a5f2d2d..bf35264 100644
--- a/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java
+++ b/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java
@@ -17,8 +17,8 @@
package brave.jms;
import brave.Span;
-import brave.sampler.Sampler;
import javax.jms.CompletionListener;
+import javax.jms.Destination;
import javax.jms.Message;
import org.junit.Test;
@@ -27,46 +27,36 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class TracingCompletionListenerTest extends JmsTest {
- @Test public void create_returns_input_on_noop() {
- Span span = tracing.tracer().withSampler(Sampler.NEVER_SAMPLE).nextSpan();
-
- CompletionListener delegate = mock(CompletionListener.class);
- CompletionListener tracingCompletionListener =
- TracingCompletionListener.create(delegate, span, current);
-
- assertThat(tracingCompletionListener).isSameAs(delegate);
- }
+ Destination destination = mock(Destination.class);
+ Message message = mock(Message.class);
+ CompletionListener delegate = mock(CompletionListener.class);
@Test public void on_completion_should_finish_span() throws Exception {
- Message message = mock(Message.class);
Span span = tracing.tracer().nextSpan().start();
- CompletionListener tracingCompletionListener =
- TracingCompletionListener.create(mock(CompletionListener.class), span, current);
+ CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+ delegate, jmsTracing.producerHandler, current, destination, message, span);
tracingCompletionListener.onCompletion(message);
assertThat(takeSpan()).isNotNull();
}
@Test public void on_exception_should_tag_if_exception() throws Exception {
- Message message = mock(Message.class);
Span span = tracing.tracer().nextSpan().start();
- CompletionListener tracingCompletionListener =
- TracingCompletionListener.create(mock(CompletionListener.class), span, current);
+ CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+ delegate, jmsTracing.producerHandler, current, destination, message, span);
tracingCompletionListener.onException(message, new Exception("Test exception"));
assertThat(takeSpan().tags())
- .containsEntry("error", "Test exception");
+ .containsEntry("error", "Test exception");
}
@Test public void on_completion_should_forward_then_finish_span() throws Exception {
- Message message = mock(Message.class);
Span span = tracing.tracer().nextSpan().start();
- CompletionListener delegate = mock(CompletionListener.class);
- CompletionListener tracingCompletionListener =
- TracingCompletionListener.create(delegate, span, current);
+ CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+ delegate, jmsTracing.producerHandler, current, destination, message, span);
tracingCompletionListener.onCompletion(message);
verify(delegate).onCompletion(message);
@@ -74,7 +64,6 @@ public class TracingCompletionListenerTest extends JmsTest {
}
@Test public void on_completion_should_have_span_in_scope() throws Exception {
- Message message = mock(Message.class);
Span span = tracing.tracer().nextSpan().start();
CompletionListener delegate = new CompletionListener() {
@@ -87,23 +76,25 @@ public class TracingCompletionListenerTest extends JmsTest {
}
};
- TracingCompletionListener.create(delegate, span, current).onCompletion(message);
+ CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+ delegate, jmsTracing.producerHandler, current, destination, message, span);
+
+ tracingCompletionListener.onCompletion(message);
takeSpan(); // consumer reported span
}
@Test public void on_exception_should_forward_then_tag() throws Exception {
- Message message = mock(Message.class);
Span span = tracing.tracer().nextSpan().start();
- CompletionListener delegate = mock(CompletionListener.class);
- CompletionListener tracingCompletionListener =
- TracingCompletionListener.create(delegate, span, current);
+ CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+ delegate, jmsTracing.producerHandler, current, destination, message, span);
+
Exception e = new Exception("Test exception");
tracingCompletionListener.onException(message, e);
verify(delegate).onException(message, e);
assertThat(takeSpan().tags())
- .containsEntry("error", "Test exception");
+ .containsEntry("error", "Test exception");
}
}
diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java
index ae1359b..46159ac 100644
--- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java
+++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java
@@ -17,21 +17,27 @@
package brave.kafka.clients;
import brave.Span;
+import brave.SpanCustomizer;
import brave.Tracing;
+import brave.internal.Nullable;
+import brave.kafka.clients.TracingConsumer.ConsumerRecordAdapter;
+import brave.kafka.clients.TracingProducer.ProducerRecordAdapter;
+import brave.messaging.ConsumerHandler;
+import brave.messaging.MessagingAdapter;
+import brave.messaging.MessagingParser;
import brave.messaging.MessagingTracing;
+import brave.messaging.ProcessorHandler;
+import brave.messaging.ProducerHandler;
import brave.propagation.B3SingleFormat;
import brave.propagation.Propagation;
import brave.propagation.TraceContext;
import brave.propagation.TraceContext.Extractor;
import brave.propagation.TraceContext.Injector;
-import brave.propagation.TraceContextOrSamplingFlags;
-import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import static brave.kafka.clients.KafkaPropagation.B3_SINGLE_TEST_HEADERS;
@@ -42,11 +48,6 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
/** Use this class to decorate your Kafka consumer / producer and enable Tracing. */
public final class KafkaTracing {
-
- static final String PROTOCOL = "kafka";
- static final String PRODUCER_OPERATION = "send";
- static final String CONSUMER_OPERATION = "poll";
-
public static KafkaTracing create(Tracing tracing) {
return new Builder(tracing).build();
}
@@ -56,18 +57,19 @@ public final class KafkaTracing {
}
public static final class Builder {
- final MessagingTracing msgTracing;
+ final MessagingTracing messagingTracing;
String remoteServiceName = "kafka";
boolean writeB3SingleFormat;
Builder(Tracing tracing) {
if (tracing == null) throw new NullPointerException("tracing == null");
- this.msgTracing = MessagingTracing.create(tracing);
+ this.messagingTracing =
+ MessagingTracing.newBuilder(tracing).parser(new LegacyMessagingParser()).build();
}
- Builder(MessagingTracing msgTracing) {
- if (msgTracing == null) throw new NullPointerException("msgTracing == null");
- this.msgTracing = msgTracing;
+ Builder(MessagingTracing messagingTracing) {
+ if (messagingTracing == null) throw new NullPointerException("messagingTracing == null");
+ this.messagingTracing = messagingTracing;
}
/**
@@ -95,72 +97,40 @@ public final class KafkaTracing {
}
}
- final MessagingTracing msgTracing;
- final String remoteServiceName;
- final List<String> propagationKeys;
-
- boolean singleFormat;
+ final MessagingTracing messagingTracing;
+ final ConsumerHandler<String, ConsumerRecord, Headers> consumerHandler;
+ final ProcessorHandler<String, ConsumerRecord, Headers> processorHandler;
+ final ProducerHandler<String, ProducerRecord, Headers> producerHandler;
KafkaTracing(Builder builder) { // intentionally hidden constructor
- this.msgTracing = builder.msgTracing;
- this.remoteServiceName = builder.remoteServiceName;
- this.propagationKeys = msgTracing.tracing().propagation().keys();
- final Extractor<Headers> extractor =
- msgTracing.tracing().propagation().extractor(HEADERS_GETTER);
- List<String> keyList = msgTracing.tracing().propagation().keys();
- singleFormat = false;
+ this.messagingTracing = builder.messagingTracing;
+ Extractor<Headers> extractor =
+ messagingTracing.tracing().propagation().extractor(HEADERS_GETTER);
+ Injector<Headers> injector;
+ List<String> keyList = messagingTracing.tracing().propagation().keys();
if (builder.writeB3SingleFormat || keyList.equals(Propagation.B3_SINGLE_STRING.keys())) {
TraceContext testExtraction = extractor.extract(B3_SINGLE_TEST_HEADERS).context();
if (!TEST_CONTEXT.equals(testExtraction)) {
throw new IllegalArgumentException(
- "KafkaTracing.Builder.writeB3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!");
+ "KafkaTracing.Builder.writeB3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!");
}
- singleFormat = true;
- }
- }
-
- <K, V> Extractor<ProducerRecord<K, V>> producerRecordExtractor() {
- return msgTracing.tracing()
- .propagation()
- .extractor((record, key) -> HEADERS_GETTER.get(record.headers(), key));
- }
-
- <K, V> Injector<ProducerRecord<K, V>> producerRecordInjector() {
- return singleFormat ?
- new Injector<ProducerRecord<K, V>>() {
- @Override public void inject(TraceContext traceContext, ProducerRecord<K, V> carrier) {
- carrier.headers().add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext));
- }
-
- @Override public String toString() {
- return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)";
- }
+ injector = new Injector<Headers>() {
+ @Override public void inject(TraceContext traceContext, Headers headers) {
+ headers.add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext));
}
- : msgTracing.tracing().propagation().injector((record, key, value) -> {
- HEADERS_SETTER.put(record.headers(), key, value);
- });
- }
- <K, V> Extractor<ConsumerRecord<K, V>> consumerRecordExtractor() {
- return msgTracing.tracing()
- .propagation()
- .extractor((record, key) -> HEADERS_GETTER.get(record.headers(), key));
- }
-
- <K, V> Injector<ConsumerRecord<K, V>> consumerRecordInjector() {
- return singleFormat ?
- new Injector<ConsumerRecord<K, V>>() {
- @Override public void inject(TraceContext traceContext, ConsumerRecord<K, V> carrier) {
- carrier.headers().add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext));
- }
-
- @Override public String toString() {
- return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)";
- }
+ @Override public String toString() {
+ return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)";
}
- : msgTracing.tracing().propagation().injector((record, key, value) -> {
- HEADERS_SETTER.put(record.headers(), key, value);
- });
+ };
+ } else {
+ injector = messagingTracing.tracing().propagation().injector(HEADERS_SETTER);
+ }
+ consumerHandler = ConsumerHandler.create(
+ messagingTracing, new ConsumerRecordAdapter(builder.remoteServiceName), extractor, injector);
+ processorHandler = ProcessorHandler.create(messagingTracing, consumerHandler);
+ producerHandler = ProducerHandler.create(
+ messagingTracing, new ProducerRecordAdapter(builder.remoteServiceName), extractor, injector);
}
/**
@@ -185,35 +155,37 @@ public final class KafkaTracing {
* one couldn't be extracted.
*/
public <K, V> Span nextSpan(ConsumerRecord<K, V> record) {
- final TracingConsumer.KafkaConsumerAdapter<K, V> adapter =
- TracingConsumer.KafkaConsumerAdapter.create(this);
- return msgTracing.nextSpan(adapter, adapter, consumerRecordExtractor(), record, record);
- }
-
- <Record> String channelTagKey(Record record) {
- return String.format("%s.topic", PROTOCOL);
+ return processorHandler.startProcessor(record.topic(), record, false);
}
- String recordKey(Object key) {
+ @Nullable static String recordKey(Object key) {
if (key instanceof String && !"".equals(key)) {
return key.toString();
}
return null;
}
- String identifierTagKey() {
- return String.format("%s.key", PROTOCOL);
- }
+ static class LegacyMessagingParser extends MessagingParser {
+ @Override
+ protected <Chan, Msg, C> void addMessageTags(MessagingAdapter<Chan, Msg, C> adapter,
+ Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+ String channelName = adapter.channel(channel);
+ if (channelName != null) customizer.tag("kafka.topic", channelName);
+ if (msg != null && context.parentId() == null) { // is a root span
+ String messageKey = adapter.messageKey(msg);
+ if (messageKey != null) customizer.tag("kafka.key", messageKey);
+ }
+ }
- // BRAVE6: consider a messaging variant of extraction which clears headers as they are read.
- // this could prevent having to go back and clear them later. Another option is to encourage,
- // then special-case single header propagation. When there's only 1 propagation key, you don't
- // need to do a loop!
- void clearHeaders(Headers headers) {
- // Headers::remove creates and consumes an iterator each time. This does one loop instead.
- for (Iterator<Header> i = headers.iterator(); i.hasNext(); ) {
- Header next = i.next();
- if (propagationKeys.contains(next.key())) i.remove();
+ /** Returns the span name of a message operation. Defaults to the operation name. */
+ @Override protected <Chan, Msg, C> String spanName(String operation,
+ MessagingAdapter<Chan, Msg, C> adapter, Chan channel, @Nullable Msg msg) {
+ switch (operation) {
+ case "receive":
+ case "receive-batch":
+ return "poll";
+ }
+ return super.spanName(operation, adapter, channel, msg);
}
}
}
diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java
index 7f17d30..de2b06d 100644
--- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java
+++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java
@@ -18,9 +18,8 @@ package brave.kafka.clients;
import brave.Span;
import brave.Tracing;
-import brave.messaging.ChannelAdapter;
-import brave.messaging.MessageConsumerAdapter;
-import brave.messaging.MessagingConsumerHandler;
+import brave.messaging.ConsumerHandler;
+import brave.messaging.MessagingAdapter;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
@@ -40,39 +39,31 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
/**
* Kafka Consumer decorator. Read records headers to create and complete a child of the incoming
* producers span if possible.
*/
-final class TracingConsumer<K, V>
- extends MessagingConsumerHandler<Consumer<K, V>, ConsumerRecord<K, V>, ConsumerRecord<K, V>>
- implements Consumer<K, V> {
-
- final KafkaTracing kafkaTracing;
- final Tracing tracing;
- final String remoteServiceName;
-
+final class TracingConsumer<K, V> implements Consumer<K, V> {
// replicate org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener behaviour
static final ConsumerRebalanceListener NO_OP_CONSUMER_REBALANCE_LISTENER =
- new ConsumerRebalanceListener() {
- @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- }
+ new ConsumerRebalanceListener() {
+ @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ }
- @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- }
- };
+ @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ }
+ };
+
+ final Consumer<K, V> delegate;
+ final Tracing tracing;
+ final ConsumerHandler<String, ConsumerRecord, Headers> handler;
TracingConsumer(Consumer<K, V> delegate, KafkaTracing kafkaTracing) {
- super(delegate,
- kafkaTracing.msgTracing,
- KafkaConsumerAdapter.create(kafkaTracing),
- KafkaConsumerAdapter.create(kafkaTracing),
- kafkaTracing.consumerRecordExtractor(),
- kafkaTracing.consumerRecordInjector());
- this.kafkaTracing = kafkaTracing;
- this.tracing = kafkaTracing.msgTracing.tracing();
- this.remoteServiceName = kafkaTracing.remoteServiceName;
+ this.delegate = delegate;
+ this.handler = kafkaTracing.consumerHandler;
+ this.tracing = kafkaTracing.messagingTracing.tracing();
}
// Do not use @Override annotation to avoid compatibility issue version < 2.0
@@ -84,15 +75,13 @@ final class TracingConsumer<K, V>
@Override public ConsumerRecords<K, V> poll(long timeout) {
ConsumerRecords<K, V> records = delegate.poll(timeout);
if (records.isEmpty() || tracing.isNoop()) return records;
- long timestamp = 0L;
Map<String, Span> consumerSpansForTopic = new LinkedHashMap<>();
for (TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<K, V>> recordsInPartition = records.records(partition);
+ List<? extends ConsumerRecord> recordsInPartition = records.records(partition);
consumerSpansForTopic =
- handleConsume(recordsInPartition.size() > 0 ? recordsInPartition.get(0) : null,
- recordsInPartition, consumerSpansForTopic);
+ handler.startBulkReceive(partition.topic(), recordsInPartition, consumerSpansForTopic);
}
- for (Span span : consumerSpansForTopic.values()) span.finish(timestamp);
+ handler.finishBulkReceive(consumerSpansForTopic);
return records;
}
@@ -156,7 +145,7 @@ final class TracingConsumer<K, V>
}
@Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
- OffsetCommitCallback callback) {
+ OffsetCommitCallback callback) {
delegate.commitAsync(offsets, callback);
}
@@ -228,13 +217,13 @@ final class TracingConsumer<K, V>
}
@Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
- Map<TopicPartition, Long> timestampsToSearch) {
+ Map<TopicPartition, Long> timestampsToSearch) {
return delegate.offsetsForTimes(timestampsToSearch);
}
// Do not use @Override annotation to avoid compatibility issue version < 2.0
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
- Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
+ Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
return delegate.offsetsForTimes(timestampsToSearch, timeout);
}
@@ -245,7 +234,7 @@ final class TracingConsumer<K, V>
// Do not use @Override annotation to avoid compatibility issue version < 2.0
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions,
- Duration timeout) {
+ Duration timeout) {
return delegate.beginningOffsets(partitions, timeout);
}
@@ -255,7 +244,7 @@ final class TracingConsumer<K, V>
// Do not use @Override annotation to avoid compatibility issue version < 2.0
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions,
- Duration timeout) {
+ Duration timeout) {
return delegate.endOffsets(partitions, timeout);
}
@@ -276,40 +265,36 @@ final class TracingConsumer<K, V>
delegate.wakeup();
}
- static class KafkaConsumerAdapter<K, V> implements MessageConsumerAdapter<ConsumerRecord<K, V>>,
- ChannelAdapter<ConsumerRecord<K, V>> {
- final KafkaTracing kafkaTracing;
-
- KafkaConsumerAdapter(KafkaTracing kafkaTracing) {
- this.kafkaTracing = kafkaTracing;
- }
+ static final class ConsumerRecordAdapter
+ extends MessagingAdapter<String, ConsumerRecord, Headers> {
+ final String remoteServiceName;
- static <K, V> KafkaConsumerAdapter<K, V> create(KafkaTracing kafkaTracing) {
- return new KafkaConsumerAdapter<>(kafkaTracing);
+ ConsumerRecordAdapter(String remoteServiceName) {
+ this.remoteServiceName = remoteServiceName;
}
- @Override public String channel(ConsumerRecord message) {
- return message.topic();
+ @Override public Headers carrier(ConsumerRecord message) {
+ return message.headers();
}
- @Override public String operation(ConsumerRecord message) {
- return KafkaTracing.CONSUMER_OPERATION;
+ @Override public String channel(String topic) {
+ return topic;
}
- @Override public String identifier(ConsumerRecord message) {
- return kafkaTracing.recordKey(message.key());
+ @Override public String channelType(String channel) {
+ return "topic";
}
- @Override public String remoteServiceName(ConsumerRecord message) {
- return kafkaTracing.remoteServiceName;
+ @Override public String messageKey(ConsumerRecord message) {
+ return KafkaTracing.recordKey(message.key());
}
- @Override public String channelTagKey(ConsumerRecord<K, V> message) {
- return kafkaTracing.channelTagKey(message);
+ @Override public String correlationId(ConsumerRecord message) {
+ return null;
}
- @Override public String identifierTagKey() {
- return kafkaTracing.identifierTagKey();
+ @Override public String brokerName(String topic) {
+ return remoteServiceName;
}
}
}
diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java
index 9ce4092..6186e3e 100644
--- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java
+++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java
@@ -18,11 +18,12 @@ package brave.kafka.clients;
import brave.Span;
import brave.Tracer;
+import brave.Tracing;
import brave.internal.Nullable;
-import brave.messaging.ChannelAdapter;
-import brave.messaging.MessageProducerAdapter;
-import brave.messaging.MessagingProducerHandler;
-import brave.propagation.CurrentTraceContext;
+import brave.messaging.MessagingAdapter;
+import brave.messaging.MessagingParser;
+import brave.messaging.ProducerHandler;
+import brave.propagation.CurrentTraceContext.Scope;
import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -37,28 +38,22 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
-final class TracingProducer<K, V>
- extends MessagingProducerHandler<Producer<K, V>, ProducerRecord<K, V>, ProducerRecord<K, V>>
- implements Producer<K, V> {
+final class TracingProducer<K, V> implements Producer<K, V> {
+ final Producer<K, V> delegate;
final KafkaTracing kafkaTracing;
- final CurrentTraceContext current;
- final Tracer tracer;
- @Nullable final String remoteServiceName;
+ final Tracing tracing;
+ final ProducerHandler<String, ProducerRecord, Headers> handler;
+ final MessagingParser parser;
TracingProducer(Producer<K, V> delegate, KafkaTracing kafkaTracing) {
- super(
- delegate,
- kafkaTracing.msgTracing,
- KafkaProducerAdapter.create(kafkaTracing),
- KafkaProducerAdapter.create(kafkaTracing),
- kafkaTracing.producerRecordExtractor(),
- kafkaTracing.producerRecordInjector());
+ this.delegate = delegate;
+ this.handler = kafkaTracing.producerHandler;
this.kafkaTracing = kafkaTracing;
- this.current = kafkaTracing.msgTracing.tracing().currentTraceContext();
- this.tracer = kafkaTracing.msgTracing.tracing().tracer();
- this.remoteServiceName = kafkaTracing.remoteServiceName;
+ this.tracing = kafkaTracing.messagingTracing.tracing();
+ this.parser = kafkaTracing.messagingTracing.parser();
}
@Override public void initTransactions() {
@@ -82,25 +77,27 @@ final class TracingProducer<K, V>
* tracing.
*/
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
- return this.send(record, null);
+ return send(record, null);
}
/**
* This wraps the send method to add tracing.
*
* <p>When there is no current span, this attempts to extract one from headers. This is possible
- * when a call to produce a message happens directly after a tracing consumer received a span. One
+ * when a call to produce a message happens directly after a tracing consumer received a span. One
* example scenario is Kafka Streams instrumentation.
*/
// TODO: make b3single an option and then note how using this minimizes overhead
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, @Nullable Callback callback) {
- Span span = handleProduce(record, record);
+ Span span = handler.startSend(record.topic(), record);
+ if (span.isNoop()) return delegate.send(record, callback);
- try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
- return delegate.send(record, TracingCallback.create(callback, span, current));
+ FinishSpanCallback finishSpanCallback = tracingCallback(record, callback, span);
+ try (Tracer.SpanInScope ws = tracing.tracer().withSpanInScope(span)) {
+ return delegate.send(record, finishSpanCallback);
} catch (RuntimeException | Error e) {
- span.error(e).finish(); // finish as an exception means the callback won't finish the span
+ finishSpanCallback.finish(e); // an exception might imply the callback was not invoked
throw e;
}
}
@@ -132,89 +129,87 @@ final class TracingProducer<K, V>
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
- String consumerGroupId) {
- delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
+ String producerGroupId) {
+ delegate.sendOffsetsToTransaction(offsets, producerGroupId);
}
/**
* Decorates, then finishes a producer span. Allows tracing to record the duration between
* batching for send and actual send.
*/
- static final class TracingCallback {
- static Callback create(@Nullable Callback delegate, Span span, CurrentTraceContext current) {
- if (span.isNoop()) return delegate; // save allocation overhead
- if (delegate == null) return new FinishSpan(span);
- return new DelegateAndFinishSpan(delegate, span, current);
- }
+ FinishSpanCallback tracingCallback(ProducerRecord<K, V> record, @Nullable Callback delegate,
+ Span span) {
+ if (delegate == null) return new FinishSpanCallback(record, span);
+ return new DelegateAndFinishSpanCallback(record, delegate, span);
+ }
- static class FinishSpan implements Callback {
- final Span span;
+ final class DelegateAndFinishSpanCallback extends FinishSpanCallback {
+ final Callback delegate;
- FinishSpan(Span span) {
- this.span = span;
- }
+ DelegateAndFinishSpanCallback(ProducerRecord<K, V> record, Callback delegate, Span span) {
+ super(record, span);
+ this.delegate = delegate;
+ }
- @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
- if (exception != null) span.error(exception);
- span.finish();
+ @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
+ try (Scope ws = tracing.currentTraceContext().maybeScope(span.context())) {
+ delegate.onCompletion(metadata, exception);
+ } finally {
+ finish(exception);
}
}
+ }
- static final class DelegateAndFinishSpan extends FinishSpan {
- final Callback delegate;
- final CurrentTraceContext current;
-
- DelegateAndFinishSpan(Callback delegate, Span span, CurrentTraceContext current) {
- super(span);
- this.delegate = delegate;
- this.current = current;
- }
+ class FinishSpanCallback implements Callback {
+ final ProducerRecord<K, V> record;
+ final Span span;
- @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
- try (CurrentTraceContext.Scope ws = current.maybeScope(span.context())) {
- delegate.onCompletion(metadata, exception);
- } finally {
- super.onCompletion(metadata, exception);
- }
- }
+ FinishSpanCallback(ProducerRecord<K, V> record, Span span) {
+ this.record = record;
+ this.span = span;
}
- }
- static final class KafkaProducerAdapter<K, V> implements
- MessageProducerAdapter<ProducerRecord<K, V>>,
- ChannelAdapter<ProducerRecord<K, V>> {
- final KafkaTracing kafkaTracing;
+ @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
+ finish(exception);
+ }
- KafkaProducerAdapter(KafkaTracing kafkaTracing) {
- this.kafkaTracing = kafkaTracing;
+ void finish(@Nullable Throwable error) {
+ if (error != null) span.error(error);
+ handler.finishSend(record.topic(), record, span);
+ span.finish();
}
+ }
+
+ static final class ProducerRecordAdapter
+ extends MessagingAdapter<String, ProducerRecord, Headers> {
+ final String remoteServiceName;
- static <K, V> KafkaProducerAdapter<K, V> create(KafkaTracing kafkaTracing) {
- return new KafkaProducerAdapter<>(kafkaTracing);
+ ProducerRecordAdapter(String remoteServiceName) {
+ this.remoteServiceName = remoteServiceName;
}
- @Override public String channel(ProducerRecord message) {
- return message.topic();
+ @Override public Headers carrier(ProducerRecord message) {
+ return message.headers();
}
- @Override public String operation(ProducerRecord message) {
- return KafkaTracing.PRODUCER_OPERATION;
+ @Override public String channel(String topic) {
+ return topic;
}
- @Override public String identifier(ProducerRecord message) {
- return kafkaTracing.recordKey(message.key());
+ @Override public String channelType(String channel) {
+ return "topic";
}
- @Override public String remoteServiceName(ProducerRecord message) {
- return kafkaTracing.remoteServiceName;
+ @Override public String messageKey(ProducerRecord message) {
+ return KafkaTracing.recordKey(message.key());
}
- @Override public String channelTagKey(ProducerRecord<K, V> message) {
- return kafkaTracing.channelTagKey(message);
+ @Override public String correlationId(ProducerRecord message) {
+ return null;
}
- @Override public String identifierTagKey() {
- return kafkaTracing.identifierTagKey();
+ @Override public String brokerName(String topic) {
+ return remoteServiceName;
}
}
}
diff --git a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java
index 7cf6caa..e1a0dab 100644
--- a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java
+++ b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java
@@ -17,8 +17,9 @@
package brave.kafka.clients;
import brave.Span;
-import brave.sampler.Sampler;
import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
@@ -28,19 +29,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class TracingCallbackTest extends BaseTracingTest {
- @Test public void create_returns_input_on_noop() {
- Span span = tracing.tracer().withSampler(Sampler.NEVER_SAMPLE).nextSpan();
-
- Callback delegate = mock(Callback.class);
- Callback tracingCallback = TracingProducer.TracingCallback.create(delegate, span, current);
-
- assertThat(tracingCallback).isSameAs(delegate);
- }
+ Producer<String, String> producer = mock(Producer.class);
+ ProducerRecord<String, String> record = mock(ProducerRecord.class);
+ TracingProducer<String, String> tracingProducer = new TracingProducer<>(producer, kafkaTracing);
@Test public void on_completion_should_finish_span() {
Span span = tracing.tracer().nextSpan().start();
- Callback tracingCallback = TracingProducer.TracingCallback.create(null, span, current);
+ Callback tracingCallback = tracingProducer.tracingCallback(record, null, span);
tracingCallback.onCompletion(createRecordMetadata(), null);
assertThat(spans.getFirst()).isNotNull();
@@ -49,18 +45,18 @@ public class TracingCallbackTest extends BaseTracingTest {
@Test public void on_completion_should_tag_if_exception() {
Span span = tracing.tracer().nextSpan().start();
- Callback tracingCallback = TracingProducer.TracingCallback.create(null, span, current);
+ Callback tracingCallback = tracingProducer.tracingCallback(record, null, span);
tracingCallback.onCompletion(null, new Exception("Test exception"));
assertThat(spans.getFirst().tags())
- .containsEntry("error", "Test exception");
+ .containsEntry("error", "Test exception");
}
@Test public void on_completion_should_forward_then_finish_span() {
Span span = tracing.tracer().nextSpan().start();
Callback delegate = mock(Callback.class);
- Callback tracingCallback = TracingProducer.TracingCallback.create(delegate, span, current);
+ Callback tracingCallback = tracingProducer.tracingCallback(record, delegate, span);
RecordMetadata md = createRecordMetadata();
tracingCallback.onCompletion(md, null);
@@ -73,14 +69,16 @@ public class TracingCallbackTest extends BaseTracingTest {
Callback delegate = (metadata, exception) -> assertThat(current.get()).isSameAs(span.context());
- TracingProducer.TracingCallback.create(delegate, span, current).onCompletion(createRecordMetadata(), null);
+ Callback tracingCallback = tracingProducer.tracingCallback(record, delegate, span);
+
+ tracingCallback.onCompletion(createRecordMetadata(), null);
}
@Test public void on_completion_should_forward_then_tag_if_exception() {
Span span = tracing.tracer().nextSpan().start();
Callback delegate = mock(Callback.class);
- Callback tracingCallback = TracingProducer.TracingCallback.create(delegate, span, current);
+ Callback tracingCallback = tracingProducer.tracingCallback(record, delegate, span);
RecordMetadata md = createRecordMetadata();
Exception e = new Exception("Test exception");
tracingCallback.onCompletion(md, e);
@@ -88,7 +86,7 @@ public class TracingCallbackTest extends BaseTracingTest {
verify(delegate).onCompletion(md, e);
assertThat(spans.getFirst().tags())
- .containsEntry("error", "Test exception");
+ .containsEntry("error", "Test exception");
}
RecordMetadata createRecordMetadata() {
diff --git a/instrumentation/messaging/pom.xml b/instrumentation/messaging/pom.xml
index 310ac2b..1f6ed5b 100644
--- a/instrumentation/messaging/pom.xml
+++ b/instrumentation/messaging/pom.xml
@@ -38,6 +38,18 @@
<errorprone.args>-Xep:MissingOverride:OFF</errorprone.args>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sqs</artifactId>
+ <version>2.5.52</version>
+ </dependency>
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>5.7.1</version>
+ </dependency>
+ </dependencies>
<build>
<plugins>
<plugin>
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ConsumerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/ConsumerHandler.java
new file mode 100644
index 0000000..7456b16
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/ConsumerHandler.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.Tracing;
+import brave.internal.Nullable;
+import brave.propagation.TraceContext;
+import brave.propagation.TraceContextOrSamplingFlags;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carries the trace context, usually headers
+ */
+public final class ConsumerHandler<Chan, Msg, C> {
+
+ public static <Chan, Msg, C> ConsumerHandler<Chan, Msg, C> create(
+ MessagingTracing messagingTracing,
+ MessagingAdapter<Chan, Msg, C> adapter,
+ TraceContext.Extractor<C> extractor,
+ TraceContext.Injector<C> injector
+ ) {
+ return new ConsumerHandler<>(messagingTracing, adapter, extractor, injector);
+ }
+
+ final Tracing tracing;
+ final TraceContext.Extractor<C> extractor;
+ final TraceContext.Injector<C> injector;
+ final MessagingParser parser;
+ final MessagingAdapter<Chan, Msg, C> adapter;
+
+ ConsumerHandler(MessagingTracing messagingTracing,
+ MessagingAdapter<Chan, Msg, C> adapter,
+ TraceContext.Extractor<C> extractor,
+ TraceContext.Injector<C> injector
+ ) {
+ this.tracing = messagingTracing.tracing;
+ this.extractor = extractor;
+ this.injector = injector;
+ this.parser = messagingTracing.parser;
+ this.adapter = adapter;
+ }
+
+ public void handleReceive(Chan channel, Msg message) {
+ if (message == null || tracing.isNoop()) return;
+ C carrier = adapter.carrier(message);
+ TraceContextOrSamplingFlags extracted = extractor.extract(carrier);
+ handleReceive(channel, message, carrier, adapter.brokerName(channel), extracted, false);
+ }
+
+ /** Returns a started processor span if {@code createProcessor} is true */
+ @Nullable Span handleReceive(Chan channel, Msg message, C carrier, String remoteServiceName,
+ TraceContextOrSamplingFlags extracted, boolean createProcessor) {
+ Span span = tracing.tracer().nextSpan(extracted);
+ // Creating the processor while the consumer is not finished ensures clocks are the same. This
+ // allows the processor to start later, but not be subject to clock drift relative to the parent.
+ Span processorSpan = createProcessor ? tracing.tracer().newChild(span.context()) : null;
+ if (!span.isNoop()) {
+ span.kind(Span.Kind.CONSUMER);
+ parser.start("receive", adapter, channel, message, span.context(), span.customizer());
+ if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
+
+ // incur timestamp overhead only once
+ long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
+ span.start(timestamp);
+ parser.finish("receive", adapter, channel, message, span.context(), span.customizer());
+ span.finish(timestamp + 1);
+
+ // even though we are setting the timestamp here, start timestamp is allowed to be overwritten
+ // later as needed. Doing so here avoids the overhead of a tick reading
+ if (processorSpan != null) processorSpan.start(timestamp + 1);
+ }
+ injector.inject(createProcessor ? processorSpan.context() : span.context(), carrier);
+ return processorSpan;
+ }
+
+ public Map<Chan, Span> startBulkReceive(Chan channel, List<? extends Msg> messages,
+ Map<Chan, Span> spanForChannel) {
+ long timestamp = 0L;
+ for (int i = 0, length = messages.size(); i < length; i++) {
+ Msg message = messages.get(i);
+ C carrier = adapter.carrier(message);
+ TraceContextOrSamplingFlags extracted = extractor.extract(carrier);
+ String remoteServiceName = adapter.brokerName(channel);
+
+ // If we extracted neither a trace context, nor request-scoped data (extra),
+ // make or reuse a span for this topic
+ if (extracted.samplingFlags() != null && extracted.extra().isEmpty()) {
+ Span span = spanForChannel.get(channel);
+ if (span == null) {
+ span = tracing.tracer().nextSpan(extracted);
+ if (!span.isNoop()) {
+ span.kind(Span.Kind.CONSUMER);
+ parser.start("receive-batch", adapter, channel, null, span.context(),
+ span.customizer());
+ if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
+ // incur timestamp overhead only once
+ if (timestamp == 0L) {
+ timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
+ }
+ span.start(timestamp);
+ }
+ spanForChannel.put(channel, span);
+ }
+ injector.inject(span.context(), carrier);
+ } else { // we extracted request-scoped data, so cannot share a consumer span.
+ handleReceive(channel, message, carrier, remoteServiceName, extracted, false);
+ }
+ }
+ return spanForChannel;
+ }
+
+ public void finishBulkReceive(Map<Chan, Span> spanForChannel) {
+ long timestamp = 0L;
+ for (Map.Entry<Chan, Span> entry : spanForChannel.entrySet()) {
+ Span span = entry.getValue();
+ // incur timestamp overhead only once
+ if (timestamp == 0L) {
+ timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
+ }
+ parser.finish("receive-batch", adapter, entry.getKey(), null, span.context(),
+ span.customizer());
+ span.finish(timestamp);
+ }
+ }
+}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessageAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessageAdapter.java
deleted file mode 100644
index 502890b..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessageAdapter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-interface MessageAdapter<Msg> {
- /**
- * Messaging operation semantics, e.g. pull, push, send, receive, etc.
- */
- String operation(Msg message);
-
- /**
- * Message identifier, e.g. kafka record key, jms message correlation id.
- */
- String identifier(Msg message);
-
- /**
- * Removes propagation context from Message context carrier.
- */
- //void clearPropagation(Msg message);
-
- String identifierTagKey();
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessageConsumerAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessageConsumerAdapter.java
deleted file mode 100644
index f31e327..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessageConsumerAdapter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-public interface MessageConsumerAdapter<Msg> extends MessageAdapter<Msg> {
- /**
- * Messaging operation semantics, e.g. pull, push, send, receive, etc.
- */
- String operation(Msg message);
-
- /**
- * Message identifier, e.g. kafka record key, jms message correlation id.
- */
- String identifier(Msg message);
-
- /**
- * Removes propagation context from Message context carrier.
- */
- //void clearPropagation(Msg message);
-
- String identifierTagKey();
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessageProducerAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessageProducerAdapter.java
deleted file mode 100644
index 2bfbde6..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessageProducerAdapter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-public interface MessageProducerAdapter<Msg> extends MessageAdapter<Msg> {
- /**
- * Messaging operation semantics, e.g. pull, push, send, receive, etc.
- */
- String operation(Msg message);
-
- /**
- * Message identifier, e.g. kafka record key, jms message correlation id.
- */
- String identifier(Msg message);
-
- /**
- * Removes propagation context from Message context carrier.
- */
- //void clearPropagation(Msg message);
-
- String identifierTagKey();
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingAdapter.java
new file mode 100644
index 0000000..11a4e6c
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.internal.Nullable;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carries the trace context, usually headers
+ */
+// abstract class instead of interface to allow method adds before Java 1.8
+public abstract class MessagingAdapter<Chan, Msg, C> {
+ // TODO: make some of these methods not abstract as they don't have meaning for all impls
+
+ /** Returns the trace context carrier from the message. Usually, this is headers. */
+ public abstract C carrier(Msg message);
+
+ /**
+ * Messaging channel, e.g. Kafka topic or JMS queue name. {@code null} if unreadable.
+ *
+ * <p>Conventionally associated with the key "messaging.channel"
+ */
+ @Nullable public abstract String channel(Chan channel);
+
+ /**
+ * Type of channel, e.g. queue or topic. {@code null} if unreadable.
+ *
+ * <p>Conventionally associated with the key "message.channel_type"
+ */
+ // TODO: naming is arbitrary.. we once used "kind" for Span but only because stackdriver did..
+ // Not sure we should re-use kind for parity with that or not..
+ @Nullable public abstract String channelType(Chan channel);
+
+ /**
+ * Key used to identify or partition messages. {@code null} if unreadable.
+ *
+ * <p>Conventionally associated with the key "message.key"
+ */
+ @Nullable public abstract String messageKey(Msg message);
+
+ /**
+ * Identifier used to correlate logs. {@code null} if unreadable.
+ *
+ * <p>Conventionally associated with the key "message.correlation_id"
+ */
+ @Nullable public abstract String correlationId(Msg message);
+
+ /**
+ * Message broker name. {@code null} if unreadable.
+ *
+ * <p>Conventionally associated with {@link Span#remoteServiceName(String)}
+ */
+ @Nullable public abstract String brokerName(Chan channel);
+
+ protected MessagingAdapter() {
+ }
+}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerHandler.java
deleted file mode 100644
index 629e3b9..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerHandler.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.Span;
-import brave.SpanCustomizer;
-import brave.Tracing;
-import brave.propagation.TraceContext;
-import brave.propagation.TraceContextOrSamplingFlags;
-import java.util.List;
-import java.util.Map;
-
-public class MessagingConsumerHandler<C, Chan, Msg>
- extends MessagingHandler<Chan, Msg, ChannelAdapter<Chan>, MessageAdapter<Msg>> {
-
- static public <C, Chan, Msg> MessagingConsumerHandler<C, Chan, Msg> create(
- C delegate,
- MessagingTracing tracing,
- ChannelAdapter<Chan> channelAdapter,
- MessageConsumerAdapter<Msg> messageAdapter,
- TraceContext.Extractor<Msg> extractor,
- TraceContext.Injector<Msg> injector) {
- return new MessagingConsumerHandler<>(delegate, tracing, channelAdapter, messageAdapter,
- extractor, injector);
- }
-
- public final C delegate;
- final Tracing tracing;
-
- public MessagingConsumerHandler(
- C delegate,
- MessagingTracing messagingTracing,
- ChannelAdapter<Chan> channelAdapter,
- MessageConsumerAdapter<Msg> messageAdapter,
- TraceContext.Extractor<Msg> extractor,
- TraceContext.Injector<Msg> injector) {
- super(messagingTracing.tracing.currentTraceContext(), channelAdapter, messageAdapter,
- messagingTracing.consumerParser, extractor, injector);
- this.delegate = delegate;
- this.tracing = messagingTracing.tracing;
- }
-
- public Span nextSpan(Chan channel, Msg message) {
- TraceContextOrSamplingFlags extracted = extractor.extract(message);
- Span result = tracing.tracer().nextSpan(extracted);
- if (extracted.context() == null && !result.isNoop()) {
- addTags(channel, result);
- }
- return result;
- }
-
- /** When an upstream context was not present, lookup keys are unlikely added */
- void addTags(Chan channel, SpanCustomizer result) {
- parser.channel(channelAdapter, channel, result);
- //parser.identifier(messageAdapter, message, result);
- }
-
- public void handleConsume(Chan channel, Msg message) {
- if (message == null || tracing.isNoop()) return;
- // remove prior propagation headers from the message
- Span span = nextSpan(channel, message);
- if (!span.isNoop()) {
- span.kind(Span.Kind.CONSUMER);
- parser.message(channelAdapter, messageAdapter, channel, message, span);
-
- // incur timestamp overhead only once
- long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
- span.start(timestamp).finish(timestamp);
- }
- injector.inject(span.context(), message);
- }
-
- public Map<String, Span> handleConsume(Chan chan, List<Msg> messages,
- Map<String, Span> spanForChannel) {
- long timestamp = 0L;
- for (int i = 0, length = messages.size(); i < length; i++) {
- Msg message = messages.get(i);
- TraceContextOrSamplingFlags extracted = extractor.extract(message);
-
- // If we extracted neither a trace context, nor request-scoped data (extra),
- // make or reuse a span for this topic
- if (extracted.samplingFlags() != null && extracted.extra().isEmpty()) {
- String channel = channelAdapter.channel(chan);
- Span span = spanForChannel.get(channel);
- if (span == null) {
- span = tracing.tracer().nextSpan(extracted);
- if (!span.isNoop()) {
- span.name(messageAdapter.operation(message)).kind(Span.Kind.CONSUMER);
- parser.message(channelAdapter, messageAdapter, chan, message, span);
- String remoteServiceName = channelAdapter.remoteServiceName(chan);
- if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
- // incur timestamp overhead only once
- if (timestamp == 0L) {
- timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
- }
- span.start(timestamp);
- }
- spanForChannel.put(channel, span);
- }
- injector.inject(span.context(), message);
- } else { // we extracted request-scoped data, so cannot share a consumer span.
- Span span = tracing.tracer().nextSpan(extracted);
- if (!span.isNoop()) {
- span.kind(Span.Kind.CONSUMER);
- parser.message(channelAdapter, messageAdapter, chan, message, span);
- String remoteServiceName = channelAdapter.remoteServiceName(chan);
- if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
- // incur timestamp overhead only once
- if (timestamp == 0L) {
- timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
- }
- span.start(timestamp).finish(timestamp); // span won't be shared by other records
- }
- injector.inject(span.context(), message);
- }
- }
- return spanForChannel;
- }
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerParser.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerParser.java
deleted file mode 100644
index c463860..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerParser.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.SpanCustomizer;
-import brave.propagation.TraceContext;
-import brave.propagation.TraceContextOrSamplingFlags;
-
-public class MessagingConsumerParser extends MessagingParser {
-
- public <Chan, Msg> void message(ChannelAdapter<Chan> channelAdapter,
- MessageAdapter<Msg> messageAdapter,
- Chan channel, Msg message, SpanCustomizer customizer) {
- customizer.name(messageAdapter.operation(message));
- channel(channelAdapter, channel, customizer);
- //identifier(messageAdapter, message, customizer);
- }
-
- //public <Msg> TraceContextOrSamplingFlags extractContextAndClearMessage(
- // MessageAdapter<Msg> adapter,
- // TraceContext.Extractor<Msg> extractor,
- // Msg message) {
- // // clear propagation headers if we were able to extract a span
- // //TODO check if correct to not filter on empty flags. Diff between kafka and jms instrumentation
- // //if (!extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
- // //adapter.clearPropagation(message);
- // //}
- // return extractor.extract(message);
- //}
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingHandler.java
deleted file mode 100644
index 0d22086..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingHandler.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.propagation.CurrentTraceContext;
-import brave.propagation.TraceContext;
-
-abstract class MessagingHandler<Chan, Msg, CA extends ChannelAdapter<Chan>, MA extends MessageAdapter<Msg>> {
-
- final CurrentTraceContext currentTraceContext;
- final CA channelAdapter;
- final MA messageAdapter;
- final MessagingParser parser;
- final TraceContext.Extractor<Msg> extractor;
- final TraceContext.Injector<Msg> injector;
-
- MessagingHandler(
- CurrentTraceContext currentTraceContext,
- CA channelAdapter,
- MA adapter,
- MessagingParser parser,
- TraceContext.Extractor<Msg> extractor,
- TraceContext.Injector<Msg> injector) {
- this.currentTraceContext = currentTraceContext;
- this.channelAdapter = channelAdapter;
- this.messageAdapter = adapter;
- this.parser = parser;
- this.extractor = extractor;
- this.injector = injector;
- }
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ChannelAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingOperation.java
similarity index 72%
rename from instrumentation/messaging/src/main/java/brave/messaging/ChannelAdapter.java
rename to instrumentation/messaging/src/main/java/brave/messaging/MessagingOperation.java
index 601410e..8194791 100644
--- a/instrumentation/messaging/src/main/java/brave/messaging/ChannelAdapter.java
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingOperation.java
@@ -16,16 +16,9 @@
*/
package brave.messaging;
-public interface ChannelAdapter<Channel> {
- /**
- * Messaging channel, e.g. kafka topic, jms queue, jms topic, etc.
- */
- String channel(Channel channel);
-
- String channelTagKey(Channel channel);
-
- /**
- * Messaging broker service, e.g. kafka-cluster, jms-server.
- */
- String remoteServiceName(Channel channel);
+public enum MessagingOperation {
+ SEND,
+ BULK_SEND,
+ RECEIVE,
+ BULK_RECEIVE;
}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java
index 73ae644..87ccbed 100644
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,42 +17,61 @@
package brave.messaging;
import brave.SpanCustomizer;
+import brave.Tracing;
+import brave.internal.Nullable;
+import brave.propagation.ExtraFieldPropagation;
+import brave.propagation.TraceContext;
+/**
+ * <p>Methods will not be invoked with a span in scope. Please use the explicit {@link
+ * TraceContext} if you need to create tags based on propagated data like {@link
+ * ExtraFieldPropagation}.
+ */
+// there are no producer/consumer subtypes because it is simpler than multiple inheritance when the
+// same library handles producer and consumer relationships.
public class MessagingParser {
- public <Chan, Msg> void message(ChannelAdapter<Chan> channelAdapter,
- MessageAdapter<Msg> messageAdapter,
- Chan channel, Msg message, SpanCustomizer customizer) {
- customizer.name(messageAdapter.operation(message));
- channel(channelAdapter, channel, customizer);
- identifier(messageAdapter, message, customizer);
+ /**
+ * Override to change what data to add to the span when a message operation starts. By default,
+ * this sets the span name to the operation name and adds the tag "messaging.channel" if available.
+ *
+ * <p>If you only want to change the span name, you can override {@link
+ * #spanName(String, MessagingAdapter, Object, Object)} instead.
+ *
+ * @param msg null when a bulk operation
+ * @see #spanName(String, MessagingAdapter, Object, Object)
+ * @see #finish(String, MessagingAdapter, Object, Object, TraceContext, SpanCustomizer)
+ */
+ // Context is here so that people can decide to add tags based on local root etc.
+ public <Chan, Msg, C> void start(String operation, MessagingAdapter<Chan, Msg, C> adapter,
+ Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+ customizer.name(spanName(operation, adapter, channel, msg));
+ addMessageTags(adapter, channel, msg, context, customizer);
}
- public <Chan> void channel(ChannelAdapter<Chan> adapter, Chan chan,
- SpanCustomizer customizer) {
- String channel = adapter.channel(chan);
- if (chan != null) customizer.tag(adapter.channelTagKey(chan), channel);
+ // channel is nullable as JMS could have an exception getting it from the message
+ protected <Chan, Msg, C> void addMessageTags(MessagingAdapter<Chan, Msg, C> adapter,
+ @Nullable Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+ String channelName = adapter.channel(channel);
+ if (channelName != null) customizer.tag("messaging.channel", channelName);
}
- public <Msg> void identifier(MessageAdapter<Msg> adapter, Msg message,
- SpanCustomizer customizer) {
- String identifier = adapter.identifier(message);
- if (identifier != null) {
- customizer.tag(adapter.identifierTagKey(), identifier);
- }
+ /**
+ * Override to change what data to add to the span when a message operation completes.
+ *
+ * <p>This adds no tags by default. Error tagging is delegated to {@link Tracing#errorParser()}.
+ *
+ * @param msg null when a bulk operation
+ * @see #start(String, MessagingAdapter, Object, Object, TraceContext, SpanCustomizer)
+ */
+ // Context is here so that people can add tags based on extra fields without the cost of scoping
+ public <Chan, Msg, C> void finish(String operation, MessagingAdapter<Chan, Msg, C> adapter,
+ Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
}
- //public <Msg> TraceContextOrSamplingFlags extractContextAndClearMessage(
- // MessageAdapter<Msg> adapter,
- // TraceContext.Extractor<Msg> extractor,
- // Msg message) {
- // TraceContextOrSamplingFlags extracted = extractor.extract(message);
- // // clear propagation headers if we were able to extract a span
- // //TODO check if correct to not filter on empty flags. Diff between kafka and jms instrumentation
- // //if (!extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
- // //TODO check we dont need this
- // // adapter.clearPropagation(message);
- // //}
- // return extracted;
- //}
+ /** Returns the span name of a message operation. Defaults to the operation name. */
+ protected <Chan, Msg, C> String spanName(String operation,
+ MessagingAdapter<Chan, Msg, C> adapter, Chan channel, @Nullable Msg msg) {
+ return operation;
+ }
}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerHandler.java
deleted file mode 100644
index d330fc3..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerHandler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.Span;
-import brave.Tracer;
-import brave.propagation.TraceContext;
-
-public class MessagingProducerHandler<P, Chan, Msg>
- extends MessagingHandler<Chan, Msg, ChannelAdapter<Chan>, MessageProducerAdapter<Msg>> {
-
- public static <P, Chan, Msg> MessagingProducerHandler<P, Chan, Msg> create(
- P delegate,
- MessagingTracing tracing,
- ChannelAdapter<Chan> channelAdapter,
- MessageProducerAdapter<Msg> messageAdapter,
- TraceContext.Extractor<Msg> extractor,
- TraceContext.Injector<Msg> injector) {
- return new MessagingProducerHandler<>(delegate, tracing, channelAdapter, messageAdapter,
- extractor, injector);
- }
-
- public final P delegate;
- final Tracer tracer;
-
- public MessagingProducerHandler(
- P delegate,
- MessagingTracing messagingTracing,
- ChannelAdapter<Chan> channelAdapter,
- MessageProducerAdapter<Msg> messageAdapter,
- TraceContext.Extractor<Msg> extractor,
- TraceContext.Injector<Msg> injector) {
- super(messagingTracing.tracing.currentTraceContext(), channelAdapter, messageAdapter,
- messagingTracing.producerParser, extractor, injector);
- this.delegate = delegate;
- this.tracer = messagingTracing.tracing.tracer();
- }
-
- public Span handleProduce(Chan channel, Msg message) {
- TraceContext maybeParent = currentTraceContext.get();
- // Unlike message consumers, we try current span before trying extraction. This is the proper
- // order because the span in scope should take precedence over a potentially stale header entry.
- //
- // NOTE: Brave instrumentation used properly does not result in stale header entries, as we
- // always clear message headers after reading.
- Span span;
- if (maybeParent == null) {
- span = tracer.nextSpan(extractor.extract(message));
- } else {
- // As JMS is sensitive about write access to headers, we defensively clear even if it seems
- // upstream would have cleared (because there is a span in scope!).
- span = tracer.newChild(maybeParent);
- //TODO check we dont need this
- // messageAdapter.clearPropagation(message);
- }
-
- if (!span.isNoop()) {
- span.kind(Span.Kind.PRODUCER).name(messageAdapter.operation(message));
- parser.message(channelAdapter, messageAdapter, channel, message, span);
- String remoteServiceName = channelAdapter.remoteServiceName(channel);
- if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
- span.start();
- }
-
- injector.inject(span.context(), message);
-
- return span;
- }
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerParser.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerParser.java
deleted file mode 100644
index 84723b5..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerParser.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.SpanCustomizer;
-
-public class MessagingProducerParser extends MessagingParser {
-
- public <Chan, Msg> void message(ChannelAdapter<Chan> channelAdapter,
- MessageAdapter<Msg> messageAdapter,
- Chan channel, Msg message, SpanCustomizer customizer) {
- customizer.name(messageAdapter.operation(message));
- channel(channelAdapter, channel, customizer);
- identifier(messageAdapter, message, customizer);
- }
-
- //public <Msg> TraceContextOrSamplingFlags extractContextAndClearMessage(
- // MessageAdapter<Msg> adapter,
- // TraceContext.Extractor<Msg> extractor,
- // Msg message) {
- // // clear propagation headers if we were able to extract a span
- // //TODO check if correct to not filter on empty flags. Diff between kafka and jms instrumentation
- // //if (!extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
- // //adapter.clearPropagation(message);
- // //}
- // return extractor.extract(message);
- //}
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java
index a2d6e66..109ddaa 100644
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java
@@ -16,13 +16,9 @@
*/
package brave.messaging;
-import brave.Span;
import brave.Tracing;
-import brave.propagation.TraceContext;
-import brave.propagation.TraceContextOrSamplingFlags;
public class MessagingTracing {
-
public static MessagingTracing create(Tracing tracing) {
return newBuilder(tracing).build();
}
@@ -32,66 +28,37 @@ public class MessagingTracing {
}
final Tracing tracing;
- final MessagingConsumerParser consumerParser;
- final MessagingProducerParser producerParser;
+ final MessagingParser parser;
MessagingTracing(Builder builder) {
this.tracing = builder.tracing;
- this.consumerParser = builder.consumerParser;
- this.producerParser = builder.producerParser;
+ this.parser = builder.parser;
}
public Tracing tracing() {
return tracing;
}
- public MessagingProducerParser producerParser() {
- return producerParser;
- }
-
- public MessagingConsumerParser consumerParser() {
- return consumerParser;
- }
-
- public <Chan, Msg> Span nextSpan(ChannelAdapter<Chan> channelAdapter,
- MessageAdapter<Msg> messageAdapter,
- TraceContext.Extractor<Msg> extractor,
- Msg message,
- Chan channel) {
- TraceContextOrSamplingFlags extracted = extractor.extract(message);
- Span result = tracing.tracer().nextSpan(extracted);
-
- // When an upstream context was not present, lookup keys are unlikely added
- if (extracted.context() == null && !result.isNoop()) {
- consumerParser.channel(channelAdapter, channel, result);
- consumerParser.identifier(messageAdapter, message, result);
- }
- return result;
+ public MessagingParser parser() {
+ return parser;
}
public static class Builder {
final Tracing tracing;
- MessagingConsumerParser consumerParser = new MessagingConsumerParser();
- MessagingProducerParser producerParser = new MessagingProducerParser();
+ MessagingParser parser = new MessagingParser();
Builder(Tracing tracing) {
if (tracing == null) throw new NullPointerException("tracing == null");
this.tracing = tracing;
}
- public Builder consumerParser(MessagingConsumerParser consumerParser) {
- if (producerParser == null) throw new NullPointerException("consumerParser == null");
- this.consumerParser = consumerParser;
- return this;
- }
-
- public Builder producerParser(MessagingProducerParser producerParser) {
- if (producerParser == null) throw new NullPointerException("producerParser == null");
- this.producerParser = producerParser;
+ public Builder parser(MessagingParser parser) {
+ if (parser == null) throw new NullPointerException("parser == null");
+ this.parser = parser;
return this;
}
- MessagingTracing build() {
+ public MessagingTracing build() {
return new MessagingTracing(this);
}
}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ProcessorHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/ProcessorHandler.java
new file mode 100644
index 0000000..fa5da4c
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/ProcessorHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.Tracer;
+import brave.internal.Nullable;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carries the trace context, usually headers
+ */
+public final class ProcessorHandler<Chan, Msg, C> {
+ public static <Chan, Msg, C> ProcessorHandler<Chan, Msg, C> create(
+ MessagingTracing messagingTracing,
+ ConsumerHandler<Chan, Msg, C> consumerHandler
+ ) {
+ return new ProcessorHandler<>(messagingTracing, consumerHandler);
+ }
+
+ final Tracer tracer;
+ final ConsumerHandler<Chan, Msg, C> consumerHandler;
+ final MessagingAdapter<Chan, Msg, C> adapter;
+
+ ProcessorHandler(MessagingTracing messagingTracing,
+ ConsumerHandler<Chan, Msg, C> consumerHandler) {
+ this.tracer = messagingTracing.tracing.tracer();
+ this.consumerHandler = consumerHandler;
+ this.adapter = consumerHandler.adapter;
+ }
+
+ /**
+ * When {@code addConsumerSpan} is true, this creates 2 spans:
+ * <ol>
+ * <li>A duration 1 {@link Span.Kind#CONSUMER} span to represent receipt from the
+ * destination</li>
+ * <li>A child span with the duration of the delegated listener</li>
+ * </ol>
+ *
+ * <p>{@code addConsumerSpan} should only be set when the message consumer is not traced.
+ */
+ // channel is nullable as JMS could have an exception getting it from the message
+ public Span startProcessor(@Nullable Chan channel, Msg message, boolean addConsumerSpan) {
+ if (!addConsumerSpan) {
+ Span result = tracer.nextSpan().start();
+ if (!result.isNoop()) {
+ consumerHandler.parser.addMessageTags(adapter, channel, message,
+ result.context(), result.customizer());
+ }
+ return result;
+ }
+ C carrier = adapter.carrier(message);
+ return consumerHandler.handleReceive(channel, message, carrier,
+ adapter.brokerName(channel), consumerHandler.extractor.extract(carrier), true);
+ }
+}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ProducerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/ProducerHandler.java
new file mode 100644
index 0000000..81d9c72
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/ProducerHandler.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.Tracing;
+import brave.propagation.TraceContext;
+
+/**
+ * Starts and finishes {@link Span.Kind#PRODUCER} spans around the sending of a message.
+ *
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carries the trace context, usually headers
+ */
+public final class ProducerHandler<Chan, Msg, C> {
+  public static <Chan, Msg, C> ProducerHandler<Chan, Msg, C> create(
+      MessagingTracing messagingTracing, MessagingAdapter<Chan, Msg, C> adapter,
+      TraceContext.Extractor<C> extractor, TraceContext.Injector<C> injector) {
+    return new ProducerHandler<>(
+        messagingTracing.tracing, messagingTracing.parser(), adapter, extractor, injector);
+  }
+
+  final Tracing tracing;
+  final TraceContext.Extractor<C> extractor;
+  final TraceContext.Injector<C> injector;
+  final MessagingParser parser;
+  final MessagingAdapter<Chan, Msg, C> adapter;
+
+  ProducerHandler(Tracing tracing, MessagingParser parser,
+      MessagingAdapter<Chan, Msg, C> adapter, TraceContext.Extractor<C> extractor,
+      TraceContext.Injector<C> injector) {
+    this.tracing = tracing;
+    this.extractor = extractor;
+    this.injector = injector;
+    this.parser = parser;
+    this.adapter = adapter;
+  }
+
+  /**
+   * Attempts to resume a trace from the current span, falling back to extracting context from the
+   * carrier. Tags are added before the span is started. The context is always injected.
+   *
+   * <p>This is typically called before the send is processed by the actual library.
+   */
+  public Span startSend(Chan channel, Msg message) {
+    C carrier = adapter.carrier(message);
+    TraceContext maybeParent = tracing.currentTraceContext().get();
+    // Unlike message consumers, we try current span before trying extraction. This is the proper
+    // order because the span in scope should take precedence over a potentially stale header entry.
+    //
+    // NOTE: Brave instrumentation used properly does not result in stale header entries, as we use
+    // propagation formats that can always be overwritten.
+    Span span = maybeParent != null
+        ? tracing.tracer().newChild(maybeParent)
+        : tracing.tracer().nextSpan(extractor.extract(carrier));
+
+    if (!span.isNoop()) {
+      span.kind(Span.Kind.PRODUCER);
+      parser.start("send", adapter, channel, message, span.context(), span.customizer());
+      String remoteServiceName = adapter.brokerName(channel);
+      if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
+      span.start();
+    }
+
+    injector.inject(span.context(), carrier);
+    return span;
+  }
+
+  /**
+   * Completes a span returned by {@link #startSend}. Mirroring {@link #startSend}, the parser is
+   * only invoked when the span is not no-op; the span itself is always finished.
+   */
+  public void finishSend(Chan channel, Msg message, Span span) {
+    if (!span.isNoop()) {
+      parser.finish("send", adapter, channel, message, span.context(), span.customizer());
+    }
+    span.finish();
+  }
+}