You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/18 03:41:32 UTC

[02/17] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5220

https://issues.apache.org/jira/browse/AMQ-5220

Fixes empty message bodies from responses to statistics plugin queries
over the STOMP transport.

This closes #41


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fe09b748
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fe09b748
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fe09b748

Branch: refs/heads/activemq-5.10.x
Commit: fe09b7482c98dab25086a879cd2891a62f8c3b20
Parents: fb3e96a
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Aug 7 14:18:40 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 19:23:55 2014 -0500

----------------------------------------------------------------------
 .../transport/stomp/FrameTranslator.java        |   5 +-
 .../transport/stomp/JmsFrameTranslator.java     | 184 ++++++++++---------
 .../transport/stomp/ProtocolConverter.java      |  15 +-
 .../apache/activemq/transport/stomp/Stomp.java  |   5 +
 .../transport/stomp/StompAdvisoryTest.java      |  58 ++++++
 .../transport/stomp/StompTestSupport.java       |   5 +
 6 files changed, 174 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
index d37d364..7496472 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
@@ -27,12 +27,13 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 
 /**
- * Implementations of this interface are used to map back and forth from Stomp
+ * Implementations of this interface are used to map back and forth from STOMP
  * to ActiveMQ. There are several standard mappings which are semantically the
  * same, the inner class, Helper, provides functions to copy those properties
  * from one to the other
  */
 public interface FrameTranslator {
+
     ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame frame) throws JMSException, ProtocolException;
 
     StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException;
@@ -142,7 +143,7 @@ public interface FrameTranslator {
                 msg.setPersistent("true".equals(o));
             }
 
-            // Stomp specific headers
+            // STOMP specific headers
             headers.remove(Stomp.Headers.RECEIPT_REQUESTED);
 
             // Since we take the rest of the header and put them in properties which could then

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
index 6ae68fc..3525b23 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.transport.stomp;
 
+import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage;
+import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.StringReader;
@@ -33,6 +36,9 @@ import org.apache.activemq.command.ActiveMQMapMessage;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.transport.stomp.Stomp.Headers;
+import org.apache.activemq.transport.stomp.Stomp.Responses;
+import org.apache.activemq.transport.stomp.Stomp.Transformations;
 import org.codehaus.jettison.mapped.Configuration;
 import org.fusesource.hawtbuf.UTF8Buffer;
 
@@ -49,133 +55,129 @@ import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
 /**
  * Frame translator implementation that uses XStream to convert messages to and
  * from XML and JSON
- *
- * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
  */
-public class JmsFrameTranslator extends LegacyFrameTranslator implements
-        BrokerContextAware {
+public class JmsFrameTranslator extends LegacyFrameTranslator implements BrokerContextAware {
 
     XStream xStream = null;
     BrokerContext brokerContext;
 
     @Override
-    public ActiveMQMessage convertFrame(ProtocolConverter converter,
-            StompFrame command) throws JMSException, ProtocolException {
+    public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
         Map<String, String> headers = command.getHeaders();
         ActiveMQMessage msg;
-        String transformation = headers.get(Stomp.Headers.TRANSFORMATION);
-        if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
+        String transformation = headers.get(Headers.TRANSFORMATION);
+        if (headers.containsKey(Headers.CONTENT_LENGTH) || transformation.equals(Transformations.JMS_BYTE.toString())) {
             msg = super.convertFrame(converter, command);
         } else {
             HierarchicalStreamReader in;
 
             try {
                 String text = new String(command.getContent(), "UTF-8");
-                switch (Stomp.Transformations.getValue(transformation)) {
-                case JMS_OBJECT_XML:
-                    in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
-                    msg = createObjectMessage(in);
-                    break;
-                case JMS_OBJECT_JSON:
-                    in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
-                    msg = createObjectMessage(in);
-                    break;
-                case JMS_MAP_XML:
-                    in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
-                    msg = createMapMessage(in);
-                    break;
-                case JMS_MAP_JSON:
-                    in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
-                    msg = createMapMessage(in);
-                    break;
-                default:
-                    throw new Exception("Unkown transformation: " + transformation);
+                switch (Transformations.getValue(transformation)) {
+                    case JMS_OBJECT_XML:
+                        in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
+                        msg = createObjectMessage(in);
+                        break;
+                    case JMS_OBJECT_JSON:
+                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+                        msg = createObjectMessage(in);
+                        break;
+                    case JMS_MAP_XML:
+                        in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
+                        msg = createMapMessage(in);
+                        break;
+                    case JMS_MAP_JSON:
+                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+                        msg = createMapMessage(in);
+                        break;
+                    default:
+                        throw new Exception("Unkown transformation: " + transformation);
                 }
             } catch (Throwable e) {
-                command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
+                command.getHeaders().put(Headers.TRANSFORMATION_ERROR, e.getMessage());
                 msg = super.convertFrame(converter, command);
             }
         }
-        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+
+        copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
         return msg;
     }
 
     @Override
-    public StompFrame convertMessage(ProtocolConverter converter,
-            ActiveMQMessage message) throws IOException, JMSException {
+    public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
+
+        StompFrame command = new StompFrame();
+        command.setAction(Responses.MESSAGE);
+        Map<String, String> headers = new HashMap<String, String>(25);
+        command.setHeaders(headers);
+
+        copyStandardHeadersFromMessageToFrame(converter, message, command, this);
+
+        String transformation = headers.get(Headers.TRANSFORMATION);
 
         if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
-            StompFrame command = new StompFrame();
-            command.setAction(Stomp.Responses.MESSAGE);
-            Map<String, String> headers = new HashMap<String, String>(25);
-            command.setHeaders(headers);
-
-            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
-                    converter, message, command, this);
-
-            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
-                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
-            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
-                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
+
+            if (Transformations.JMS_XML.equals(transformation)) {
+                headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString());
+            } else if (Transformations.JMS_JSON.equals(transformation)) {
+                headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_JSON.toString());
+            }
+
+            if (!headers.containsKey(Headers.TRANSFORMATION)) {
+                headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString());
             }
 
             ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
