You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/03 20:15:08 UTC

[1/2] git commit: CAMEL-7163: BacklogDebugger - Should not change body/header type to string

Updated Branches:
  refs/heads/camel-2.12.x 6e8689a2d -> f2749a941
  refs/heads/master d6646e648 -> 5f726d0b9


CAMEL-7163: BacklogDebugger - Should not change body/header type to string


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5f726d0b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5f726d0b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5f726d0b

Branch: refs/heads/master
Commit: 5f726d0b995f90f59489b5ebf32aad09ca80f951
Parents: d6646e6
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Feb 3 20:13:57 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Feb 3 20:13:57 2014 +0100

----------------------------------------------------------------------
 .../mbean/ManagedBacklogDebuggerMBean.java      |  20 ++-
 .../mbean/ManagedBacklogDebugger.java           |  38 ++++-
 .../processor/interceptor/BacklogDebugger.java  |  94 ++++++++++-
 .../camel/management/BacklogDebuggerTest.java   | 160 ++++++++++++++++++-
 4 files changed, 296 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
index 02d2d96..036ef3d 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
@@ -53,11 +53,23 @@ public interface ManagedBacklogDebuggerMBean {
     @ManagedOperation(description = "Resume running from the suspended breakpoint at the given node id")
     void resumeBreakpoint(String nodeId);
 
-    @ManagedOperation(description = "Updates the message body on the suspended breakpoint at the given node id")
-    void setMessageBodyOnBreakpoint(String nodeId, String body);
+    @ManagedOperation(description = "Updates the message body (uses same type as old body) on the suspended breakpoint at the given node id")
+    void setMessageBodyOnBreakpoint(String nodeId, Object body);
 
-    @ManagedOperation(description = "Updates/adds the message header on the suspended breakpoint at the given node id")
-    void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value);
+    @ManagedOperation(description = "Updates the message body (with a new type) on the suspended breakpoint at the given node id")
+    void setMessageBodyOnBreakpoint(String nodeId, Object body, String type);
+
+    @ManagedOperation(description = "Removes the message body on the suspended breakpoint at the given node id")
+    void removeMessageBodyOnBreakpoint(String nodeId);
+
+    @ManagedOperation(description = "Updates/adds the message header (uses same type as old header value) on the suspended breakpoint at the given node id")
+    void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value);
+
+    @ManagedOperation(description = "Removes the message header on the suspended breakpoint at the given node id")
+    void removeMessageHeaderOnBreakpoint(String nodeId, String headerName);
+
+    @ManagedOperation(description = "Updates/adds the message header (with a new type) on the suspended breakpoint at the given node id")
+    void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, String type);
 
     @ManagedOperation(description = "Resume running any suspended breakpoints, and exits step mode")
     void resumeAll();

http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
index 33f1310..9f3a94c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
@@ -19,10 +19,12 @@ package org.apache.camel.management.mbean;
 import java.util.Set;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.api.management.mbean.ManagedBacklogDebuggerMBean;
 import org.apache.camel.processor.interceptor.BacklogDebugger;
 import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.ObjectHelper;
 
 @ManagedResource(description = "Managed BacklogDebugger")
 public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean {
@@ -91,12 +93,42 @@ public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean {
         backlogDebugger.resumeBreakpoint(nodeId);
     }
 
-    public void setMessageBodyOnBreakpoint(String nodeId, String body) {
+    public void setMessageBodyOnBreakpoint(String nodeId, Object body) {
         backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body);
     }
 
-    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) {
-        backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value);
+    public void setMessageBodyOnBreakpoint(String nodeId, Object body, String type) {
+        try {
+            Class<?> classType = camelContext.getClassResolver().resolveMandatoryClass(type);
+            backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body, classType);
+        } catch (ClassNotFoundException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    public void removeMessageBodyOnBreakpoint(String nodeId) {
+        backlogDebugger.removeMessageBodyOnBreakpoint(nodeId);
+    }
+
+    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) {
+        try {
+            backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value);
+        } catch (NoTypeConversionAvailableException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, String type) {
+        try {
+            Class<?> classType = camelContext.getClassResolver().resolveMandatoryClass(type);
+            backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value, classType);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
+        backlogDebugger.removeMessageHeaderOnBreakpoint(nodeId, headerName);
     }
 
     public void resumeAll() {

http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
index 3c2e290..571b174 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.api.management.mbean.BacklogTracerEventMessage;
@@ -268,26 +269,105 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy
         }
     }
 
