You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/06/28 07:37:18 UTC

[2/3] camel git commit: CAMEL-10096: Camel tracer with stream caching should trace after stream cache has been set up.

CAMEL-10096: Camel tracer with stream caching should trace after stream cache has been set up.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5b7a9d36
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5b7a9d36
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5b7a9d36

Branch: refs/heads/master
Commit: 5b7a9d3612b3fecfa71bdee46c87793859ba6508
Parents: def7feb
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jun 28 09:32:29 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jun 28 09:34:23 2016 +0200

----------------------------------------------------------------------
 .../camel/processor/CamelInternalProcessor.java |  33 +++++-
 .../BacklogTracerStreamCachingTest.java         | 101 +++++++++++++++++++
 2 files changed, 130 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5b7a9d36/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index d9dc7a1..3c15167 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.RejectedExecutionException;
@@ -25,6 +26,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.MessageHistory;
+import org.apache.camel.Ordered;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.StatefulService;
@@ -44,6 +46,7 @@ import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.OrderedComparator;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.UnitOfWorkHelper;
 import org.slf4j.Logger;
@@ -72,6 +75,8 @@ import org.slf4j.LoggerFactory;
  * <b>Debugging tips:</b> Camel end users whom want to debug their Camel applications with the Camel source code, then make sure to
  * read the source code of this class about the debugging tips, which you can find in the
  * {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method.
+ * <p/>
+ * The added advices can implement {@link Ordered} to control in which order the advices are executed.
  */
 public class CamelInternalProcessor extends DelegateAsyncProcessor {
 
@@ -92,6 +97,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
      */
     public void addAdvice(CamelInternalProcessorAdvice advice) {
         advices.add(advice);
+        // ensure advices are sorted so they are in the order we want
+        Collections.sort(advices, new OrderedComparator());
     }
 
     /**
@@ -125,7 +132,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         // you can see in the code below.
         // ----------------------------------------------------------
 
-
         if (processor == null || !continueProcessing(exchange)) {
             // no processor or we should not continue then we are done
             callback.done(true);
@@ -522,7 +528,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
     /**
      * Advice to execute the {@link BacklogTracer} if enabled.
      */
-    public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice {
+    public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice, Ordered {
 
         private final BacklogTracer backlogTracer;
         private final ProcessorDefinition<?> processorDefinition;
@@ -564,12 +570,19 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         public void after(Exchange exchange, Object data) throws Exception {
             // noop
         }
+
+        @Override
+        public int getOrder() {
+            // we want tracer just before calling the processor
+            return Ordered.LOWEST - 1;
+        }
+
     }
 
     /**
      * Advice to execute the {@link org.apache.camel.processor.interceptor.BacklogDebugger} if enabled.
      */
-    public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch> {
+    public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch>, Ordered {
 
         private final BacklogDebugger backlogDebugger;
         private final Processor target;
@@ -600,6 +613,12 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 backlogDebugger.afterProcess(exchange, target, definition, stopWatch.stop());
             }
         }
+
+        @Override
+        public int getOrder() {
+            // we want debugger just before calling the processor
+            return Ordered.LOWEST;
+        }
     }
 
     /**
@@ -744,7 +763,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
     /**
      * Advice for {@link org.apache.camel.spi.StreamCachingStrategy}
      */
-    public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache> {
+    public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache>, Ordered {
 
         private final StreamCachingStrategy strategy;
 
@@ -785,6 +804,12 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 ((StreamCache) body).reset();
             }
         }
+
+        @Override
+        public int getOrder() {
+            // we want stream caching first
+            return Ordered.HIGHEST;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/5b7a9d36/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java
new file mode 100644
index 0000000..14eeecb
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.io.ByteArrayInputStream;
+import java.util.List;
+import javax.management.Attribute;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.api.management.mbean.BacklogTracerEventMessage;
+import org.apache.camel.builder.RouteBuilder;
+
+public class BacklogTracerStreamCachingTest extends ManagementTestSupport {
+
+    @SuppressWarnings("unchecked")
+    public void testBacklogTracerEventMessageStreamCaching() throws Exception {
+        // JMX tests don't work well on AIX CI servers (they cause the servers to hang)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogTracer");
+        assertNotNull(on);
+        assertTrue(mbeanServer.isRegistered(on));
+
+        Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+        Integer size = (Integer) mbeanServer.getAttribute(on, "BacklogSize");
+        assertEquals("Should be 1000", 1000, size.intValue());
+
+        Boolean removeOnDump = (Boolean) mbeanServer.getAttribute(on, "RemoveOnDump");
+        assertEquals(Boolean.TRUE, removeOnDump);
+
+        // enable streams
+        mbeanServer.setAttribute(on, new Attribute("BodyIncludeStreams", Boolean.TRUE));
+
+        // enable it
+        mbeanServer.setAttribute(on, new Attribute("Enabled", Boolean.TRUE));
+
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> exchanges = getMockEndpoint("mock:bar").getReceivedExchanges();
+
+        List<BacklogTracerEventMessage> events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpTracedMessages",
+                new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+        assertNotNull(events);
+        assertEquals(1, events.size());
+
+        BacklogTracerEventMessage event1 = events.get(0);
+        assertEquals("bar", event1.getToNode());
+        assertEquals("    <message exchangeId=\"" + exchanges.get(0).getExchangeId() + "\">\n"
+                + "      <body type=\"org.apache.camel.converter.stream.ByteArrayInputStreamCache\">Bye World</body>\n"
+                + "    </message>", event1.getMessageAsXml());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setUseBreadcrumb(false);
+
+                from("direct:start").streamCaching()
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                ByteArrayInputStream is = new ByteArrayInputStream("Bye World".getBytes());
+                                exchange.getIn().setBody(is);
+                            }
+                        })
+                        .log("Got ${body}")
+                        .to("mock:bar").id("bar");
+            }
+        };
+    }
+
+}