You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/06/28 07:37:18 UTC
[2/3] camel git commit: CAMEL-10096: Camel tracer with stream caching should trace after stream cache has been set up.
CAMEL-10096: Camel tracer with stream caching should trace after stream cache has been set up.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5b7a9d36
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5b7a9d36
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5b7a9d36
Branch: refs/heads/master
Commit: 5b7a9d3612b3fecfa71bdee46c87793859ba6508
Parents: def7feb
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jun 28 09:32:29 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jun 28 09:34:23 2016 +0200
----------------------------------------------------------------------
.../camel/processor/CamelInternalProcessor.java | 33 +++++-
.../BacklogTracerStreamCachingTest.java | 101 +++++++++++++++++++
2 files changed, 130 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5b7a9d36/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index d9dc7a1..3c15167 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
@@ -25,6 +26,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.MessageHistory;
+import org.apache.camel.Ordered;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.StatefulService;
@@ -44,6 +46,7 @@ import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.OrderedComparator;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.UnitOfWorkHelper;
import org.slf4j.Logger;
@@ -72,6 +75,8 @@ import org.slf4j.LoggerFactory;
* <b>Debugging tips:</b> Camel end users whom want to debug their Camel applications with the Camel source code, then make sure to
* read the source code of this class about the debugging tips, which you can find in the
* {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method.
+ * <p/>
+ * The added advices can implement {@link Ordered} to control in which order the advices are executed.
*/
public class CamelInternalProcessor extends DelegateAsyncProcessor {
@@ -92,6 +97,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
*/
public void addAdvice(CamelInternalProcessorAdvice advice) {
advices.add(advice);
+ // ensure advices are sorted so they are in the order we want
+ Collections.sort(advices, new OrderedComparator());
}
/**
@@ -125,7 +132,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
// you can see in the code below.
// ----------------------------------------------------------
-
if (processor == null || !continueProcessing(exchange)) {
// no processor or we should not continue then we are done
callback.done(true);
@@ -522,7 +528,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
/**
* Advice to execute the {@link BacklogTracer} if enabled.
*/
- public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice {
+ public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice, Ordered {
private final BacklogTracer backlogTracer;
private final ProcessorDefinition<?> processorDefinition;
@@ -564,12 +570,19 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
public void after(Exchange exchange, Object data) throws Exception {
// noop
}
+
+ @Override
+ public int getOrder() {
+ // we want tracer just before calling the processor
+ return Ordered.LOWEST - 1;
+ }
+
}
/**
* Advice to execute the {@link org.apache.camel.processor.interceptor.BacklogDebugger} if enabled.
*/
- public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch> {
+ public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch>, Ordered {
private final BacklogDebugger backlogDebugger;
private final Processor target;
@@ -600,6 +613,12 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
backlogDebugger.afterProcess(exchange, target, definition, stopWatch.stop());
}
}
+
+ @Override
+ public int getOrder() {
+ // we want debugger just before calling the processor
+ return Ordered.LOWEST;
+ }
}
/**
@@ -744,7 +763,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
/**
* Advice for {@link org.apache.camel.spi.StreamCachingStrategy}
*/
- public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache> {
+ public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache>, Ordered {
private final StreamCachingStrategy strategy;
@@ -785,6 +804,12 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
((StreamCache) body).reset();
}
}
+
+ @Override
+ public int getOrder() {
+ // we want stream caching first
+ return Ordered.HIGHEST;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/5b7a9d36/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java
new file mode 100644
index 0000000..14eeecb
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.io.ByteArrayInputStream;
+import java.util.List;
+import javax.management.Attribute;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.api.management.mbean.BacklogTracerEventMessage;
+import org.apache.camel.builder.RouteBuilder;
+
+public class BacklogTracerStreamCachingTest extends ManagementTestSupport {
+
+ @SuppressWarnings("unchecked")
+ public void testBacklogTracerEventMessageStreamCaching() throws Exception {
+ // JMX tests don't work well on AIX CI servers (they hang them), so skip the test on that platform
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogTracer");
+ assertNotNull(on);
+ assertTrue(mbeanServer.isRegistered(on));
+
+ Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+ Integer size = (Integer) mbeanServer.getAttribute(on, "BacklogSize");
+ assertEquals("Should be 1000", 1000, size.intValue());
+
+ Boolean removeOnDump = (Boolean) mbeanServer.getAttribute(on, "RemoveOnDump");
+ assertEquals(Boolean.TRUE, removeOnDump);
+
+ // set BodyIncludeStreams on the tracer MBean so the stream body set by the route shows up in the traced message asserted below
+ mbeanServer.setAttribute(on, new Attribute("BodyIncludeStreams", Boolean.TRUE));
+
+ // enable the backlog tracer itself (asserted to be disabled above)
+ mbeanServer.setAttribute(on, new Attribute("Enabled", Boolean.TRUE));
+
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ List<Exchange> exchanges = getMockEndpoint("mock:bar").getReceivedExchanges();
+
+ List<BacklogTracerEventMessage> events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpTracedMessages",
+ new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+ assertNotNull(events);
+ assertEquals(1, events.size());
+
+ BacklogTracerEventMessage event1 = events.get(0);
+ assertEquals("bar", event1.getToNode());
+ assertEquals(" <message exchangeId=\"" + exchanges.get(0).getExchangeId() + "\">\n"
+ + " <body type=\"org.apache.camel.converter.stream.ByteArrayInputStreamCache\">Bye World</body>\n"
+ + " </message>", event1.getMessageAsXml());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.setUseBreadcrumb(false);
+
+ from("direct:start").streamCaching()
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ ByteArrayInputStream is = new ByteArrayInputStream("Bye World".getBytes());
+ exchange.getIn().setBody(is);
+ }
+ })
+ .log("Got ${body}")
+ .to("mock:bar").id("bar");
+ }
+ };
+ }
+
+}