You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/03 20:15:08 UTC
[1/2] git commit: CAMEL-7163: BacklogDebugger - Should not change body/header type to string
Updated Branches:
refs/heads/camel-2.12.x 6e8689a2d -> f2749a941
refs/heads/master d6646e648 -> 5f726d0b9
CAMEL-7163: BacklogDebugger - Should not change body/header type to string
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5f726d0b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5f726d0b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5f726d0b
Branch: refs/heads/master
Commit: 5f726d0b995f90f59489b5ebf32aad09ca80f951
Parents: d6646e6
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Feb 3 20:13:57 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Feb 3 20:13:57 2014 +0100
----------------------------------------------------------------------
.../mbean/ManagedBacklogDebuggerMBean.java | 20 ++-
.../mbean/ManagedBacklogDebugger.java | 38 ++++-
.../processor/interceptor/BacklogDebugger.java | 94 ++++++++++-
.../camel/management/BacklogDebuggerTest.java | 160 ++++++++++++++++++-
4 files changed, 296 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
index 02d2d96..036ef3d 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
@@ -53,11 +53,23 @@ public interface ManagedBacklogDebuggerMBean {
@ManagedOperation(description = "Resume running from the suspended breakpoint at the given node id")
void resumeBreakpoint(String nodeId);
- @ManagedOperation(description = "Updates the message body on the suspended breakpoint at the given node id")
- void setMessageBodyOnBreakpoint(String nodeId, String body);
+ @ManagedOperation(description = "Updates the message body (uses same type as old body) on the suspended breakpoint at the given node id")
+ void setMessageBodyOnBreakpoint(String nodeId, Object body);
- @ManagedOperation(description = "Updates/adds the message header on the suspended breakpoint at the given node id")
- void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value);
+ @ManagedOperation(description = "Updates the message body (with a new type) on the suspended breakpoint at the given node id")
+ void setMessageBodyOnBreakpoint(String nodeId, Object body, String type);
+
+ @ManagedOperation(description = "Removes the message body on the suspended breakpoint at the given node id")
+ void removeMessageBodyOnBreakpoint(String nodeId);
+
+ @ManagedOperation(description = "Updates/adds the message header (uses same type as old header value) on the suspended breakpoint at the given node id")
+ void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value);
+
+ @ManagedOperation(description = "Removes the message header on the suspended breakpoint at the given node id")
+ void removeMessageHeaderOnBreakpoint(String nodeId, String headerName);
+
+ @ManagedOperation(description = "Updates/adds the message header (with a new type) on the suspended breakpoint at the given node id")
+ void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, String type);
@ManagedOperation(description = "Resume running any suspended breakpoints, and exits step mode")
void resumeAll();
http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
index 33f1310..9f3a94c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
@@ -19,10 +19,12 @@ package org.apache.camel.management.mbean;
import java.util.Set;
import org.apache.camel.CamelContext;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.api.management.mbean.ManagedBacklogDebuggerMBean;
import org.apache.camel.processor.interceptor.BacklogDebugger;
import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.ObjectHelper;
@ManagedResource(description = "Managed BacklogDebugger")
public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean {
@@ -91,12 +93,42 @@ public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean {
backlogDebugger.resumeBreakpoint(nodeId);
}
- public void setMessageBodyOnBreakpoint(String nodeId, String body) {
+ public void setMessageBodyOnBreakpoint(String nodeId, Object body) {
backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body);
}
- public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) {
- backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value);
+ public void setMessageBodyOnBreakpoint(String nodeId, Object body, String type) {
+ try {
+ Class<?> classType = camelContext.getClassResolver().resolveMandatoryClass(type);
+ backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body, classType);
+ } catch (ClassNotFoundException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ public void removeMessageBodyOnBreakpoint(String nodeId) {
+ backlogDebugger.removeMessageBodyOnBreakpoint(nodeId);
+ }
+
+ public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) {
+ try {
+ backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value);
+ } catch (NoTypeConversionAvailableException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, String type) {
+ try {
+ Class<?> classType = camelContext.getClassResolver().resolveMandatoryClass(type);
+ backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value, classType);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
+ backlogDebugger.removeMessageHeaderOnBreakpoint(nodeId, headerName);
}
public void resumeAll() {
http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
index 3c2e290..571b174 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.api.management.mbean.BacklogTracerEventMessage;
@@ -268,26 +269,105 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy
}
}
- public void setMessageBodyOnBreakpoint(String nodeId, String body) {
+ public void setMessageBodyOnBreakpoint(String nodeId, Object body) {
SuspendedExchange se = suspendedBreakpoints.get(nodeId);
if (se != null) {
- logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body);
+ boolean remove = body == null;
+ if (remove) {
+ removeMessageBodyOnBreakpoint(nodeId);
+ } else {
+ Class oldType;
+ if (se.getExchange().hasOut()) {
+ oldType = se.getExchange().getOut().getBody() != null ? se.getExchange().getOut().getBody().getClass() : null;
+ } else {
+ oldType = se.getExchange().getIn().getBody() != null ? se.getExchange().getIn().getBody().getClass() : null;
+ }
+ setMessageBodyOnBreakpoint(nodeId, body, oldType);
+ }
+ }
+ }
+
+ public void setMessageBodyOnBreakpoint(String nodeId, Object body, Class type) {
+ SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+ if (se != null) {
+ boolean remove = body == null;
+ if (remove) {
+ removeMessageBodyOnBreakpoint(nodeId);
+ } else {
+ logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body);
+ if (se.getExchange().hasOut()) {
+ // preserve type
+ if (type != null) {
+ se.getExchange().getOut().setBody(body, type);
+ } else {
+ se.getExchange().getOut().setBody(body);
+ }
+ } else {
+ if (type != null) {
+ se.getExchange().getIn().setBody(body, type);
+ } else {
+ se.getExchange().getIn().setBody(body);
+ }
+ }
+ }
+ }
+ }
+
+ public void removeMessageBodyOnBreakpoint(String nodeId) {
+ SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+ if (se != null) {
+ logger.log("Breakpoint at node " + nodeId + " is removing message body on exchangeId: " + se.getExchange().getExchangeId());
if (se.getExchange().hasOut()) {
- se.getExchange().getOut().setBody(body);
+ se.getExchange().getOut().setBody(null);
} else {
- se.getExchange().getIn().setBody(body);
+ se.getExchange().getIn().setBody(null);
}
}
}
- public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) {
+ public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) throws NoTypeConversionAvailableException {
+ SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+ if (se != null) {
+ Class oldType;
+ if (se.getExchange().hasOut()) {
+ oldType = se.getExchange().getOut().getHeader(headerName) != null ? se.getExchange().getOut().getHeader(headerName).getClass() : null;
+ } else {
+ oldType = se.getExchange().getIn().getHeader(headerName) != null ? se.getExchange().getIn().getHeader(headerName).getClass() : null;
+ }
+ setMessageHeaderOnBreakpoint(nodeId, headerName, value, oldType);
+ }
+ }
+
+ public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, Class type) throws NoTypeConversionAvailableException {
SuspendedExchange se = suspendedBreakpoints.get(nodeId);
if (se != null) {
logger.log("Breakpoint at node " + nodeId + " is updating message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName + " and value: " + value);
if (se.getExchange().hasOut()) {
- se.getExchange().getOut().setHeader(headerName, value);
+ if (type != null) {
+ Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value);
+ se.getExchange().getOut().setHeader(headerName, convertedValue);
+ } else {
+ se.getExchange().getOut().setHeader(headerName, value);
+ }
+ } else {
+ if (type != null) {
+ Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value);
+ se.getExchange().getIn().setHeader(headerName, convertedValue);
+ } else {
+ se.getExchange().getIn().setHeader(headerName, value);
+ }
+ }
+ }
+ }
+
+ public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
+ SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+ if (se != null) {
+ logger.log("Breakpoint at node " + nodeId + " is removing message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName);
+ if (se.getExchange().hasOut()) {
+ se.getExchange().getOut().removeHeader(headerName);
} else {
- se.getExchange().getIn().setHeader(headerName, value);
+ se.getExchange().getIn().removeHeader(headerName);
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
index 6d9140c..d717fa4 100644
--- a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
@@ -125,8 +125,8 @@ public class BacklogDebuggerTest extends ManagementTestSupport {
assertEquals("foo", nodes.iterator().next());
// update body and header
- mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.String"});
- mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.String"});
+ mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.Object"});
+ mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.Object"});
// resume breakpoint
mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
@@ -163,6 +163,162 @@ public class BacklogDebuggerTest extends ManagementTestSupport {
}
@SuppressWarnings("unchecked")
+ public void testBacklogDebuggerUpdateBodyAndHeaderType() throws Exception {
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogDebugger");
+ assertNotNull(on);
+ mbeanServer.isRegistered(on);
+
+ Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+ // enable debugger
+ mbeanServer.invoke(on, "enableDebugger", null, null);
+
+ enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals("Should be enabled", Boolean.TRUE, enabled);
+
+ // add breakpoint at bar
+ mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+ mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(0);
+ mock.setSleepForEmptyTest(1000);
+
+ template.sendBody("seda:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ // add breakpoint at bar
+ Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(1, nodes.size());
+ assertEquals("foo", nodes.iterator().next());
+
+ // update body and header
+ mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "444", "java.lang.Integer"},
+ new String[]{"java.lang.String", "java.lang.Object", "java.lang.String"});
+ mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "123", "java.lang.Integer"},
+ new String[]{"java.lang.String", "java.lang.String", "java.lang.Object", "java.lang.String"});
+
+ // resume breakpoint
+ mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+
+ Thread.sleep(1000);
+
+ // add breakpoint at bar
+ nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(1, nodes.size());
+ assertEquals("bar", nodes.iterator().next());
+
+ // the message should be ours
+ String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"});
+ assertNotNull(xml);
+ log.info(xml);
+
+ assertTrue("Should contain our body", xml.contains("444"));
+ assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>"));
+ assertTrue("Should contain our added header", xml.contains("<header key=\"beer\" type=\"java.lang.Integer\">123</header>"));
+
+ resetMocks();
+ mock.expectedMessageCount(1);
+
+ // resume breakpoint
+ mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+ assertMockEndpointsSatisfied();
+
+ // and no suspended anymore
+ nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(0, nodes.size());
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testBacklogDebuggerRemoveBodyAndHeader() throws Exception {
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogDebugger");
+ assertNotNull(on);
+ mbeanServer.isRegistered(on);
+
+ Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+ // enable debugger
+ mbeanServer.invoke(on, "enableDebugger", null, null);
+
+ enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals("Should be enabled", Boolean.TRUE, enabled);
+
+ // add breakpoint at bar
+ mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+ mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(0);
+ mock.setSleepForEmptyTest(1000);
+
+ template.sendBody("seda:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ // add breakpoint at bar
+ Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(1, nodes.size());
+ assertEquals("foo", nodes.iterator().next());
+
+ // update body and header
+ mbeanServer.invoke(on, "removeMessageBodyOnBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+ mbeanServer.invoke(on, "removeMessageHeaderOnBreakpoint", new Object[]{"foo", "beer"}, new String[]{"java.lang.String", "java.lang.String"});
+
+ // resume breakpoint
+ mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+
+ Thread.sleep(1000);
+
+ // add breakpoint at bar
+ nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(1, nodes.size());
+ assertEquals("bar", nodes.iterator().next());
+
+ // the message should be ours
+ String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"});
+ assertNotNull(xml);
+ log.info(xml);
+
+ assertTrue("Should not contain our body", xml.contains("<body>[Body is null]</body>"));
+ assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>"));
+ assertFalse("Should not contain any headers", xml.contains("<header"));
+
+ resetMocks();
+ mock.expectedMessageCount(1);
+
+ // resume breakpoint
+ mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+ assertMockEndpointsSatisfied();
+
+ // and no suspended anymore
+ nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(0, nodes.size());
+ }
+
+ @SuppressWarnings("unchecked")
public void testBacklogDebuggerSuspendOnlyOneAtBreakpoint() throws Exception {
// JMX tests dont work well on AIX CI servers (hangs them)
if (isPlatform("aix")) {
[2/2] git commit: CAMEL-7163: BacklogDebugger - Should not change body/header type to string
Posted by da...@apache.org.
CAMEL-7163: BacklogDebugger - Should not change body/header type to string
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f2749a94
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f2749a94
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f2749a94
Branch: refs/heads/camel-2.12.x
Commit: f2749a9416df0cfa8592eb93ae4f5d4a01816f42
Parents: 6e8689a
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Feb 3 20:13:57 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Feb 3 20:15:50 2014 +0100
----------------------------------------------------------------------
.../mbean/ManagedBacklogDebuggerMBean.java | 20 ++-
.../mbean/ManagedBacklogDebugger.java | 38 ++++-
.../processor/interceptor/BacklogDebugger.java | 94 ++++++++++-
.../camel/management/BacklogDebuggerTest.java | 160 ++++++++++++++++++-
4 files changed, 296 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f2749a94/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
index 02d2d96..036ef3d 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
@@ -53,11 +53,23 @@ public interface ManagedBacklogDebuggerMBean {
@ManagedOperation(description = "Resume running from the suspended breakpoint at the given node id")
void resumeBreakpoint(String nodeId);
- @ManagedOperation(description = "Updates the message body on the suspended breakpoint at the given node id")
- void setMessageBodyOnBreakpoint(String nodeId, String body);
+ @ManagedOperation(description = "Updates the message body (uses same type as old body) on the suspended breakpoint at the given node id")
+ void setMessageBodyOnBreakpoint(String nodeId, Object body);
- @ManagedOperation(description = "Updates/adds the message header on the suspended breakpoint at the given node id")
- void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value);
+ @ManagedOperation(description = "Updates the message body (with a new type) on the suspended breakpoint at the given node id")
+ void setMessageBodyOnBreakpoint(String nodeId, Object body, String type);
+
+ @ManagedOperation(description = "Removes the message body on the suspended breakpoint at the given node id")
+ void removeMessageBodyOnBreakpoint(String nodeId);
+
+ @ManagedOperation(description = "Updates/adds the message header (uses same type as old header value) on the suspended breakpoint at the given node id")
+ void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value);
+
+ @ManagedOperation(description = "Removes the message header on the suspended breakpoint at the given node id")
+ void removeMessageHeaderOnBreakpoint(String nodeId, String headerName);
+
+ @ManagedOperation(description = "Updates/adds the message header (with a new type) on the suspended breakpoint at the given node id")
+ void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, String type);
@ManagedOperation(description = "Resume running any suspended breakpoints, and exits step mode")
void resumeAll();
http://git-wip-us.apache.org/repos/asf/camel/blob/f2749a94/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
index 33f1310..9f3a94c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
@@ -19,10 +19,12 @@ package org.apache.camel.management.mbean;
import java.util.Set;
import org.apache.camel.CamelContext;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.api.management.mbean.ManagedBacklogDebuggerMBean;
import org.apache.camel.processor.interceptor.BacklogDebugger;
import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.ObjectHelper;
@ManagedResource(description = "Managed BacklogDebugger")
public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean {
@@ -91,12 +93,42 @@ public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean {
backlogDebugger.resumeBreakpoint(nodeId);
}
- public void setMessageBodyOnBreakpoint(String nodeId, String body) {
+ public void setMessageBodyOnBreakpoint(String nodeId, Object body) {
backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body);
}
- public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) {
- backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value);
+ public void setMessageBodyOnBreakpoint(String nodeId, Object body, String type) {
+ try {
+ Class<?> classType = camelContext.getClassResolver().resolveMandatoryClass(type);
+ backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body, classType);
+ } catch (ClassNotFoundException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ public void removeMessageBodyOnBreakpoint(String nodeId) {
+ backlogDebugger.removeMessageBodyOnBreakpoint(nodeId);
+ }
+
+ public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) {
+ try {
+ backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value);
+ } catch (NoTypeConversionAvailableException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, String type) {
+ try {
+ Class<?> classType = camelContext.getClassResolver().resolveMandatoryClass(type);
+ backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value, classType);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
+ backlogDebugger.removeMessageHeaderOnBreakpoint(nodeId, headerName);
}
public void resumeAll() {
http://git-wip-us.apache.org/repos/asf/camel/blob/f2749a94/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
index 3c2e290..571b174 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.api.management.mbean.BacklogTracerEventMessage;
@@ -268,26 +269,105 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy
}
}
- public void setMessageBodyOnBreakpoint(String nodeId, String body) {
+ public void setMessageBodyOnBreakpoint(String nodeId, Object body) {
SuspendedExchange se = suspendedBreakpoints.get(nodeId);
if (se != null) {
- logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body);
+ boolean remove = body == null;
+ if (remove) {
+ removeMessageBodyOnBreakpoint(nodeId);
+ } else {
+ Class oldType;
+ if (se.getExchange().hasOut()) {
+ oldType = se.getExchange().getOut().getBody() != null ? se.getExchange().getOut().getBody().getClass() : null;
+ } else {
+ oldType = se.getExchange().getIn().getBody() != null ? se.getExchange().getIn().getBody().getClass() : null;
+ }
+ setMessageBodyOnBreakpoint(nodeId, body, oldType);
+ }
+ }
+ }
+
+ public void setMessageBodyOnBreakpoint(String nodeId, Object body, Class type) {
+ SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+ if (se != null) {
+ boolean remove = body == null;
+ if (remove) {
+ removeMessageBodyOnBreakpoint(nodeId);
+ } else {
+ logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body);
+ if (se.getExchange().hasOut()) {
+ // preserve type
+ if (type != null) {
+ se.getExchange().getOut().setBody(body, type);
+ } else {
+ se.getExchange().getOut().setBody(body);
+ }
+ } else {
+ if (type != null) {
+ se.getExchange().getIn().setBody(body, type);
+ } else {
+ se.getExchange().getIn().setBody(body);
+ }
+ }
+ }
+ }
+ }
+
+ public void removeMessageBodyOnBreakpoint(String nodeId) {
+ SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+ if (se != null) {
+ logger.log("Breakpoint at node " + nodeId + " is removing message body on exchangeId: " + se.getExchange().getExchangeId());
if (se.getExchange().hasOut()) {
- se.getExchange().getOut().setBody(body);
+ se.getExchange().getOut().setBody(null);
} else {
- se.getExchange().getIn().setBody(body);
+ se.getExchange().getIn().setBody(null);
}
}
}
- public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) {
+ public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) throws NoTypeConversionAvailableException {
+ SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+ if (se != null) {
+ Class oldType;
+ if (se.getExchange().hasOut()) {
+ oldType = se.getExchange().getOut().getHeader(headerName) != null ? se.getExchange().getOut().getHeader(headerName).getClass() : null;
+ } else {
+ oldType = se.getExchange().getIn().getHeader(headerName) != null ? se.getExchange().getIn().getHeader(headerName).getClass() : null;
+ }
+ setMessageHeaderOnBreakpoint(nodeId, headerName, value, oldType);
+ }
+ }
+
+ public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, Class type) throws NoTypeConversionAvailableException {
SuspendedExchange se = suspendedBreakpoints.get(nodeId);
if (se != null) {
logger.log("Breakpoint at node " + nodeId + " is updating message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName + " and value: " + value);
if (se.getExchange().hasOut()) {
- se.getExchange().getOut().setHeader(headerName, value);
+ if (type != null) {
+ Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value);
+ se.getExchange().getOut().setHeader(headerName, convertedValue);
+ } else {
+ se.getExchange().getOut().setHeader(headerName, value);
+ }
+ } else {
+ if (type != null) {
+ Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value);
+ se.getExchange().getIn().setHeader(headerName, convertedValue);
+ } else {
+ se.getExchange().getIn().setHeader(headerName, value);
+ }
+ }
+ }
+ }
+
+ public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
+ SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+ if (se != null) {
+ logger.log("Breakpoint at node " + nodeId + " is removing message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName);
+ if (se.getExchange().hasOut()) {
+ se.getExchange().getOut().removeHeader(headerName);
} else {
- se.getExchange().getIn().setHeader(headerName, value);
+ se.getExchange().getIn().removeHeader(headerName);
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/f2749a94/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
index 694865f..7b674a3 100644
--- a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
@@ -125,8 +125,8 @@ public class BacklogDebuggerTest extends ManagementTestSupport {
assertEquals("foo", nodes.iterator().next());
// update body and header
- mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.String"});
- mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.String"});
+ mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.Object"});
+ mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.Object"});
// resume breakpoint
mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
@@ -163,6 +163,162 @@ public class BacklogDebuggerTest extends ManagementTestSupport {
}
@SuppressWarnings("unchecked")
+ public void testBacklogDebuggerUpdateBodyAndHeaderType() throws Exception {
+ // JMX tests don't work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogDebugger");
+ assertNotNull(on);
+ mbeanServer.isRegistered(on);
+
+ Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+ // enable debugger
+ mbeanServer.invoke(on, "enableDebugger", null, null);
+
+ enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals("Should be enabled", Boolean.TRUE, enabled);
+
+ // add breakpoints at foo and bar
+ mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+ mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(0);
+ mock.setSleepForEmptyTest(1000);
+
+ template.sendBody("seda:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ // the exchange should be suspended at foo
+ Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(1, nodes.size());
+ assertEquals("foo", nodes.iterator().next());
+
+ // update body and header
+ mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "444", "java.lang.Integer"},
+ new String[]{"java.lang.String", "java.lang.Object", "java.lang.String"});
+ mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "123", "java.lang.Integer"},
+ new String[]{"java.lang.String", "java.lang.String", "java.lang.Object", "java.lang.String"});
+
+ // resume breakpoint
+ mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+
+ Thread.sleep(1000);
+
+ // the exchange should now be suspended at bar
+ nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(1, nodes.size());
+ assertEquals("bar", nodes.iterator().next());
+
+ // the message should be ours
+ String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"});
+ assertNotNull(xml);
+ log.info(xml);
+
+ assertTrue("Should contain our body", xml.contains("444"));
+ assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>"));
+ assertTrue("Should contain our added header", xml.contains("<header key=\"beer\" type=\"java.lang.Integer\">123</header>"));
+
+ resetMocks();
+ mock.expectedMessageCount(1);
+
+ // resume breakpoint
+ mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+ assertMockEndpointsSatisfied();
+
+ // and no suspended anymore
+ nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(0, nodes.size());
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testBacklogDebuggerRemoveBodyAndHeader() throws Exception {
+ // JMX tests don't work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogDebugger");
+ assertNotNull(on);
+ mbeanServer.isRegistered(on);
+
+ Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+ // enable debugger
+ mbeanServer.invoke(on, "enableDebugger", null, null);
+
+ enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals("Should be enabled", Boolean.TRUE, enabled);
+
+ // add breakpoints at foo and bar
+ mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+ mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(0);
+ mock.setSleepForEmptyTest(1000);
+
+ template.sendBody("seda:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ // the exchange should be suspended at foo
+ Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(1, nodes.size());
+ assertEquals("foo", nodes.iterator().next());
+
+ // remove body and header
+ mbeanServer.invoke(on, "removeMessageBodyOnBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+ mbeanServer.invoke(on, "removeMessageHeaderOnBreakpoint", new Object[]{"foo", "beer"}, new String[]{"java.lang.String", "java.lang.String"});
+
+ // resume breakpoint
+ mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+
+ Thread.sleep(1000);
+
+ // the exchange should now be suspended at bar
+ nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(1, nodes.size());
+ assertEquals("bar", nodes.iterator().next());
+
+ // the message should be ours
+ String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"});
+ assertNotNull(xml);
+ log.info(xml);
+
+ assertTrue("Should not contain our body", xml.contains("<body>[Body is null]</body>"));
+ assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>"));
+ assertFalse("Should not contain any headers", xml.contains("<header"));
+
+ resetMocks();
+ mock.expectedMessageCount(1);
+
+ // resume breakpoint
+ mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+ assertMockEndpointsSatisfied();
+
+ // and no suspended anymore
+ nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+ assertNotNull(nodes);
+ assertEquals(0, nodes.size());
+ }
+
+ @SuppressWarnings("unchecked")
public void testBacklogDebuggerSuspendOnlyOneAtBreakpoint() throws Exception {
// JMX tests dont work well on AIX CI servers (hangs them)
if (isPlatform("aix")) {