You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/01/30 10:02:26 UTC

[camel] branch master updated: Fixed CS for Camel-Zipkin

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 76c3ea2  Fixed CS for Camel-Zipkin
76c3ea2 is described below

commit 76c3ea2cefe48ab4431361b255252deec86b9744
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Jan 30 11:02:04 2019 +0100

    Fixed CS for Camel-Zipkin
---
 .../java/org/apache/camel/zipkin/ZipkinTracer.java | 191 ++++++++++++---------
 1 file changed, 111 insertions(+), 80 deletions(-)

diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
index 22b4932..eca68dd 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
@@ -34,6 +34,7 @@ import brave.propagation.TraceContext.Extractor;
 import brave.propagation.TraceContext.Injector;
 import brave.propagation.TraceContextOrSamplingFlags;
 import brave.sampler.Sampler;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Endpoint;
@@ -52,18 +53,19 @@ import org.apache.camel.spi.CamelEvent.ExchangeSendingEvent;
 import org.apache.camel.spi.CamelEvent.ExchangeSentEvent;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.RoutePolicyFactory;
+import org.apache.camel.support.EndpointHelper;
 import org.apache.camel.support.EventNotifierSupport;
 import org.apache.camel.support.PatternHelper;
 import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.support.EndpointHelper;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
+
 import zipkin2.reporter.AsyncReporter;
 import zipkin2.reporter.Reporter;
 import zipkin2.reporter.libthrift.LibthriftSender;
@@ -72,44 +74,55 @@ import zipkin2.reporter.urlconnection.URLConnectionSender;
 import static org.apache.camel.builder.ExpressionBuilder.routeIdExpression;
 
 /**
- * To use Zipkin with Camel then setup this {@link ZipkinTracer} in your Camel application.
+ * To use Zipkin with Camel then setup this {@link ZipkinTracer} in your Camel
+ * application.
  * <p/>
- * Events (span) are captured for incoming and outgoing messages being sent to/from Camel.
- * This means you need to configure which which Camel endpoints that maps to zipkin service names.
- * The mapping can be configured using
+ * Events (span) are captured for incoming and outgoing messages being sent
+ * to/from Camel. This means you need to configure which which Camel endpoints
+ * that maps to zipkin service names. The mapping can be configured using
  * <ul>
  * <li>route id - A Camel route id</li>
  * <li>endpoint url - A Camel endpoint url</li>
  * </ul>
- * For both kinds you can use wildcards and regular expressions to match, which is using the rules from
- * {@link PatternHelper#matchPattern(String, String)} and {@link EndpointHelper#matchEndpoint(CamelContext, String, String)}
+ * For both kinds you can use wildcards and regular expressions to match, which
+ * is using the rules from {@link PatternHelper#matchPattern(String, String)}
+ * and {@link EndpointHelper#matchEndpoint(CamelContext, String, String)}
  * <p/>
- * To match all Camel messages you can use <tt>*</tt> in the pattern and configure that to the same service name.
- * <br/>
- * If no mapping has been configured then Camel will fallback and use endpoint uri's as service names.
- * However its recommended to configure service mappings so you can use human logic names instead of Camel
- * endpoint uris in the names.
+ * To match all Camel messages you can use <tt>*</tt> in the pattern and
+ * configure that to the same service name. <br/>
+ * If no mapping has been configured then Camel will fallback and use endpoint
+ * uri's as service names. However its recommended to configure service mappings
+ * so you can use human logic names instead of Camel endpoint uris in the names.
  * <p/>
- * Camel will auto-configure a {@link Reporter span reporter} one hasn't been explicitly configured,
- * and if the hostname and port to a zipkin collector has been configured as environment variables
+ * Camel will auto-configure a {@link Reporter span reporter} one hasn't been
+ * explicitly configured, and if the hostname and port to a zipkin collector has
+ * been configured as environment variables
  * <ul>
- *     <li>ZIPKIN_COLLECTOR_HTTP_SERVICE_HOST - The http hostname</li>
- *     <li>ZIPKIN_COLLECTOR_HTTP_SERVICE_PORT - The port number</li>
+ * <li>ZIPKIN_COLLECTOR_HTTP_SERVICE_HOST - The http hostname</li>
+ * <li>ZIPKIN_COLLECTOR_HTTP_SERVICE_PORT - The port number</li>
  * </ul>
  * or
  * <ul>
- *     <li>ZIPKIN_COLLECTOR_THRIFT_SERVICE_HOST - The Scribe (Thrift RPC) hostname</li>
- *     <li>ZIPKIN_COLLECTOR_THRIFT_SERVICE_PORT - The port number</li>
+ * <li>ZIPKIN_COLLECTOR_THRIFT_SERVICE_HOST - The Scribe (Thrift RPC)
+ * hostname</li>
+ * <li>ZIPKIN_COLLECTOR_THRIFT_SERVICE_PORT - The port number</li>
  * </ul>
  * <p/>
- * This class is implemented as both an {@link org.apache.camel.spi.EventNotifier} and {@link RoutePolicy} that allows
- * to trap when Camel starts/ends an {@link Exchange} being routed using the {@link RoutePolicy} and during the routing
- * if the {@link Exchange} sends messages, then we track them using the {@link org.apache.camel.spi.EventNotifier}.
+ * This class is implemented as both an
+ * {@link org.apache.camel.spi.EventNotifier} and {@link RoutePolicy} that
+ * allows to trap when Camel starts/ends an {@link Exchange} being routed using
+ * the {@link RoutePolicy} and during the routing if the {@link Exchange} sends
+ * messages, then we track them using the
+ * {@link org.apache.camel.spi.EventNotifier}.
  */