-            command.setContent(marshall(msg.getObject(),
-                    headers.get(Stomp.Headers.TRANSFORMATION))
-                    .getBytes("UTF-8"));
-            return command;
+            command.setContent(marshall(msg.getObject(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8"));
 
         } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
-            StompFrame command = new StompFrame();
-            command.setAction(Stomp.Responses.MESSAGE);
-            Map<String, String> headers = new HashMap<String, String>(25);
-            command.setHeaders(headers);
-
-            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
-                    converter, message, command, this);
-
-            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
-                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
-            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
-                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
+
+            if (Transformations.JMS_XML.equals(transformation)) {
+                headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString());
+            } else if (Transformations.JMS_JSON.equals(transformation)) {
+                headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_JSON.toString());
+            }
+
+            if (!headers.containsKey(Headers.TRANSFORMATION)) {
+                headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString());
             }
 
             ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
-            command.setContent(marshall((Serializable)msg.getContentMap(),
-                    headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8"));
-            return command;
-        } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
-                AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
-
-            StompFrame command = new StompFrame();
-            command.setAction(Stomp.Responses.MESSAGE);
-            Map<String, String> headers = new HashMap<String, String>(25);
-            command.setHeaders(headers);
-
-            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
-                    converter, message, command, this);
-
-            if (!headers.containsKey(Stomp.Headers.TRANSFORMATION)) {
-                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
+            command.setContent(marshall((Serializable) msg.getContentMap(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8"));
+
+        } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
+
+            if (Transformations.JMS_XML.equals(transformation)) {
+                headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_XML.toString());
+            } else if (Transformations.JMS_JSON.equals(transformation)) {
+                headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString());
             }
 
-            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
-                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
-            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
-                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
+            if (!headers.containsKey(Headers.TRANSFORMATION)) {
+                headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString());
             }
 
-            String body = marshallAdvisory(message.getDataStructure(),
-                    headers.get(Stomp.Headers.TRANSFORMATION));
+            String body = marshallAdvisory(message.getDataStructure(), headers.get(Headers.TRANSFORMATION));
             command.setContent(body.getBytes("UTF-8"));
-            return command;
+
         } else {
-            return super.convertMessage(converter, message);
+            command = super.convertMessage(converter, message);
         }
+
+        return command;
     }
 
     /**
-     * Marshalls the Object to a string using XML or JSON encoding
+     * Marshal the Object to a string using XML or JSON encoding
+     *
+     * @param object
+     *        the object to marshal
+     * @param transformation
+     *        the transformation to apply to the object.
+     *
+     * @returns the marshaled form of the given object, in JSON or XML.
+     *
+     * @throws JMSException if an error occurs during the marshal operation.
      */
     protected String marshall(Serializable object, String transformation) throws JMSException {
         StringWriter buffer = new StringWriter();
@@ -199,7 +201,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
     @SuppressWarnings("unchecked")
     protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
         ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
-        Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
+        Map<String, Object> map = (Map<String, Object>) getXStream().unmarshal(in);
         for (String key : map.keySet()) {
             mapMsg.setObject(key, map.get(key));
         }
@@ -256,8 +258,9 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
             xstream.ignoreUnknownElements();
         }
 