-    public void setMessageBodyOnBreakpoint(String nodeId, String body) {
+    public void setMessageBodyOnBreakpoint(String nodeId, Object body) {
         SuspendedExchange se = suspendedBreakpoints.get(nodeId);
         if (se != null) {
-            logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body);
+            boolean remove = body == null;
+            if (remove) {
+                removeMessageBodyOnBreakpoint(nodeId);
+            } else {
+                Class oldType;
+                if (se.getExchange().hasOut()) {
+                    oldType = se.getExchange().getOut().getBody() != null ? se.getExchange().getOut().getBody().getClass() : null;
+                } else {
+                    oldType = se.getExchange().getIn().getBody() != null ? se.getExchange().getIn().getBody().getClass() : null;
+                }
+                setMessageBodyOnBreakpoint(nodeId, body, oldType);
+            }
+        }
+    }
+
+    public void setMessageBodyOnBreakpoint(String nodeId, Object body, Class type) {
+        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+        if (se != null) {
+            boolean remove = body == null;
+            if (remove) {
+                removeMessageBodyOnBreakpoint(nodeId);
+            } else {
+                logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body);
+                if (se.getExchange().hasOut()) {
+                    // preserve type
+                    if (type != null) {
+                        se.getExchange().getOut().setBody(body, type);
+                    } else {
+                        se.getExchange().getOut().setBody(body);
+                    }
+                } else {
+                    if (type != null) {
+                        se.getExchange().getIn().setBody(body, type);
+                    } else {
+                        se.getExchange().getIn().setBody(body);
+                    }
+                }
+            }
+        }
+    }
+
+    public void removeMessageBodyOnBreakpoint(String nodeId) {
+        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+        if (se != null) {
+            logger.log("Breakpoint at node " + nodeId + " is removing message body on exchangeId: " + se.getExchange().getExchangeId());
             if (se.getExchange().hasOut()) {
-                se.getExchange().getOut().setBody(body);
+                se.getExchange().getOut().setBody(null);
             } else {
-                se.getExchange().getIn().setBody(body);
+                se.getExchange().getIn().setBody(null);
             }
         }
     }
 
