You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/01/30 10:02:26 UTC
[camel] branch master updated: Fixed CS for Camel-Zipkin
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 76c3ea2 Fixed CS for Camel-Zipkin
76c3ea2 is described below
commit 76c3ea2cefe48ab4431361b255252deec86b9744
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Jan 30 11:02:04 2019 +0100
Fixed CS for Camel-Zipkin
---
.../java/org/apache/camel/zipkin/ZipkinTracer.java | 191 ++++++++++++---------
1 file changed, 111 insertions(+), 80 deletions(-)
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
index 22b4932..eca68dd 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
@@ -34,6 +34,7 @@ import brave.propagation.TraceContext.Extractor;
import brave.propagation.TraceContext.Injector;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.sampler.Sampler;
+
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
@@ -52,18 +53,19 @@ import org.apache.camel.spi.CamelEvent.ExchangeSendingEvent;
import org.apache.camel.spi.CamelEvent.ExchangeSentEvent;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.RoutePolicyFactory;
+import org.apache.camel.support.EndpointHelper;
import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.support.PatternHelper;
import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.support.EndpointHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Reporter;
import zipkin2.reporter.libthrift.LibthriftSender;
@@ -72,44 +74,55 @@ import zipkin2.reporter.urlconnection.URLConnectionSender;
import static org.apache.camel.builder.ExpressionBuilder.routeIdExpression;
/**
- * To use Zipkin with Camel then setup this {@link ZipkinTracer} in your Camel application.
+ * To use Zipkin with Camel then setup this {@link ZipkinTracer} in your Camel
+ * application.
* <p/>
- * Events (span) are captured for incoming and outgoing messages being sent to/from Camel.
- * This means you need to configure which which Camel endpoints that maps to zipkin service names.
- * The mapping can be configured using
+ * Events (span) are captured for incoming and outgoing messages being sent
+ * to/from Camel. This means you need to configure which which Camel endpoints
+ * that maps to zipkin service names. The mapping can be configured using
* <ul>
* <li>route id - A Camel route id</li>
* <li>endpoint url - A Camel endpoint url</li>
* </ul>
- * For both kinds you can use wildcards and regular expressions to match, which is using the rules from
- * {@link PatternHelper#matchPattern(String, String)} and {@link EndpointHelper#matchEndpoint(CamelContext, String, String)}
+ * For both kinds you can use wildcards and regular expressions to match, which
+ * is using the rules from {@link PatternHelper#matchPattern(String, String)}
+ * and {@link EndpointHelper#matchEndpoint(CamelContext, String, String)}
* <p/>
- * To match all Camel messages you can use <tt>*</tt> in the pattern and configure that to the same service name.
- * <br/>
- * If no mapping has been configured then Camel will fallback and use endpoint uri's as service names.
- * However its recommended to configure service mappings so you can use human logic names instead of Camel
- * endpoint uris in the names.
+ * To match all Camel messages you can use <tt>*</tt> in the pattern and
+ * configure that to the same service name. <br/>
+ * If no mapping has been configured then Camel will fallback and use endpoint
+ * uri's as service names. However its recommended to configure service mappings
+ * so you can use human logic names instead of Camel endpoint uris in the names.
* <p/>
- * Camel will auto-configure a {@link Reporter span reporter} one hasn't been explicitly configured,
- * and if the hostname and port to a zipkin collector has been configured as environment variables
+ * Camel will auto-configure a {@link Reporter span reporter} one hasn't been
+ * explicitly configured, and if the hostname and port to a zipkin collector has
+ * been configured as environment variables
* <ul>
- * <li>ZIPKIN_COLLECTOR_HTTP_SERVICE_HOST - The http hostname</li>
- * <li>ZIPKIN_COLLECTOR_HTTP_SERVICE_PORT - The port number</li>
+ * <li>ZIPKIN_COLLECTOR_HTTP_SERVICE_HOST - The http hostname</li>
+ * <li>ZIPKIN_COLLECTOR_HTTP_SERVICE_PORT - The port number</li>
* </ul>
* or
* <ul>
- * <li>ZIPKIN_COLLECTOR_THRIFT_SERVICE_HOST - The Scribe (Thrift RPC) hostname</li>
- * <li>ZIPKIN_COLLECTOR_THRIFT_SERVICE_PORT - The port number</li>
+ * <li>ZIPKIN_COLLECTOR_THRIFT_SERVICE_HOST - The Scribe (Thrift RPC)
+ * hostname</li>
+ * <li>ZIPKIN_COLLECTOR_THRIFT_SERVICE_PORT - The port number</li>
* </ul>
* <p/>
- * This class is implemented as both an {@link org.apache.camel.spi.EventNotifier} and {@link RoutePolicy} that allows
- * to trap when Camel starts/ends an {@link Exchange} being routed using the {@link RoutePolicy} and during the routing
- * if the {@link Exchange} sends messages, then we track them using the {@link org.apache.camel.spi.EventNotifier}.
+ * This class is implemented as both an
+ * {@link org.apache.camel.spi.EventNotifier} and {@link RoutePolicy} that
+ * allows to trap when Camel starts/ends an {@link Exchange} being routed using
+ * the {@link RoutePolicy} and during the routing if the {@link Exchange} sends
+ * messages, then we track them using the
+ * {@link org.apache.camel.spi.EventNotifier}.
*/
-// NOTE: this implementation currently only does explicit propagation, meaning that non-camel
-// components will not see the current trace context, and therefore will be unassociated. This can
-// be fixed by using CurrentTraceContext to scope a span where user code is invoked.
-// If this is desirable, an instance variable of CurrentTraceContext.Default.create() could do the
+// NOTE: this implementation currently only does explicit propagation, meaning
+// that non-camel
+// components will not see the current trace context, and therefore will be
+// unassociated. This can
+// be fixed by using CurrentTraceContext to scope a span where user code is
+// invoked.
+// If this is desirable, an instance variable of
+// CurrentTraceContext.Default.create() could do the
// trick.
@ManagedResource(description = "ZipkinTracer")
public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory, StaticService, CamelContextAware {
@@ -118,19 +131,20 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
private static final String ZIPKIN_COLLECTOR_HTTP_SERVICE = "zipkin-collector-http";
private static final String ZIPKIN_COLLECTOR_THRIFT_SERVICE = "zipkin-collector-thrift";
private static final Getter<Message, String> GETTER = new Getter<Message, String>() {
- @Override public String get(Message message, String key) {
+ @Override
+ public String get(Message message, String key) {
return message.getHeader(key, String.class);
}
};
private static final Setter<Message, String> SETTER = new Setter<Message, String>() {
- @Override public void put(Message message, String key, String value) {
+ @Override
+ public void put(Message message, String key, String value) {
message.setHeader(key, value);
}
};
private static final Extractor<Message> EXTRACTOR = B3Propagation.B3_STRING.extractor(GETTER);
private static final Injector<Message> INJECTOR = B3Propagation.B3_STRING.injector(SETTER);
-
private final ZipkinEventNotifier eventNotifier = new ZipkinEventNotifier();
private final Map<String, Tracing> braves = new HashMap<>();
private transient boolean useFallbackServiceNames;
@@ -158,12 +172,14 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
}
/**
- * Registers this {@link ZipkinTracer} on the {@link CamelContext} if not already registered.
+ * Registers this {@link ZipkinTracer} on the {@link CamelContext} if not
+ * already registered.
*/
public void init(CamelContext camelContext) {
if (!camelContext.hasService(this)) {
try {
- // start this service eager so we init before Camel is starting up
+ // start this service eager so we init before Camel is starting
+ // up
camelContext.addService(this, true, true);
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
@@ -185,7 +201,8 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
}
/**
- * Sets the POST URL for zipkin's <a href="http://zipkin.io/zipkin-api/#/">v2 api</a>, usually
+ * Sets the POST URL for zipkin's
+ * <a href="http://zipkin.io/zipkin-api/#/">v2 api</a>, usually
* "http://zipkinhost:9411/api/v2/spans"
*/
public void setEndpoint(String endpoint) {
@@ -222,8 +239,9 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
}
/**
- * Configures a rate that decides how many events should be traced by zipkin.
- * The rate is expressed as a percentage (1.0f = 100%, 0.5f is 50%, 0.1f is 10%).
+ * Configures a rate that decides how many events should be traced by
+ * zipkin. The rate is expressed as a percentage (1.0f = 100%, 0.5f is 50%,
+ * 0.1f is 10%).
*
* @param rate minimum sample rate is 0.0001, or 0.01% of traces
*/
@@ -231,12 +249,17 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
this.rate = rate;
}
- /** Sets the reporter used to send timing data (spans) to the zipkin server. */
+ /**
+ * Sets the reporter used to send timing data (spans) to the zipkin server.
+ */
public void setSpanReporter(Reporter<zipkin2.Span> spanReporter) {
this.spanReporter = spanReporter;
}
- /** Returns the reporter used to send timing data (spans) to the zipkin server. */
+ /**
+ * Returns the reporter used to send timing data (spans) to the zipkin
+ * server.
+ */
public Reporter<zipkin2.Span> getSpanReporter() {
return spanReporter;
}
@@ -262,10 +285,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
}
/**
- * Adds a client service mapping that matches Camel events to the given zipkin service name.
- * See more details at the class javadoc.
+ * Adds a client service mapping that matches Camel events to the given
+ * zipkin service name. See more details at the class javadoc.
*
- * @param pattern the pattern such as route id, endpoint url
+ * @param pattern the pattern such as route id, endpoint url
* @param serviceName the zipkin service name
*/
public void addClientServiceMapping(String pattern, String serviceName) {
@@ -281,10 +304,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
}
/**
- * Adds a server service mapping that matches Camel events to the given zipkin service name.
- * See more details at the class javadoc.
+ * Adds a server service mapping that matches Camel events to the given
+ * zipkin service name. See more details at the class javadoc.
*
- * @param pattern the pattern such as route id, endpoint url
+ * @param pattern the pattern such as route id, endpoint url
* @param serviceName the zipkin service name
*/
public void addServerServiceMapping(String pattern, String serviceName) {
@@ -300,9 +323,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
}
/**
- * Adds an exclude pattern that will disable tracing with zipkin for Camel messages that matches the pattern.
+ * Adds an exclude pattern that will disable tracing with zipkin for Camel
+ * messages that matches the pattern.
*
- * @param pattern the pattern such as route id, endpoint url
+ * @param pattern the pattern such as route id, endpoint url
*/
public void addExcludePattern(String pattern) {
excludePatterns.add(pattern);
@@ -316,10 +340,13 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
/**
* Whether to include the Camel message body in the zipkin traces.
* <p/>
- * This is not recommended for production usage, or when having big payloads. You can limit the size by
- * configuring the <a href="http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max debug log size</a>.
+ * This is not recommended for production usage, or when having big
+ * payloads. You can limit the size by configuring the <a href=
+ * "http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max
+ * debug log size</a>.
* <p/>
- * By default message bodies that are stream based are <b>not</b> included. You can use the option {@link #setIncludeMessageBodyStreams(boolean)} to
+ * By default message bodies that are stream based are <b>not</b> included.
+ * You can use the option {@link #setIncludeMessageBodyStreams(boolean)} to
* turn that on.
*/
@ManagedAttribute(description = "Whether to include the Camel message body in the zipkin traces")
@@ -333,12 +360,17 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
}
/**
- * Whether to include message bodies that are stream based in the zipkin traces.
+ * Whether to include message bodies that are stream based in the zipkin
+ * traces.
* <p/>
- * This requires enabling <a href="http://camel.apache.org/stream-caching.html">stream caching</a> on the routes or globally on the CamelContext.
+ * This requires enabling
+ * <a href="http://camel.apache.org/stream-caching.html">stream caching</a>
+ * on the routes or globally on the CamelContext.
* <p/>
- * This is not recommended for production usage, or when having big payloads. You can limit the size by
- * configuring the <a href="http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max debug log size</a>.
+ * This is not recommended for production usage, or when having big
+ * payloads. You can limit the size by configuring the <a href=
+ * "http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max
+ * debug log size</a>.
*/
@ManagedAttribute(description = "Whether to include stream based Camel message bodies in the zipkin traces")
public void setIncludeMessageBodyStreams(boolean includeMessageBodyStreams) {
@@ -363,7 +395,8 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
LibthriftSender sender = LibthriftSender.newBuilder().host(hostName).port(port).build();
spanReporter = AsyncReporter.create(sender);
} else {
- // is there a zipkin service setup as ENV variable to auto register a span reporter
+ // is there a zipkin service setup as ENV variable to auto
+ // register a span reporter
String host = new ServiceHostPropertiesFunction().apply(ZIPKIN_COLLECTOR_HTTP_SERVICE);
String port = new ServicePortPropertiesFunction().apply(ZIPKIN_COLLECTOR_HTTP_SERVICE);
if (ObjectHelper.isNotEmpty(host) && ObjectHelper.isNotEmpty(port)) {
@@ -385,7 +418,8 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
}
if (spanReporter == null) {
- // Try to lookup the span reporter from the registry if only one instance is present
+ // Try to lookup the span reporter from the registry if only one
+ // instance is present
Set<Reporter> reporters = camelContext.getRegistry().findByType(Reporter.class);
if (reporters.size() == 1) {
spanReporter = reporters.iterator().next();
@@ -423,7 +457,7 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
// stop and close collector
ServiceHelper.stopAndShutdownService(spanReporter);
if (spanReporter instanceof Closeable) {
- IOHelper.close((Closeable) spanReporter);
+ IOHelper.close((Closeable)spanReporter);
}
// clear braves
braves.clear();
@@ -540,17 +574,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
private Tracing newTracing(String serviceName) {
Tracing brave = null;
if (camelContext.isUseMDCLogging()) {
- brave = Tracing.newBuilder()
- .currentTraceContext(ThreadLocalCurrentTraceContext.newBuilder()
- .addScopeDecorator(MDCScopeDecorator.create()).build())
- .localServiceName(serviceName)
- .sampler(Sampler.create(rate))
- .spanReporter(spanReporter).build();
+ brave = Tracing.newBuilder().currentTraceContext(ThreadLocalCurrentTraceContext.newBuilder().addScopeDecorator(MDCScopeDecorator.create()).build())
+ .localServiceName(serviceName).sampler(Sampler.create(rate)).spanReporter(spanReporter).build();
} else {
- brave = Tracing.newBuilder()
- .localServiceName(serviceName)
- .sampler(Sampler.create(rate))
- .spanReporter(spanReporter).build();
+ brave = Tracing.newBuilder().localServiceName(serviceName).sampler(Sampler.create(rate)).spanReporter(spanReporter).build();
}
return brave;
}
@@ -577,7 +604,8 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
state = new ZipkinState();
event.getExchange().setProperty(ZipkinState.KEY, state);
}
- // if we started from a server span then lets reuse that when we call a downstream service
+ // if we started from a server span then lets reuse that when we call a
+ // downstream service
Span last = state.peekServerSpan();
Span span;
if (last != null) {
@@ -653,7 +681,7 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
TraceContextOrSamplingFlags sampleFlag = EXTRACTOR.extract(exchange.getIn());
if (ObjectHelper.isEmpty(sampleFlag)) {
span = brave.tracer().nextSpan();
- INJECTOR.inject(span.context(), exchange.getIn());
+ INJECTOR.inject(span.context(), exchange.getIn());
} else {
span = brave.tracer().nextSpan(sampleFlag);
}
@@ -718,19 +746,20 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
@Override
public void notify(CamelEvent event) throws Exception {
- // use event notifier to track events when Camel messages to endpoints
+ // use event notifier to track events when Camel messages to
+ // endpoints
// these events corresponds to Zipkin client events
// client events
if (event instanceof ExchangeSendingEvent) {
- ExchangeSendingEvent ese = (ExchangeSendingEvent) event;
+ ExchangeSendingEvent ese = (ExchangeSendingEvent)event;
String serviceName = getServiceName(ese.getExchange(), ese.getEndpoint(), false, true);
Tracing brave = getTracing(serviceName);
if (brave != null) {
clientRequest(brave, serviceName, ese);
}
} else if (event instanceof ExchangeSentEvent) {
- ExchangeSentEvent ese = (ExchangeSentEvent) event;
+ ExchangeSentEvent ese = (ExchangeSentEvent)event;
String serviceName = getServiceName(ese.getExchange(), ese.getEndpoint(), false, true);
Tracing brave = getTracing(serviceName);
if (brave != null) {
@@ -742,14 +771,14 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
@Override
public boolean isEnabled(CamelEvent event) {
switch (event.getType()) {
- case ExchangeSending:
- case ExchangeSent:
- case ExchangeCreated:
- case ExchangeCompleted:
- case ExchangeFailed:
- return true;
- default:
- return false;
+ case ExchangeSending:
+ case ExchangeSent:
+ case ExchangeCreated:
+ case ExchangeCompleted:
+ case ExchangeFailed:
+ return true;
+ default:
+ return false;
}
}
@@ -763,7 +792,8 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
@Override
public void onExchangeBegin(Route route, Exchange exchange) {
- // use route policy to track events when Camel a Camel route begins/end the lifecycle of an Exchange
+ // use route policy to track events when Camel a Camel route
+ // begins/end the lifecycle of an Exchange
// these events corresponds to Zipkin server events
String serviceName = getServiceName(exchange, route.getEndpoint(), true, false);
@@ -771,10 +801,11 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
if (brave != null) {
serverRequest(brave, serviceName, exchange);
}
-
+
}
- // Report Server send after route has completed processing of the exchange.
+ // Report Server send after route has completed processing of the
+ // exchange.
@Override
public void onExchangeDone(Route route, Exchange exchange) {
String serviceName = getServiceName(exchange, route.getEndpoint(), true, false);