You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/05 13:21:02 UTC

[2/3] camel git commit: CAMEL-9795: camel-zipkin - Reuse existing span for complex EIPs like multicast.

CAMEL-9795: camel-zipkin - Reuse existing span for complex EIPs like multicast.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6c81334c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6c81334c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6c81334c

Branch: refs/heads/master
Commit: 6c81334c1f4274c4c3f1392296f985bc23e9e084
Parents: 22b7b17
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 5 11:42:03 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 5 13:20:49 2016 +0200

----------------------------------------------------------------------
 .../camel/processor/CamelInternalProcessor.java |  2 +-
 .../zipkin/ZipkinClientRequestAdapter.java      |  7 ++
 .../zipkin/ZipkinClientResponseAdaptor.java     |  7 ++
 .../org/apache/camel/zipkin/ZipkinHelper.java   | 28 ++++++++
 .../zipkin/ZipkinServerRequestAdapter.java      |  6 ++
 .../zipkin/ZipkinServerResponseAdapter.java     |  7 ++
 .../org/apache/camel/zipkin/ZipkinTracer.java   | 27 ++++++--
 .../zipkin/ZipkinSimpleLogStreamsRouteTest.java | 69 ++++++++++++++++++++
 .../src/main/java/sample/camel/ClientRoute.java |  5 +-
 .../main/java/sample/camel/Service1Route.java   | 10 +--
 .../main/java/sample/camel/Service2Route.java   |  7 +-
 11 files changed, 159 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 7ce5a4d..d9dc7a1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -774,7 +774,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
 
         @Override
         public void after(Exchange exchange, StreamCache sc) throws Exception {
-            Object body = null;
+            Object body;
             if (exchange.hasOut()) {
                 body = exchange.getOut().getBody();
             } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientRequestAdapter.java
----------------------------------------------------------------------
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientRequestAdapter.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientRequestAdapter.java
index a8f2207..ecffa7e 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientRequestAdapter.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientRequestAdapter.java
@@ -28,9 +28,12 @@ import com.github.kristofa.brave.SpanId;
 import com.github.kristofa.brave.internal.Nullable;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.StreamCache;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.URISupport;
 
+import static org.apache.camel.zipkin.ZipkinHelper.prepareBodyForLogging;
+
 public final class ZipkinClientRequestAdapter implements ClientRequestAdapter {
 
     private final ZipkinTracer eventNotifier;
@@ -82,8 +85,12 @@ public final class ZipkinClientRequestAdapter implements ClientRequestAdapter {
         KeyValueAnnotation key4 = null;
         if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) {
             boolean streams = eventNotifier.isIncludeMessageBodyStreams();
+            StreamCache cache = prepareBodyForLogging(exchange, streams);
             String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", streams, streams);
             key4 = KeyValueAnnotation.create("camel.client.exchange.message.request.body", body);
+            if (cache != null) {
+                cache.reset();
+            }
         }
 
         List<KeyValueAnnotation> list = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientResponseAdaptor.java
----------------------------------------------------------------------
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientResponseAdaptor.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientResponseAdaptor.java
index ec3711d..803dfcc 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientResponseAdaptor.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientResponseAdaptor.java
@@ -24,9 +24,12 @@ import com.github.kristofa.brave.ClientResponseAdapter;
 import com.github.kristofa.brave.KeyValueAnnotation;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.StreamCache;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.URISupport;
 
+import static org.apache.camel.zipkin.ZipkinHelper.prepareBodyForLogging;
+
 public class ZipkinClientResponseAdaptor implements ClientResponseAdapter {
 
     private final ZipkinTracer eventNotifier;
@@ -50,8 +53,12 @@ public class ZipkinClientResponseAdaptor implements ClientResponseAdapter {
         KeyValueAnnotation key4 = null;
         if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) {
             boolean streams = eventNotifier.isIncludeMessageBodyStreams();
+            StreamCache cache = prepareBodyForLogging(exchange, streams);
             String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", streams, streams);
             key4 = KeyValueAnnotation.create("camel.client.exchange.message.response.body", body);
+            if (cache != null) {
+                cache.reset();
+            }
         }
 
         KeyValueAnnotation key5 = null;

http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinHelper.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinHelper.java
index 96d86b7..da5e568 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinHelper.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinHelper.java
@@ -18,6 +18,9 @@ package org.apache.camel.zipkin;
 
 import com.github.kristofa.brave.IdConversion;
 import com.github.kristofa.brave.SpanId;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.StreamCache;
 
 /**
  * Helper class.
@@ -34,4 +37,29 @@ public final class ZipkinHelper {
         return SpanId.create(IdConversion.convertToLong(traceId), IdConversion.convertToLong(spanId), null);
     }
 
+    public static StreamCache prepareBodyForLogging(Exchange exchange, boolean streams) {
+        if (!streams) {
+            // no need to prepare if streams is not enabled
+            return null;
+        }
+
+        Message message = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        // check if body is already cached
+        Object body = message.getBody();
+        if (body == null) {
+            return null;
+        } else if (body instanceof StreamCache) {
+            StreamCache sc = (StreamCache) body;
+            // reset so the cache is ready to be used before processing
+            sc.reset();
+            return sc;
+        }
+        // cache the body and, if caching succeeded, set the cached stream as the new message body
+        StreamCache sc = exchange.getContext().getStreamCachingStrategy().cache(exchange);
+        if (sc != null) {
+            message.setBody(sc);
+        }
+        return sc;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerRequestAdapter.java
----------------------------------------------------------------------
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerRequestAdapter.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerRequestAdapter.java
index 3877b5f..1a38f6b 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerRequestAdapter.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerRequestAdapter.java
@@ -27,10 +27,12 @@ import com.github.kristofa.brave.SpanId;
 import com.github.kristofa.brave.TraceData;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.StreamCache;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.URISupport;
 
 import static org.apache.camel.zipkin.ZipkinHelper.createSpanId;
+import static org.apache.camel.zipkin.ZipkinHelper.prepareBodyForLogging;
 
 public class ZipkinServerRequestAdapter implements ServerRequestAdapter {
 
@@ -79,8 +81,12 @@ public class ZipkinServerRequestAdapter implements ServerRequestAdapter {
         KeyValueAnnotation key4 = null;
         if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) {
             boolean streams = eventNotifier.isIncludeMessageBodyStreams();
+            StreamCache cache = prepareBodyForLogging(exchange, streams);
             String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", streams, streams);
             key4 = KeyValueAnnotation.create("camel.server.exchange.message.request.body", body);
+            if (cache != null) {
+                cache.reset();
+            }
         }
 
         List<KeyValueAnnotation> list = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerResponseAdapter.java
----------------------------------------------------------------------
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerResponseAdapter.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerResponseAdapter.java
index 31071df..3ab4fe9 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerResponseAdapter.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerResponseAdapter.java
@@ -24,9 +24,12 @@ import com.github.kristofa.brave.KeyValueAnnotation;
 import com.github.kristofa.brave.ServerResponseAdapter;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.StreamCache;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.URISupport;
 
+import static org.apache.camel.zipkin.ZipkinHelper.prepareBodyForLogging;
+
 public class ZipkinServerResponseAdapter implements ServerResponseAdapter {
 
     private final ZipkinTracer eventNotifier;
@@ -56,8 +59,12 @@ public class ZipkinServerResponseAdapter implements ServerResponseAdapter {
             key4 = KeyValueAnnotation.create("camel.server.exchange.failure", message);
         } else if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) {
             boolean streams = eventNotifier.isIncludeMessageBodyStreams();
+            StreamCache cache = prepareBodyForLogging(exchange, streams);
             String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", streams, streams);
             key4 = KeyValueAnnotation.create("camel.server.exchange.message.response.body", body);
+            if (cache != null) {
+                cache.reset();
+            }
         }
 
         KeyValueAnnotation key5 = null;

http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
----------------------------------------------------------------------
diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
index 53a5432..f311aaf 100644
--- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
+++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java
@@ -50,6 +50,7 @@ import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.RoutePolicyFactory;
 import org.apache.camel.support.EventNotifierSupport;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -271,6 +272,8 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R
     /**
      * Whether to include message bodies that are stream based in the zipkin traces.
      * <p/>
+     * This requires enabling <a href="http://camel.apache.org/stream-caching.html">stream caching</a> on the routes or globally on the CamelContext.
+     * <p/>
      * This is not recommended for production usage, or when having big payloads. You can limit the size by
      * configuring the <a href="http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max debug log size</a>.
      */
@@ -686,15 +689,29 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R
                 serverRequest(brave, serviceName, exchange);
             }
         }
+
+        // register an on-completion callback that runs after the route is done, but before the consumer writes the response
+        // so the zipkin server response is recorded before the response is returned, which is the right time
+        exchange.addOnCompletion(new SynchronizationAdapter() {
+            @Override
+            public void onAfterRoute(Route route, Exchange exchange) {
+                String serviceName = getServiceName(exchange, route.getEndpoint(), true, false);
+                Brave brave = getBrave(serviceName);
+                if (brave != null) {
+                    serverResponse(brave, serviceName, exchange);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return "ZipkinTracerOnCompletion";
+            }
+        });
     }
 
     @Override
     public void onExchangeDone(Route route, Exchange exchange) {
-        String serviceName = getServiceName(exchange, route.getEndpoint(), true, false);
-        Brave brave = getBrave(serviceName);
-        if (brave != null) {
-            serverResponse(brave, serviceName, exchange);
-        }
+        // noop
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleLogStreamsRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleLogStreamsRouteTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleLogStreamsRouteTest.java
new file mode 100644
index 0000000..4eab008
--- /dev/null
+++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleLogStreamsRouteTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.zipkin;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class ZipkinSimpleLogStreamsRouteTest extends CamelTestSupport {
+
+    private ZipkinTracer zipkin;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        zipkin = new ZipkinTracer();
+        zipkin.setServiceName("dude");
+        zipkin.setIncludeMessageBodyStreams(true);
+        zipkin.setSpanCollector(new ZipkinLoggingSpanCollector());
+
+        // attach the tracer to the CamelContext
+        zipkin.init(context);
+
+        return context;
+    }
+
+    @Test
+    public void testZipkinRoute() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(5).create();
+
+        for (int i = 0; i < 5; i++) {
+            template.sendBody("seda:dude", "Hello World");
+        }
+
+        assertTrue(notify.matches(30, TimeUnit.SECONDS));
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:dude").routeId("dude")
+                        .log("routing at ${routeId}")
+                        .delay(simple("${random(1000,2000)}"));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/examples/camel-example-zipkin/client/src/main/java/sample/camel/ClientRoute.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-zipkin/client/src/main/java/sample/camel/ClientRoute.java b/examples/camel-example-zipkin/client/src/main/java/sample/camel/ClientRoute.java
index 1ba7ad4..ece56de 100644
--- a/examples/camel-example-zipkin/client/src/main/java/sample/camel/ClientRoute.java
+++ b/examples/camel-example-zipkin/client/src/main/java/sample/camel/ClientRoute.java
@@ -23,10 +23,11 @@ public class ClientRoute extends RouteBuilder {
     @Override
     public void configure() {
         // you can configure the route rule with Java DSL here
-        from("timer:trigger?exchangePattern=InOut&period=30s")
+        from("timer:trigger?exchangePattern=InOut&period=30s").streamCaching()
             .bean("counterBean")
+            .log(" Client request: ${body}")
             .to("http://localhost:9090/service1")
-            .log("Result: ${body}");
+            .log("Client response: ${body}");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/examples/camel-example-zipkin/service1/src/main/java/sample/camel/Service1Route.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-zipkin/service1/src/main/java/sample/camel/Service1Route.java b/examples/camel-example-zipkin/service1/src/main/java/sample/camel/Service1Route.java
index f8789c5..598f50e 100644
--- a/examples/camel-example-zipkin/service1/src/main/java/sample/camel/Service1Route.java
+++ b/examples/camel-example-zipkin/service1/src/main/java/sample/camel/Service1Route.java
@@ -24,13 +24,13 @@ public class Service1Route extends RouteBuilder {
 
     @Override
     public void configure() throws Exception {
-        from("jetty:http://0.0.0.0:{{service1.port}}/service1").routeId("service1")
+        from("jetty:http://0.0.0.0:{{service1.port}}/service1").routeId("service1").streamCaching()
             .removeHeaders("CamelHttp*")
-            .convertBodyTo(String.class)
+            .log("Service1 request: ${body}")
             .delay(simple("${random(1000,2000)}"))
-            .transform(simple("Service1: ${body}"))
-            .to("http://0.0.0.0:{{service2.port}}/service2");
-
+            .transform(simple("Service1-${body}"))
+            .to("http://0.0.0.0:{{service2.port}}/service2")
+            .log("Service1 response: ${body}");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/examples/camel-example-zipkin/service2/src/main/java/sample/camel/Service2Route.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-zipkin/service2/src/main/java/sample/camel/Service2Route.java b/examples/camel-example-zipkin/service2/src/main/java/sample/camel/Service2Route.java
index edc5146..7174b9e 100644
--- a/examples/camel-example-zipkin/service2/src/main/java/sample/camel/Service2Route.java
+++ b/examples/camel-example-zipkin/service2/src/main/java/sample/camel/Service2Route.java
@@ -37,10 +37,11 @@ public class Service2Route extends RouteBuilder {
         // add zipkin to CamelContext
         zipkin.init(getContext());
 
-        from("undertow:http://0.0.0.0:7070/service2").routeId("service2")
-                .convertBodyTo(String.class)
+        from("undertow:http://0.0.0.0:7070/service2").routeId("service2").streamCaching()
+                .log(" Service2 request: ${body}")
                 .delay(simple("${random(1000,2000)}"))
-                .transform(simple("Service2: ${body}"));
+                .transform(simple("Service2-${body}"))
+                .log("Service2 response: ${body}");
     }
 
 }