-    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) {
+    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) throws NoTypeConversionAvailableException {
+        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+        if (se != null) {
+            Class oldType;
+            if (se.getExchange().hasOut()) {
+                oldType = se.getExchange().getOut().getHeader(headerName) != null ? se.getExchange().getOut().getHeader(headerName).getClass() : null;
+            } else {
+                oldType = se.getExchange().getIn().getHeader(headerName) != null ? se.getExchange().getIn().getHeader(headerName).getClass() : null;
+            }
+            setMessageHeaderOnBreakpoint(nodeId, headerName, value, oldType);
+        }
+    }
+
+    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, Class type) throws NoTypeConversionAvailableException {
         SuspendedExchange se = suspendedBreakpoints.get(nodeId);
         if (se != null) {
             logger.log("Breakpoint at node " + nodeId + " is updating message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName + " and value: " + value);
             if (se.getExchange().hasOut()) {
-                se.getExchange().getOut().setHeader(headerName, value);
+                if (type != null) {
+                    Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value);
+                    se.getExchange().getOut().setHeader(headerName, convertedValue);
+                } else {
+                    se.getExchange().getOut().setHeader(headerName, value);
+                }
+            } else {
+                if (type != null) {
+                    Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value);
+                    se.getExchange().getIn().setHeader(headerName, convertedValue);
+                } else {
+                    se.getExchange().getIn().setHeader(headerName, value);
+                }
+            }
+        }
+    }
+
+    public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
+        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+        if (se != null) {
+            logger.log("Breakpoint at node " + nodeId + " is removing message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName);
+            if (se.getExchange().hasOut()) {
+                se.getExchange().getOut().removeHeader(headerName);
             } else {
-                se.getExchange().getIn().setHeader(headerName, value);
+                se.getExchange().getIn().removeHeader(headerName);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
index 6d9140c..d717fa4 100644
--- a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
@@ -125,8 +125,8 @@ public class BacklogDebuggerTest extends ManagementTestSupport {
         assertEquals("foo", nodes.iterator().next());
 
         // update body and header
-        mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.String"});
-        mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.String"});
+        mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.Object"});
+        mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.Object"});
 
         // resume breakpoint
         mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
@@ -163,6 +163,162 @@ public class BacklogDebuggerTest extends ManagementTestSupport {
     }
 
     @SuppressWarnings("unchecked")
+    public void testBacklogDebuggerUpdateBodyAndHeaderType() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogDebugger");
+        assertNotNull(on);
+        mbeanServer.isRegistered(on);
+
+        Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+        // enable debugger
+        mbeanServer.invoke(on, "enableDebugger", null, null);
+
+        enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should be enabled", Boolean.TRUE, enabled);
+
+        // add breakpoint at bar
+        mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+        mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(1000);
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // add breakpoint at bar
+        Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("foo", nodes.iterator().next());
+
+        // update body and header
+        mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "444", "java.lang.Integer"},
+                new String[]{"java.lang.String", "java.lang.Object", "java.lang.String"});
+        mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "123", "java.lang.Integer"},
+                new String[]{"java.lang.String", "java.lang.String", "java.lang.Object", "java.lang.String"});
+
+        // resume breakpoint
+        mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+
+        Thread.sleep(1000);
+
+        // add breakpoint at bar
+        nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("bar", nodes.iterator().next());
+
+        // the message should be ours
+        String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"});
+        assertNotNull(xml);
+        log.info(xml);
+
+        assertTrue("Should contain our body", xml.contains("444"));
+        assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>"));
+        assertTrue("Should contain our added header", xml.contains("<header key=\"beer\" type=\"java.lang.Integer\">123</header>"));
+
+        resetMocks();
+        mock.expectedMessageCount(1);
+
+        // resume breakpoint
+        mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+        assertMockEndpointsSatisfied();
+
+        // and no suspended anymore
+        nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(0, nodes.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testBacklogDebuggerRemoveBodyAndHeader() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogDebugger");
+        assertNotNull(on);
+        mbeanServer.isRegistered(on);
+
+        Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+        // enable debugger
+        mbeanServer.invoke(on, "enableDebugger", null, null);
+
+        enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should be enabled", Boolean.TRUE, enabled);
+
+        // add breakpoint at bar
+        mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+        mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(1000);
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // add breakpoint at bar
+        Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("foo", nodes.iterator().next());
+
+        // update body and header
+        mbeanServer.invoke(on, "removeMessageBodyOnBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+        mbeanServer.invoke(on, "removeMessageHeaderOnBreakpoint", new Object[]{"foo", "beer"}, new String[]{"java.lang.String", "java.lang.String"});
+
+        // resume breakpoint
+        mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+
+        Thread.sleep(1000);
+
+        // add breakpoint at bar
+        nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("bar", nodes.iterator().next());
+
+        // the message should be ours
+        String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"});
+        assertNotNull(xml);
+        log.info(xml);
+
+        assertTrue("Should not contain our body", xml.contains("<body>[Body is null]</body>"));
+        assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>"));
+        assertFalse("Should not contain any headers", xml.contains("<header"));
+
+        resetMocks();
+        mock.expectedMessageCount(1);
+
+        // resume breakpoint
+        mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+        assertMockEndpointsSatisfied();
+
+        // and no suspended anymore
+        nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(0, nodes.size());
+    }
+
+    @SuppressWarnings("unchecked")
     public void testBacklogDebuggerSuspendOnlyOneAtBreakpoint() throws Exception {
         // JMX tests dont work well on AIX CI servers (hangs them)
         if (isPlatform("aix")) {


[2/2] git commit: CAMEL-7163: BacklogDebugger - Should not change body/header type to string

Posted by da...@apache.org.
CAMEL-7163: BacklogDebugger - Should not change body/header type to string


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f2749a94
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f2749a94
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f2749a94

Branch: refs/heads/camel-2.12.x
Commit: f2749a9416df0cfa8592eb93ae4f5d4a01816f42
Parents: 6e8689a
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Feb 3 20:13:57 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Feb 3 20:15:50 2014 +0100

----------------------------------------------------------------------
 .../mbean/ManagedBacklogDebuggerMBean.java      |  20 ++-
 .../mbean/ManagedBacklogDebugger.java           |  38 ++++-
 .../processor/interceptor/BacklogDebugger.java  |  94 ++++++++++-
 .../camel/management/BacklogDebuggerTest.java   | 160 ++++++++++++++++++-
 4 files changed, 296 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f2749a94/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
index 02d2d96..036ef3d 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
@@ -53,11 +53,23 @@ public interface ManagedBacklogDebuggerMBean {
     @ManagedOperation(description = "Resume running from the suspended breakpoint at the given node id")
     void resumeBreakpoint(String nodeId);
 
-    @ManagedOperation(description = "Updates the message body on the suspended breakpoint at the given node id")
-    void setMessageBodyOnBreakpoint(String nodeId, String body);
+    @ManagedOperation(description = "Updates the message body (uses same type as old body) on the suspended breakpoint at the given node id")
+    void setMessageBodyOnBreakpoint(String nodeId, Object body);
 
-    @ManagedOperation(description = "Updates/adds the message header on the suspended breakpoint at the given node id")
-    void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value);
+    @ManagedOperation(description = "Updates the message body (with a new type) on the suspended breakpoint at the given node id")
+    void setMessageBodyOnBreakpoint(String nodeId, Object body, String type);
+
+    @ManagedOperation(description = "Removes the message body on the suspended breakpoint at the given node id")
+    void removeMessageBodyOnBreakpoint(String nodeId);
+
+    @ManagedOperation(description = "Updates/adds the message header (uses same type as old header value) on the suspended breakpoint at the given node id")
+    void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value);
+
+    @ManagedOperation(description = "Removes the message header on the suspended breakpoint at the given node id")
+    void removeMessageHeaderOnBreakpoint(String nodeId, String headerName);
+
+    @ManagedOperation(description = "Updates/adds the message header (with a new type) on the suspended breakpoint at the given node id")
+    void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, String type);
 
     @ManagedOperation(description = "Resume running any suspended breakpoints, and exits step mode")
     void resumeAll();

http://git-wip-us.apache.org/repos/asf/camel/blob/f2749a94/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
index 33f1310..9f3a94c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
@@ -19,10 +19,12 @@ package org.apache.camel.management.mbean;
 import java.util.Set;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.api.management.mbean.ManagedBacklogDebuggerMBean;
 import org.apache.camel.processor.interceptor.BacklogDebugger;
 import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.ObjectHelper;
 
 @ManagedResource(description = "Managed BacklogDebugger")
 public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean {
@@ -91,12 +93,42 @@ public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean {
         backlogDebugger.resumeBreakpoint(nodeId);
     }
 
-    public void setMessageBodyOnBreakpoint(String nodeId, String body) {
+    public void setMessageBodyOnBreakpoint(String nodeId, Object body) {
         backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body);
     }
 
-    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) {
-        backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value);
+    public void setMessageBodyOnBreakpoint(String nodeId, Object body, String type) {
+        try {
+            Class<?> classType = camelContext.getClassResolver().resolveMandatoryClass(type);
+            backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body, classType);
+        } catch (ClassNotFoundException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    public void removeMessageBodyOnBreakpoint(String nodeId) {
+        backlogDebugger.removeMessageBodyOnBreakpoint(nodeId);
+    }
+
+    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) {
+        try {
+            backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value);
+        } catch (NoTypeConversionAvailableException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, String type) {
+        try {
+            Class<?> classType = camelContext.getClassResolver().resolveMandatoryClass(type);
+            backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value, classType);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
+        backlogDebugger.removeMessageHeaderOnBreakpoint(nodeId, headerName);
     }
 
     public void resumeAll() {

http://git-wip-us.apache.org/repos/asf/camel/blob/f2749a94/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
index 3c2e290..571b174 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.api.management.mbean.BacklogTracerEventMessage;
@@ -268,26 +269,105 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy
         }
     }
 