-        // For any object whose elements contains an UTF8Buffer instance instead of a String
-        // type we map it to String both in and out such that we don't marshal UTF8Buffers out
+        // For any object whose elements contains an UTF8Buffer instance instead
+        // of a String type we map it to String both in and out such that we don't
+        // marshal UTF8Buffers out
         xstream.registerConverter(new AbstractSingleValueConverter() {
 
             @Override
@@ -283,14 +286,17 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements
     }
 
     @Override
-    public  BrokerContext getBrokerContext() {
+    public BrokerContext getBrokerContext() {
         return this.brokerContext;
     }
 
     /**
      * Return an Advisory message as a JSON formatted string
+     *
      * @param ds
-     * @return
+     *        the DataStructure instance that is being marshaled.
+     *
+     * @return the JSON marshaled form of the given DataStructure instance.
      */
     protected String marshallAdvisory(final DataStructure ds) {
         XStream xstream = new XStream(new JsonHierarchicalStreamDriver());

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
index 0ed08e4..edefb15 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
@@ -204,17 +204,16 @@ public class ProtocolConverter {
     }
 
     protected FrameTranslator findTranslator(String header) {
-        return findTranslator(header, null);
+        return findTranslator(header, null, false);
     }
 
-    protected FrameTranslator findTranslator(String header, ActiveMQDestination destination) {
+    protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) {
         FrameTranslator translator = frameTranslator;
         try {
             if (header != null) {
-                translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
-                        .newInstance(header);
+                translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header);
             } else {
-                if (destination != null && AdvisorySupport.isAdvisoryTopic(destination)) {
+                if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) {
                     translator = new JmsFrameTranslator();
                 }
             }
@@ -230,7 +229,7 @@ public class ProtocolConverter {
     }
 
     /**
-     * Convert a stomp command
+     * Convert a STOMP command
      *
      * @param command
      */
@@ -894,7 +893,9 @@ public class ProtocolConverter {
         if (ignoreTransformation == true) {
             return frameTranslator.convertMessage(this, message);
         } else {
-            return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination()).convertMessage(this, message);
+            FrameTranslator translator = findTranslator(
+                message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory());
+            return translator.convertMessage(this, message);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
index a66b5ee..767e947 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
@@ -176,10 +176,15 @@ public interface Stomp {
         JMS_ADVISORY_XML,
         JMS_ADVISORY_JSON;
 
+        @Override
         public String toString() {
             return name().replaceAll("_", "-").toLowerCase(Locale.ENGLISH);
         }
 
+        public boolean equals(String value) {
+            return toString().equals(value);
+        }
+
         public static Transformations getValue(String value) {
             return valueOf(value.replaceAll("-", "_").toUpperCase(Locale.ENGLISH));
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
index 10d09b0..cc78308 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
@@ -22,29 +22,41 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
+import java.util.List;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.Topic;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.plugin.StatisticsBrokerPlugin;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StompAdvisoryTest extends StompTestSupport {
+    static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
+
     private static final Logger LOG = LoggerFactory.getLogger(StompAdvisoryTest.class);
 
     protected ActiveMQConnection connection;
 
     @Override
+    protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception {
+        plugins.add(new StatisticsBrokerPlugin());
+    }
+
+    @Override
     protected void applyBrokerPolicies() throws Exception {
 
         PolicyEntry policy = new PolicyEntry();
@@ -269,4 +281,50 @@ public class StompAdvisoryTest extends StompTestSupport {
         c.stop();
         c.close();
     }
+
+    @Test
+    public void testStatisticsAdvisory() throws Exception {
+        Connection c = cf.createConnection("system", "manager");
+        c.start();
+        final Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Topic replyTo = session.createTopic("stats");
+
+        // Dummy Queue used to later gather statistics.
+        final ActiveMQQueue testQueue = new ActiveMQQueue("queueToBeTestedForStats");
+        final MessageProducer producer = session.createProducer(null);
+        Message mess = session.createTextMessage("test");
+        producer.send(testQueue, mess);
+
+        // Create a request for Queue statistics
+        Thread child = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(1000);
+                    Queue query = session.createQueue(STATS_DESTINATION_PREFIX + testQueue.getQueueName());
+                    Message msg = session.createMessage();
+                    msg.setJMSReplyTo(replyTo);
+                    producer.send(query, msg);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        child.start();
+
+        // Attempt to gather the statistics response from the previous request.
+        stompConnection.connect("system", "manager");
+        stompConnection.subscribe("/topic/" + replyTo.getTopicName(), Stomp.Headers.Subscribe.AckModeValues.AUTO);
+        stompConnection.begin("TX");
+        StompFrame f = stompConnection.receive(5000);
+        stompConnection.commit("TX");
+
+        LOG.debug(f.toString());
+        assertEquals(f.getAction(),"MESSAGE");
+        assertTrue("Should have a body", f.getBody().length() > 0);
+        assertTrue("Should contains memoryUsage stats", f.getBody().contains("memoryUsage"));
+
+        c.stop();
+        c.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
index 3cf1356..e763552 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
@@ -146,6 +146,8 @@ public class StompTestSupport {
             plugins.add(configureAuthentication());
         }
 
+        addAdditionalPlugins(plugins);
+
         if (!plugins.isEmpty()) {
             BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
             brokerService.setPlugins(plugins.toArray(array));
@@ -172,6 +174,9 @@ public class StompTestSupport {
         brokerService.setJobSchedulerStore(jobStore);
     }
 
+    protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception {
+    }
+
     protected BrokerPlugin configureAuthentication() throws Exception {
         List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
         users.add(new AuthenticationUser("system", "manager", "users,admins"));