You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2008/08/21 22:57:56 UTC

svn commit: r687861 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ test/java/org/apache/activemq/broker/jmx/

Author: jstrachan
Date: Thu Aug 21 13:57:56 2008
New Revision: 687861

URL: http://svn.apache.org/viewvc?rev=687861&view=rev
Log:
added a fix for AMQ-1904 to expose the user properties as TabularData so JMX tools can browse all of the information in a message

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataConstants.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataHelper.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataConstants.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataConstants.java?rev=687861&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataConstants.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataConstants.java Thu Aug 21 13:57:56 2008
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+/** Names of the items (fields) of the CompositeData that describes a message to JMX clients.
+ * @version $Revision: 1.1 $
+ */
+public interface CompositeDataConstants {
+    String PROPERTIES = "PropertiesText"; // all user properties rendered as a single String
+    String JMSXGROUP_SEQ = "JMSXGroupSeq"; // message group sequence number (Integer item)
+    String JMSXGROUP_ID = "JMSXGroupID"; // message group ID (String item)
+    String BODY_LENGTH = "BodyLength"; // bytes messages: body length (Long item)
+    String BODY_PREVIEW = "BodyPreview"; // bytes messages: byte-array preview of the body, at most 255 bytes
+    String CONTENT_MAP = "ContentMap"; // map messages: the content map rendered as a String
+    String MESSAGE_TEXT = "Text"; // text messages: the message text
+
+    // User properties grouped by value type; each item is a TabularData whose rows have "key" and "value" items
+    String STRING_PROPERTIES = "StringProperties";
+    String BOOLEAN_PROPERTIES = "BooleanProperties";
+    String BYTE_PROPERTIES = "ByteProperties";
+    String SHORT_PROPERTIES = "ShortProperties";
+    String INT_PROPERTIES = "IntProperties";
+    String LONG_PROPERTIES = "LongProperties";
+    String FLOAT_PROPERTIES = "FloatProperties";
+    String DOUBLE_PROPERTIES = "DoubleProperties";
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataHelper.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataHelper.java?rev=687861&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataHelper.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataHelper.java Thu Aug 21 13:57:56 2008
@@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Static helper for reading values out of Open MBean CompositeData objects.
+ * @version $Revision: 1.1 $
+ */
+public class CompositeDataHelper {
+
+    /**
+     * Returns a new Map built from the TabularData stored under fieldName in cdata (each row's "key" item maps to its
+     * "value" item); used to read the typesafe user properties. Empty when that value is not a TabularData.
+     */
+    public static Map getTabularMap(CompositeData cdata, String fieldName) {
+        Map map = new HashMap();
+        Object tabularObject = cdata.get(fieldName);
+        if (tabularObject instanceof TabularData) { // also false for null, leaving the map empty
+            TabularData tabularData = (TabularData) tabularObject;
+            Collection<CompositeData> values = tabularData.values(); // NOTE(review): values() is declared Collection<?> on Java 6+, so this may need a cast - confirm for the target JDK
+            for (CompositeData compositeData : values) {
+                Object key = compositeData.get("key");
+                Object value = compositeData.get("value");
+                map.put(key, value); // rows sharing a key would overwrite each other
+            }
+        }
+        return map;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=687861&r1=687860&r2=687861&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Thu Aug 21 13:57:56 2008
@@ -156,6 +156,10 @@
                 }
 
             } catch (Throwable e) {
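+                // TODO DELETE ME: leftover debug output (System.out / printStackTrace); LOG.warn below already reports the exception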
+                System.out.println(e);
+                e.printStackTrace();
+                // TODO DELETE ME
                 LOG.warn("exception browsing destination", e);
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=687861&r1=687860&r2=687861&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Thu Aug 21 13:57:56 2008
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
@@ -32,6 +33,8 @@
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.OpenType;
 import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularType;
+import javax.management.openmbean.TabularDataSupport;
 
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQMapMessage;
@@ -40,6 +43,7 @@
 import org.apache.activemq.command.ActiveMQStreamMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.Message;
+import static org.apache.activemq.broker.jmx.CompositeDataConstants.*;
 
 public final class OpenTypeSupport {
 
@@ -95,6 +99,14 @@
     }
 
     static class MessageOpenTypeFactory extends AbstractOpenTypeFactory {
+        protected TabularType stringPropertyTabularType;
+        protected TabularType booleanPropertyTabularType;
+        protected TabularType bytePropertyTabularType;
+        protected TabularType shortPropertyTabularType;
+        protected TabularType intPropertyTabularType;
+        protected TabularType longPropertyTabularType;
+        protected TabularType floatPropertyTabularType;
+        protected TabularType doublePropertyTabularType;
 
         protected String getTypeName() {
             return ActiveMQMessage.class.getName();
@@ -112,7 +124,28 @@
             addItem("JMSPriority", "JMSPriority", SimpleType.INTEGER);
             addItem("JMSRedelivered", "JMSRedelivered", SimpleType.BOOLEAN);
             addItem("JMSTimestamp", "JMSTimestamp", SimpleType.DATE);
-            addItem("Properties", "Properties", SimpleType.STRING);
+            addItem(JMSXGROUP_ID, "Message Group ID", SimpleType.STRING);
+            addItem(JMSXGROUP_SEQ, "Message Group Sequence Number", SimpleType.INTEGER);
+            addItem(CompositeDataConstants.PROPERTIES, "User Properties Text", SimpleType.STRING);
+
+            // now lets expose the type safe properties
+            stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING);
+            booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN);
+            bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE);
+            shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT);
+            intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER);
+            longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG);
+            floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT);
+            doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE);
+
+            addItem(CompositeDataConstants.STRING_PROPERTIES, "User String Properties", stringPropertyTabularType);
+            addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, "User Boolean Properties", booleanPropertyTabularType);
+            addItem(CompositeDataConstants.BYTE_PROPERTIES, "User Byte Properties", bytePropertyTabularType);
+            addItem(CompositeDataConstants.SHORT_PROPERTIES, "User Short Properties", shortPropertyTabularType);
+            addItem(CompositeDataConstants.INT_PROPERTIES, "User Integer Properties", intPropertyTabularType);
+            addItem(CompositeDataConstants.LONG_PROPERTIES, "User Long Properties", longPropertyTabularType);
+            addItem(CompositeDataConstants.FLOAT_PROPERTIES, "User Float Properties", floatPropertyTabularType);
+            addItem(CompositeDataConstants.DOUBLE_PROPERTIES, "User Double Properties", doublePropertyTabularType);
         }
 
         public Map<String, Object> getFields(Object o) throws OpenDataException {
@@ -128,25 +161,98 @@
             rc.put("JMSPriority", Integer.valueOf(m.getJMSPriority()));
             rc.put("JMSRedelivered", Boolean.valueOf(m.getJMSRedelivered()));
             rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
+            rc.put(JMSXGROUP_ID, m.getGroupID());
+            rc.put(JMSXGROUP_SEQ, m.getGroupSequence());
             try {
-                rc.put("Properties", "" + m.getProperties());
+                rc.put(CompositeDataConstants.PROPERTIES, "" + m.getProperties());
             } catch (IOException e) {
-                rc.put("Properties", "");
+                rc.put(CompositeDataConstants.PROPERTIES, "");
+            }
+
+            try {
+                rc.put(CompositeDataConstants.STRING_PROPERTIES, createTabularData(m, stringPropertyTabularType, String.class));
+            } catch (IOException e) {
+                rc.put(CompositeDataConstants.STRING_PROPERTIES, new TabularDataSupport(stringPropertyTabularType));
+            }
+            try {
+                rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, createTabularData(m, booleanPropertyTabularType, Boolean.class));
+            } catch (IOException e) {
+                rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, new TabularDataSupport(booleanPropertyTabularType));
+            }
+            try {
+                rc.put(CompositeDataConstants.BYTE_PROPERTIES, createTabularData(m, bytePropertyTabularType, Byte.class));
+            } catch (IOException e) {
+                rc.put(CompositeDataConstants.BYTE_PROPERTIES, new TabularDataSupport(bytePropertyTabularType));
+            }
+            try {
+                rc.put(CompositeDataConstants.SHORT_PROPERTIES, createTabularData(m, shortPropertyTabularType, Short.class));
+            } catch (IOException e) {
+                rc.put(CompositeDataConstants.SHORT_PROPERTIES, new TabularDataSupport(shortPropertyTabularType));
+            }
+            try {
+                rc.put(CompositeDataConstants.INT_PROPERTIES, createTabularData(m, intPropertyTabularType, Integer.class));
+            } catch (IOException e) {
+                rc.put(CompositeDataConstants.INT_PROPERTIES, new TabularDataSupport(intPropertyTabularType));
+            }
+            try {
+                rc.put(CompositeDataConstants.LONG_PROPERTIES, createTabularData(m, longPropertyTabularType, Long.class));
+            } catch (IOException e) {
+                rc.put(CompositeDataConstants.LONG_PROPERTIES, new TabularDataSupport(longPropertyTabularType));
+            }
+            try {
+                rc.put(CompositeDataConstants.FLOAT_PROPERTIES, createTabularData(m, floatPropertyTabularType, Float.class));
+            } catch (IOException e) {
+                rc.put(CompositeDataConstants.FLOAT_PROPERTIES, new TabularDataSupport(floatPropertyTabularType));
+            }
+            try {
+                rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, createTabularData(m, doublePropertyTabularType, Double.class));
+            } catch (IOException e) {
+                rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, new TabularDataSupport(doublePropertyTabularType));
             }
             return rc;
         }
+
+        /** Creates the TabularType for a table of String keys to openType values; rows are "key"/"value" composites indexed by "key". */
+        protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException {
+            String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">"; // used as both name and description of the row and table types
+            String[] keyValue = new String[]{"key", "value"}; // used as both item names and item descriptions
+            OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
+            CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes);
+            return new TabularType(typeName, typeName, rowType, new String[]{"key"});
+        }
+        /** Builds a table of the given type with one row per message property whose value is an instance of valueType. */
+        protected TabularDataSupport createTabularData(ActiveMQMessage m, TabularType type, Class valueType) throws IOException, OpenDataException {
+            TabularDataSupport answer = new TabularDataSupport(type);
+            Set<Map.Entry<String,Object>> entries = m.getProperties().entrySet();
+            for (Map.Entry<String, Object> entry : entries) {
+                Object value = entry.getValue();
+                if (valueType.isInstance(value)) { // properties of any other type are skipped (isInstance is false for null)
+                    CompositeDataSupport compositeData = createTabularRowValue(type, entry.getKey(), value);
+                    answer.put(compositeData);
+                }
+            }
+            return answer;
+        }
+        /** Creates one table row of type.getRowType() that stores key under the "key" item and value under the "value" item. */
+        protected CompositeDataSupport createTabularRowValue(TabularType type, String key, Object value) throws OpenDataException {
+            Map<String,Object> fields = new HashMap<String, Object>();
+            fields.put("key", key);
+            fields.put("value", value);
+            return new CompositeDataSupport(type.getRowType(), fields);
+        }
     }
 
     static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory {
 
+
         protected String getTypeName() {
             return ActiveMQBytesMessage.class.getName();
         }
 
         protected void init() throws OpenDataException {
             super.init();
-            addItem("BodyLength", "Body length", SimpleType.LONG);
-            addItem("BodyPreview", "Body preview", new ArrayType(1, SimpleType.BYTE));
+            addItem(BODY_LENGTH, "Body length", SimpleType.LONG);
+            addItem(BODY_PREVIEW, "Body preview", new ArrayType(1, SimpleType.BYTE));
         }
 
         public Map<String, Object> getFields(Object o) throws OpenDataException {
@@ -155,9 +261,9 @@
             long length = 0;
             try {
                 length = m.getBodyLength();
-                rc.put("BodyLength", Long.valueOf(length));
+                rc.put(BODY_LENGTH, Long.valueOf(length));
             } catch (JMSException e) {
-                rc.put("BodyLength", Long.valueOf(0));
+                rc.put(BODY_LENGTH, Long.valueOf(0));
             }
             try {
                 byte preview[] = new byte[(int)Math.min(length, 255)];
@@ -171,9 +277,9 @@
                     data[i] = new Byte(preview[i]);
                 }
 
-                rc.put("BodyPreview", data);
+                rc.put(BODY_PREVIEW, data);
             } catch (JMSException e) {
-                rc.put("BodyPreview", new byte[] {});
+                rc.put(BODY_PREVIEW, new byte[] {});
             }
             return rc;
         }
@@ -181,22 +287,23 @@
     }
 
     static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory {
+
         protected String getTypeName() {
             return ActiveMQMapMessage.class.getName();
         }
 
         protected void init() throws OpenDataException {
             super.init();
-            addItem("ContentMap", "Content map", SimpleType.STRING);
+            addItem(CONTENT_MAP, "Content map", SimpleType.STRING);
         }
 
         public Map<String, Object> getFields(Object o) throws OpenDataException {
             ActiveMQMapMessage m = (ActiveMQMapMessage)o;
             Map<String, Object> rc = super.getFields(o);
             try {
-                rc.put("ContentMap", "" + m.getContentMap());
+                rc.put(CONTENT_MAP, "" + m.getContentMap());
             } catch (JMSException e) {
-                rc.put("ContentMap", "");
+                rc.put(CONTENT_MAP, "");
             }
             return rc;
         }
@@ -233,22 +340,23 @@
     }
 
     static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
+
         protected String getTypeName() {
             return ActiveMQTextMessage.class.getName();
         }
 
         protected void init() throws OpenDataException {
             super.init();
-            addItem("Text", "Text", SimpleType.STRING);
+            addItem(MESSAGE_TEXT, MESSAGE_TEXT, SimpleType.STRING);
         }
 
         public Map<String, Object> getFields(Object o) throws OpenDataException {
             ActiveMQTextMessage m = (ActiveMQTextMessage)o;
             Map<String, Object> rc = super.getFields(o);
             try {
-                rc.put("Text", "" + m.getText());
+                rc.put(MESSAGE_TEXT, "" + m.getText());
             } catch (JMSException e) {
-                rc.put("Text", "");
+                rc.put(MESSAGE_TEXT, "");
             }
             return rc;
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=687861&r1=687860&r2=687861&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Thu Aug 21 13:57:56 2008
@@ -18,6 +18,8 @@
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
+import java.util.Map;
+import java.util.HashMap;
 import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -30,6 +32,7 @@
 import javax.management.openmbean.TabularData;
 import junit.textui.TestRunner;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.commons.logging.Log;
@@ -72,6 +75,7 @@
 
         // test all the various MBeans now we have a producer, consumer and
         // messages on a queue
+        assertSendViaMBean();
         assertQueueBrowseWorks();
         assertCreateAndDestroyDurableSubscriptions();
         assertConsumerCounts();
@@ -126,6 +130,77 @@
         assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
     }
 
+    /** Adds a queue via the broker MBean, sends 5 text messages with headers through the queue MBean, then browses and checks every message. */
+    protected void assertSendViaMBean() throws Exception {
+        String queueName = getDestinationString() + ".SendMBBean";
+
+        ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+        echo("Create QueueView MBean..."); // NOTE(review): text says QueueView, but the proxy created next is a BrokerViewMBean
+        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+        broker.addQueue(queueName);
+
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + queueName + ",BrokerName=localhost");
+
+        echo("Create QueueView MBean...");
+        QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+        int count = 5;
+        for (int i = 0; i < count; i++) {
+            String body = "message:" + i;
+
+            Map headers = new HashMap(); // mixes JMS header names with two user properties (MyHeader, MyStringHeader)
+            headers.put("JMSCorrelationID", "MyCorrId");
+            headers.put("JMSDeliveryMode", Boolean.TRUE); // not asserted after browsing
+            headers.put("JMSXGroupID", "MyGroupID");
+            headers.put("JMSXGroupSeq", 1234);
+            headers.put("JMSPriority", i);
+            headers.put("JMSType", "MyType");
+            headers.put("MyHeader", i);
+            headers.put("MyStringHeader", "StringHeader" + i);
+
+            proxy.sendTextMessage(headers, body);
+        }
+        
+        CompositeData[] compdatalist = proxy.browse();
+        if (compdatalist.length == 0) {
+            fail("There is no message in the queue:");
+        }
+        String[] messageIDs = new String[compdatalist.length]; // never read in this method
+
+        for (int i = 0; i < compdatalist.length; i++) { // index i doubles as the expected per-message value, so browse order must match send order
+            CompositeData cdata = compdatalist[i];
+
+            if (i == 0) {
+                echo("Columns: " + cdata.getCompositeType().keySet());
+            }
+
+            assertComplexData(cdata, "JMSCorrelationID", "MyCorrId");
+            assertComplexData(cdata, "JMSPriority", i);
+            assertComplexData(cdata, "JMSType", "MyType");
+            assertComplexData(cdata, "JMSCorrelationID", "MyCorrId"); // duplicate of the first check above
+            assertComplexData(cdata, "PropertiesText", "{MyStringHeader=StringHeader" + i + ", MyHeader=" + i + "}"); // NOTE(review): presumably depends on the properties map's toString() order - confirm it is stable
+
+            Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
+            assertEquals("intProperties size()", 1, intProperties.size());
+            assertEquals("intProperties.MyHeader", i, intProperties.get("MyHeader"));
+
+            Map stringProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.STRING_PROPERTIES);
+            assertEquals("stringProperties size()", 1, stringProperties.size());
+            assertEquals("stringProperties.MyHeader", "StringHeader" + i, stringProperties.get("MyStringHeader")); // NOTE(review): label says MyHeader, key checked is MyStringHeader
+
+            assertComplexData(cdata, "JMSXGroupSeq", 1234);
+            assertComplexData(cdata, "JMSXGroupID", "MyGroupID");
+            assertComplexData(cdata, "Text", "message:" + i);
+
+        }
+    }
+    /** Asserts that the item called name in cdata equals expected; the failure message includes the item name. */
+    protected void assertComplexData(CompositeData cdata, String name, Object expected) {
+        Object value = cdata.get(name);
+        assertEquals("CData field: " + name, expected, value);
+    }
+
+
     protected void assertQueueBrowseWorks() throws Exception {
         Integer mbeancnt = mbeanServer.getMBeanCount();
         echo("Mbean count :" + mbeancnt);
@@ -309,6 +384,10 @@
         for (int i = 0; i < MESSAGE_COUNT; i++) {
             Message message = session.createTextMessage("Message: " + i);
             message.setIntProperty("counter", i);
+            message.setJMSCorrelationID("MyCorrelationID");
+            message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo"));
+            message.setJMSType("MyType");
+            message.setJMSPriority(5);
             producer.send(message);
         }
         Thread.sleep(1000);