-    public void setMessageBodyOnBreakpoint(String nodeId, String body) {
+    public void setMessageBodyOnBreakpoint(String nodeId, Object body) {
         SuspendedExchange se = suspendedBreakpoints.get(nodeId);
         if (se != null) {
-            logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body);
+            boolean remove = body == null;
+            if (remove) {
+                removeMessageBodyOnBreakpoint(nodeId);
+            } else {
+                Class oldType;
+                if (se.getExchange().hasOut()) {
+                    oldType = se.getExchange().getOut().getBody() != null ? se.getExchange().getOut().getBody().getClass() : null;
+                } else {
+                    oldType = se.getExchange().getIn().getBody() != null ? se.getExchange().getIn().getBody().getClass() : null;
+                }
+                setMessageBodyOnBreakpoint(nodeId, body, oldType);
+            }
+        }
+    }
+
+    public void setMessageBodyOnBreakpoint(String nodeId, Object body, Class type) {
+        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+        if (se != null) {
+            boolean remove = body == null;
+            if (remove) {
+                removeMessageBodyOnBreakpoint(nodeId);
+            } else {
+                logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body);
+                if (se.getExchange().hasOut()) {
+                    // preserve type
+                    if (type != null) {
+                        se.getExchange().getOut().setBody(body, type);
+                    } else {
+                        se.getExchange().getOut().setBody(body);
+                    }
+                } else {
+                    if (type != null) {
+                        se.getExchange().getIn().setBody(body, type);
+                    } else {
+                        se.getExchange().getIn().setBody(body);
+                    }
+                }
+            }
+        }
+    }
+
+    public void removeMessageBodyOnBreakpoint(String nodeId) {
+        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+        if (se != null) {
+            logger.log("Breakpoint at node " + nodeId + " is removing message body on exchangeId: " + se.getExchange().getExchangeId());
             if (se.getExchange().hasOut()) {
-                se.getExchange().getOut().setBody(body);
+                se.getExchange().getOut().setBody(null);
             } else {
-                se.getExchange().getIn().setBody(body);
+                se.getExchange().getIn().setBody(null);
             }
         }
     }
 