-// NOTE: this implementation currently only does explicit propagation, meaning that non-camel
-// components will not see the current trace context, and therefore will be unassociated. This can
-// be fixed by using CurrentTraceContext to scope a span where user code is invoked.
-// If this is desirable, an instance variable of CurrentTraceContext.Default.create() could do the
+// NOTE: this implementation currently only does explicit propagation, meaning
+// that non-camel
+// components will not see the current trace context, and therefore will be
+// unassociated. This can
+// be fixed by using CurrentTraceContext to scope a span where user code is
+// invoked.
+// If this is desirable, an instance variable of
+// CurrentTraceContext.Default.create() could do the
 // trick.
 @ManagedResource(description = "ZipkinTracer")
 public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory, StaticService, CamelContextAware {
@@ -118,19 +131,20 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     private static final String ZIPKIN_COLLECTOR_HTTP_SERVICE = "zipkin-collector-http";
     private static final String ZIPKIN_COLLECTOR_THRIFT_SERVICE = "zipkin-collector-thrift";
     private static final Getter<Message, String> GETTER = new Getter<Message, String>() {
-        @Override public String get(Message message, String key) {
+        @Override
+        public String get(Message message, String key) {
             return message.getHeader(key, String.class);
         }
     };
     private static final Setter<Message, String> SETTER = new Setter<Message, String>() {
-        @Override public void put(Message message, String key, String value) {
+        @Override
+        public void put(Message message, String key, String value) {
             message.setHeader(key, value);
         }
     };
     private static final Extractor<Message> EXTRACTOR = B3Propagation.B3_STRING.extractor(GETTER);
     private static final Injector<Message> INJECTOR = B3Propagation.B3_STRING.injector(SETTER);
 
-
     private final ZipkinEventNotifier eventNotifier = new ZipkinEventNotifier();
     private final Map<String, Tracing> braves = new HashMap<>();
     private transient boolean useFallbackServiceNames;
@@ -158,12 +172,14 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     }
 
     /**
-     * Registers this {@link ZipkinTracer} on the {@link CamelContext} if not already registered.
+     * Registers this {@link ZipkinTracer} on the {@link CamelContext} if not
+     * already registered.
      */
     public void init(CamelContext camelContext) {
         if (!camelContext.hasService(this)) {
             try {
-                // start this service eager so we init before Camel is starting up
+                // start this service eager so we init before Camel is starting
+                // up
                 camelContext.addService(this, true, true);
             } catch (Exception e) {
                 throw RuntimeCamelException.wrapRuntimeCamelException(e);
@@ -185,7 +201,8 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     }
 
     /**
-     * Sets the POST URL for zipkin's <a href="http://zipkin.io/zipkin-api/#/">v2 api</a>, usually
+     * Sets the POST URL for zipkin's
+     * <a href="http://zipkin.io/zipkin-api/#/">v2 api</a>, usually
      * "http://zipkinhost:9411/api/v2/spans"
      */
     public void setEndpoint(String endpoint) {
@@ -222,8 +239,9 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     }
 
     /**
-     * Configures a rate that decides how many events should be traced by zipkin.
-     * The rate is expressed as a percentage (1.0f = 100%, 0.5f is 50%, 0.1f is 10%).
+     * Configures a rate that decides how many events should be traced by
+     * zipkin. The rate is expressed as a percentage (1.0f = 100%, 0.5f is 50%,
+     * 0.1f is 10%).
      *
      * @param rate minimum sample rate is 0.0001, or 0.01% of traces
      */
@@ -231,12 +249,17 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         this.rate = rate;
     }
 
-    /** Sets the reporter used to send timing data (spans) to the zipkin server. */
+    /**
+     * Sets the reporter used to send timing data (spans) to the zipkin server.
+     */
     public void setSpanReporter(Reporter<zipkin2.Span> spanReporter) {
         this.spanReporter = spanReporter;
     }
 
-    /** Returns the reporter used to send timing data (spans) to the zipkin server. */
+    /**
+     * Returns the reporter used to send timing data (spans) to the zipkin
+     * server.
+     */
     public Reporter<zipkin2.Span> getSpanReporter() {
         return spanReporter;
     }
@@ -262,10 +285,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     }
 
     /**
-     * Adds a client service mapping that matches Camel events to the given zipkin service name.
-     * See more details at the class javadoc.
+     * Adds a client service mapping that matches Camel events to the given
+     * zipkin service name. See more details at the class javadoc.
      *
-     * @param pattern  the pattern such as route id, endpoint url
+     * @param pattern the pattern such as route id, endpoint url
      * @param serviceName the zipkin service name
      */
     public void addClientServiceMapping(String pattern, String serviceName) {
@@ -281,10 +304,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     }
 
     /**
-     * Adds a server service mapping that matches Camel events to the given zipkin service name.
-     * See more details at the class javadoc.
+     * Adds a server service mapping that matches Camel events to the given
+     * zipkin service name. See more details at the class javadoc.
      *
-     * @param pattern  the pattern such as route id, endpoint url
+     * @param pattern the pattern such as route id, endpoint url
      * @param serviceName the zipkin service name
      */
     public void addServerServiceMapping(String pattern, String serviceName) {
@@ -300,9 +323,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     }
 
     /**
-     * Adds an exclude pattern that will disable tracing with zipkin for Camel messages that matches the pattern.
+     * Adds an exclude pattern that will disable tracing with zipkin for Camel
+     * messages that matches the pattern.
      *
-     * @param pattern  the pattern such as route id, endpoint url
+     * @param pattern the pattern such as route id, endpoint url
      */
     public void addExcludePattern(String pattern) {
         excludePatterns.add(pattern);
@@ -316,10 +340,13 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     /**
      * Whether to include the Camel message body in the zipkin traces.
      * <p/>
-     * This is not recommended for production usage, or when having big payloads. You can limit the size by
-     * configuring the <a href="http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max debug log size</a>.
+     * This is not recommended for production usage, or when having big
+     * payloads. You can limit the size by configuring the <a href=
+     * "http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max
+     * debug log size</a>.
      * <p/>
-     * By default message bodies that are stream based are <b>not</b> included. You can use the option {@link #setIncludeMessageBodyStreams(boolean)} to
+     * By default message bodies that are stream based are <b>not</b> included.
+     * You can use the option {@link #setIncludeMessageBodyStreams(boolean)} to
      * turn that on.
      */
     @ManagedAttribute(description = "Whether to include the Camel message body in the zipkin traces")
@@ -333,12 +360,17 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     }
 
     /**
-     * Whether to include message bodies that are stream based in the zipkin traces.
+     * Whether to include message bodies that are stream based in the zipkin
+     * traces.
      * <p/>
-     * This requires enabling <a href="http://camel.apache.org/stream-caching.html">stream caching</a> on the routes or globally on the CamelContext.
+     * This requires enabling
+     * <a href="http://camel.apache.org/stream-caching.html">stream caching</a>
+     * on the routes or globally on the CamelContext.
      * <p/>
-     * This is not recommended for production usage, or when having big payloads. You can limit the size by
-     * configuring the <a href="http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max debug log size</a>.
+     * This is not recommended for production usage, or when having big
+     * payloads. You can limit the size by configuring the <a href=
+     * "http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max
+     * debug log size</a>.
      */
     @ManagedAttribute(description = "Whether to include stream based Camel message bodies in the zipkin traces")
     public void setIncludeMessageBodyStreams(boolean includeMessageBodyStreams) {
@@ -363,7 +395,8 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
                 LibthriftSender sender = LibthriftSender.newBuilder().host(hostName).port(port).build();
                 spanReporter = AsyncReporter.create(sender);
             } else {
-                // is there a zipkin service setup as ENV variable to auto register a span reporter
+                // is there a zipkin service setup as ENV variable to auto
+                // register a span reporter
                 String host = new ServiceHostPropertiesFunction().apply(ZIPKIN_COLLECTOR_HTTP_SERVICE);
                 String port = new ServicePortPropertiesFunction().apply(ZIPKIN_COLLECTOR_HTTP_SERVICE);
                 if (ObjectHelper.isNotEmpty(host) && ObjectHelper.isNotEmpty(port)) {
@@ -385,7 +418,8 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         }
 
         if (spanReporter == null) {
-            // Try to lookup the span reporter from the registry if only one instance is present
+            // Try to lookup the span reporter from the registry if only one
+            // instance is present
             Set<Reporter> reporters = camelContext.getRegistry().findByType(Reporter.class);
             if (reporters.size() == 1) {
                 spanReporter = reporters.iterator().next();
@@ -423,7 +457,7 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         // stop and close collector
         ServiceHelper.stopAndShutdownService(spanReporter);
         if (spanReporter instanceof Closeable) {
-            IOHelper.close((Closeable) spanReporter);
+            IOHelper.close((Closeable)spanReporter);
         }
         // clear braves
         braves.clear();
@@ -540,17 +574,10 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
     private Tracing newTracing(String serviceName) {
         Tracing brave = null;
         if (camelContext.isUseMDCLogging()) {
-            brave = Tracing.newBuilder()
-                     .currentTraceContext(ThreadLocalCurrentTraceContext.newBuilder()
-                     .addScopeDecorator(MDCScopeDecorator.create()).build())        
-                     .localServiceName(serviceName)
-                     .sampler(Sampler.create(rate))
-                     .spanReporter(spanReporter).build();
+            brave = Tracing.newBuilder().currentTraceContext(ThreadLocalCurrentTraceContext.newBuilder().addScopeDecorator(MDCScopeDecorator.create()).build())
+                .localServiceName(serviceName).sampler(Sampler.create(rate)).spanReporter(spanReporter).build();
         } else {
-            brave = Tracing.newBuilder()
-                    .localServiceName(serviceName)
-                    .sampler(Sampler.create(rate))
-                    .spanReporter(spanReporter).build();
+            brave = Tracing.newBuilder().localServiceName(serviceName).sampler(Sampler.create(rate)).spanReporter(spanReporter).build();
         }
         return brave;
     }
@@ -577,7 +604,8 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
             state = new ZipkinState();
             event.getExchange().setProperty(ZipkinState.KEY, state);
         }
-        // if we started from a server span then lets reuse that when we call a downstream service
+        // if we started from a server span then lets reuse that when we call a
+        // downstream service
         Span last = state.peekServerSpan();
         Span span;
         if (last != null) {
@@ -653,7 +681,7 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         TraceContextOrSamplingFlags sampleFlag = EXTRACTOR.extract(exchange.getIn());
         if (ObjectHelper.isEmpty(sampleFlag)) {
             span = brave.tracer().nextSpan();
-            INJECTOR.inject(span.context(), exchange.getIn()); 
+            INJECTOR.inject(span.context(), exchange.getIn());
         } else {
             span = brave.tracer().nextSpan(sampleFlag);
         }
@@ -718,19 +746,20 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
 
         @Override
         public void notify(CamelEvent event) throws Exception {
-            // use event notifier to track events when Camel messages to endpoints
+            // use event notifier to track events when Camel messages to
+            // endpoints
             // these events corresponds to Zipkin client events
 
             // client events
             if (event instanceof ExchangeSendingEvent) {
-                ExchangeSendingEvent ese = (ExchangeSendingEvent) event;
+                ExchangeSendingEvent ese = (ExchangeSendingEvent)event;
                 String serviceName = getServiceName(ese.getExchange(), ese.getEndpoint(), false, true);
                 Tracing brave = getTracing(serviceName);
                 if (brave != null) {
                     clientRequest(brave, serviceName, ese);
                 }
             } else if (event instanceof ExchangeSentEvent) {
-                ExchangeSentEvent ese = (ExchangeSentEvent) event;
+                ExchangeSentEvent ese = (ExchangeSentEvent)event;
                 String serviceName = getServiceName(ese.getExchange(), ese.getEndpoint(), false, true);
                 Tracing brave = getTracing(serviceName);
                 if (brave != null) {
@@ -742,14 +771,14 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
         @Override
         public boolean isEnabled(CamelEvent event) {
             switch (event.getType()) {
-                case ExchangeSending:
-                case ExchangeSent:
-                case ExchangeCreated:
-                case ExchangeCompleted:
-                case ExchangeFailed:
-                    return true;
-                default:
-                    return false;
+            case ExchangeSending:
+            case ExchangeSent:
+            case ExchangeCreated:
+            case ExchangeCompleted:
+            case ExchangeFailed:
+                return true;
+            default:
+                return false;
             }
         }
 
@@ -763,7 +792,8 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
 
         @Override
         public void onExchangeBegin(Route route, Exchange exchange) {
-            // use route policy to track events when Camel a Camel route begins/end the lifecycle of an Exchange
+            // use route policy to track events when Camel a Camel route
+            // begins/end the lifecycle of an Exchange
             // these events corresponds to Zipkin server events
 
             String serviceName = getServiceName(exchange, route.getEndpoint(), true, false);
@@ -771,10 +801,11 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory,
             if (brave != null) {
                 serverRequest(brave, serviceName, exchange);
             }
-          
+
         }
 
-        // Report Server send after route has completed processing of the exchange.
+        // Report Server send after route has completed processing of the
+        // exchange.
         @Override
         public void onExchangeDone(Route route, Exchange exchange) {
             String serviceName = getServiceName(exchange, route.getEndpoint(), true, false);