-    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) {
+    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) throws NoTypeConversionAvailableException {
+        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+        if (se != null) {
+            Class oldType;
+            if (se.getExchange().hasOut()) {
+                oldType = se.getExchange().getOut().getHeader(headerName) != null ? se.getExchange().getOut().getHeader(headerName).getClass() : null;
+            } else {
+                oldType = se.getExchange().getIn().getHeader(headerName) != null ? se.getExchange().getIn().getHeader(headerName).getClass() : null;
+            }
+            setMessageHeaderOnBreakpoint(nodeId, headerName, value, oldType);
+        }
+    }
+
+    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, Class type) throws NoTypeConversionAvailableException {
         SuspendedExchange se = suspendedBreakpoints.get(nodeId);
         if (se != null) {
             logger.log("Breakpoint at node " + nodeId + " is updating message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName + " and value: " + value);
             if (se.getExchange().hasOut()) {
-                se.getExchange().getOut().setHeader(headerName, value);
+                if (type != null) {
+                    Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value);
+                    se.getExchange().getOut().setHeader(headerName, convertedValue);
+                } else {
+                    se.getExchange().getOut().setHeader(headerName, value);
+                }
+            } else {
+                if (type != null) {
+                    Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value);
+                    se.getExchange().getIn().setHeader(headerName, convertedValue);
+                } else {
+                    se.getExchange().getIn().setHeader(headerName, value);
+                }
+            }
+        }
+    }
+
+    public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
+        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+        if (se != null) {
+            logger.log("Breakpoint at node " + nodeId + " is removing message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName);
+            if (se.getExchange().hasOut()) {
+                se.getExchange().getOut().removeHeader(headerName);
             } else {
-                se.getExchange().getIn().setHeader(headerName, value);
+                se.getExchange().getIn().removeHeader(headerName);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f2749a94/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
index 694865f..7b674a3 100644
--- a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
@@ -125,8 +125,8 @@ public class BacklogDebuggerTest extends ManagementTestSupport {
         assertEquals("foo", nodes.iterator().next());
 
         // update body and header
-        mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.String"});
-        mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.String"});
+        mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.Object"});
+        mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.Object"});
 
         // resume breakpoint
         mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
@@ -163,6 +163,162 @@ public class BacklogDebuggerTest extends ManagementTestSupport {
     }
 
     @SuppressWarnings("unchecked")
+    public void testBacklogDebuggerUpdateBodyAndHeaderType() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogDebugger");
+        assertNotNull(on);
+        mbeanServer.isRegistered(on);
+
+        Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+        // enable debugger
+        mbeanServer.invoke(on, "enableDebugger", null, null);
+
+        enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should be enabled", Boolean.TRUE, enabled);
+
+        // add breakpoint at bar
+        mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+        mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(1000);
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // add breakpoint at bar
+        Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("foo", nodes.iterator().next());
+
+        // update body and header
+        mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "444", "java.lang.Integer"},
+                new String[]{"java.lang.String", "java.lang.Object", "java.lang.String"});
+        mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "123", "java.lang.Integer"},
+                new String[]{"java.lang.String", "java.lang.String", "java.lang.Object", "java.lang.String"});
+
+        // resume breakpoint
+        mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+
+        Thread.sleep(1000);
+
+        // add breakpoint at bar
+        nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("bar", nodes.iterator().next());
+
+        // the message should be ours
+        String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"});
+        assertNotNull(xml);
+        log.info(xml);
+
+        assertTrue("Should contain our body", xml.contains("444"));
+        assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>"));
+        assertTrue("Should contain our added header", xml.contains("<header key=\"beer\" type=\"java.lang.Integer\">123</header>"));
+
+        resetMocks();
+        mock.expectedMessageCount(1);
+
+        // resume breakpoint
+        mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+        assertMockEndpointsSatisfied();
+
+        // and no suspended anymore
+        nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(0, nodes.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testBacklogDebuggerRemoveBodyAndHeader() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogDebugger");
+        assertNotNull(on);
+        mbeanServer.isRegistered(on);
+
+        Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+        // enable debugger
+        mbeanServer.invoke(on, "enableDebugger", null, null);
+
+        enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should be enabled", Boolean.TRUE, enabled);
+
+        // add breakpoint at bar
+        mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+        mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(1000);
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // add breakpoint at bar
+        Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("foo", nodes.iterator().next());
+
+        // update body and header
+        mbeanServer.invoke(on, "removeMessageBodyOnBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+        mbeanServer.invoke(on, "removeMessageHeaderOnBreakpoint", new Object[]{"foo", "beer"}, new String[]{"java.lang.String", "java.lang.String"});
+
+        // resume breakpoint
+        mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"});
+
+        Thread.sleep(1000);
+
+        // add breakpoint at bar
+        nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("bar", nodes.iterator().next());
+
+        // the message should be ours
+        String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"});
+        assertNotNull(xml);
+        log.info(xml);
+
+        assertTrue("Should not contain our body", xml.contains("<body>[Body is null]</body>"));
+        assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>"));
+        assertFalse("Should not contain any headers", xml.contains("<header"));
+
+        resetMocks();
+        mock.expectedMessageCount(1);
+
+        // resume breakpoint
+        mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"});
+
+        assertMockEndpointsSatisfied();
+
+        // and no suspended anymore
+        nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(0, nodes.size());
+    }
+
+    @SuppressWarnings("unchecked")
     public void testBacklogDebuggerSuspendOnlyOneAtBreakpoint() throws Exception {
         // JMX tests dont work well on AIX CI servers (hangs them)
         if (isPlatform("aix")) {