You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/11/01 21:08:23 UTC

[activemq-artemis] branch main updated: ARTEMIS-3461: add some tests and resolve various issues spotted with the prior changes

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f9de5f  ARTEMIS-3461: add some tests and resolve various issues spotted with the prior changes
3f9de5f is described below

commit 3f9de5fa304b828494a4b1ba1f380fe269d91055
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Mon Nov 1 20:00:15 2021 +0000

    ARTEMIS-3461: add some tests and resolve various issues spotted with the prior changes
    
    - Avoid blowing up on string bodies of any size if the valueSizeLimit bits are configured to disable limit
    - Dont NPE if amqp-value + binary body is sent without a content-type, as it always should be.
    - Include expected prefix when adding delivery delay and ingress time annotations.
    - Use the actual name for ingress time annotation, as with all other annotations.
    - Use correct object type when testing equality with content-type value.
    - Use consistent case for 'groupId' in different properties.
---
 .../apache/activemq/artemis/api/core/JsonUtil.java |   2 +-
 .../activemq/artemis/api/core/JsonUtilTest.java    |  43 ++-
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |  22 +-
 .../amqp/converter/AMQPMessageSupport.java         |  10 +-
 .../protocol/amqp/broker/AMQPMessageTest.java      | 410 ++++++++++++++++++++-
 5 files changed, 468 insertions(+), 19 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
index 69dfa8a..b1535bb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
@@ -326,7 +326,7 @@ public final class JsonUtil {
    }
 
    public static String truncateString(final String str, final int valueSizeLimit) {
-      if (str.length() > valueSizeLimit) {
+      if (valueSizeLimit >= 0 && str.length() > valueSizeLimit) {
          return new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString();
       } else {
          return str;
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java
index f6740dd..ab87cf2 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java
@@ -69,7 +69,8 @@ public class JsonUtilTest {
       Assert.assertEquals(6, jsonObject.getJsonArray("byteArray").size());
    }
 
-   @Test public void testAddByteArrayToJsonArray() {
+   @Test
+   public void testAddByteArrayToJsonArray() {
       JsonArrayBuilder jsonArrayBuilder = JsonLoader.createArrayBuilder();
       byte[] bytes = {0x0a, 0x1b, 0x2c, 0x3d, 0x4e, 0x5f};
 
@@ -79,4 +80,44 @@ public class JsonUtilTest {
 
       Assert.assertEquals(1, jsonArray.size());
    }
+
+   @Test
+   public void testTruncateUsingStringWithValueSizeLimit() {
+      String prefix = "12345";
+      int valueSizeLimit = prefix.length();
+      String remaining = "remaining";
+
+      String truncated = (String) JsonUtil.truncate(prefix + remaining, valueSizeLimit);
+
+      String expected = prefix + ", + " + String.valueOf(remaining.length()) + " more";
+      Assert.assertEquals(expected, truncated);
+   }
+
+   @Test
+   public void testTruncateUsingStringWithoutValueSizeLimit() {
+      String input = "testTruncateUsingStringWithoutValueSizeLimit";
+      String notTruncated = (String) JsonUtil.truncate(input, -1);
+
+      Assert.assertEquals(input, notTruncated);
+   }
+
+   @Test
+   public void testTruncateStringWithValueSizeLimit() {
+      String prefix = "12345";
+      int valueSizeLimit = prefix.length();
+      String remaining = "remaining";
+
+      String truncated = JsonUtil.truncateString(prefix + remaining, valueSizeLimit);
+
+      String expected = prefix + ", + " + String.valueOf(remaining.length()) + " more";
+      Assert.assertEquals(expected, truncated);
+   }
+
+   @Test
+   public void testTruncateStringWithoutValueSizeLimit() {
+      String input = "testTruncateStringWithoutValueSizeLimit";
+      String notTruncated = JsonUtil.truncateString(input, -1);
+
+      Assert.assertEquals(input, notTruncated);
+   }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 64da6eb..10904da 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -881,7 +881,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
             map.put(propertiesPrefix + "contentEncoding", properties.getContentEncoding().toString());
          }
          if (properties.getGroupId() != null) {
-            map.put(propertiesPrefix + "groupID", properties.getGroupId());
+            map.put(propertiesPrefix + "groupId", properties.getGroupId());
          }
          if (properties.getGroupSequence() != null) {
             map.put(propertiesPrefix + "groupSequence", properties.getGroupSequence().intValue());
@@ -916,9 +916,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
                map.put(prefix + "x-opt-delivery-time", deliveryTime);
             } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
                long delay = ((Number) entry.getValue()).longValue();
-               map.put("x-opt-delivery-delay", delay);
+               map.put(prefix + "x-opt-delivery-delay", delay);
             } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
-               map.put("X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue());
+               map.put(prefix + AMQPMessageSupport.X_OPT_INGRESS_TIME, ((Number) entry.getValue()).longValue());
             } else {
                try {
                   map.put(prefix + key, entry.getValue());
@@ -1878,30 +1878,25 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
          }
 
          final Symbol contentType = properties != null ? properties.getContentType() : null;
-         final String contentTypeString = contentType != null ? contentType.toString() : null;
 
          if (m.getBody() instanceof Data && contentType != null) {
-
-            if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+            if (AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(contentType)) {
                type = OBJECT_TYPE;
-            } else if (contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) {
+            } else if (AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE_SYMBOL.equals(contentType)) {
                type = BYTES_TYPE;
             } else {
-               Charset charset = getCharsetForTextualContent(contentTypeString);
+               Charset charset = getCharsetForTextualContent(contentType.toString());
                if (StandardCharsets.UTF_8.equals(charset)) {
                   type = TEXT_TYPE;
                }
             }
-         } else if (m.getBody() instanceof AmqpSequence) {
-            type = STREAM_TYPE;
          } else if (m.getBody() instanceof AmqpValue) {
             Object value = ((AmqpValue) m.getBody()).getValue();
 
             if (value instanceof String) {
                type = TEXT_TYPE;
             } else if (value instanceof Binary) {
-
-               if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+               if (AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(contentType)) {
                   type = OBJECT_TYPE;
                } else {
                   type = BYTES_TYPE;
@@ -1911,7 +1906,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
             } else if (value instanceof Map) {
                type = MAP_TYPE;
             }
+         } else if (m.getBody() instanceof AmqpSequence) {
+            type = STREAM_TYPE;
          }
+
          return type;
       }
    }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index c8e1eba..1b47944 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -66,6 +66,8 @@ public final class AMQPMessageSupport {
    public static SimpleString HDR_ORIGINAL_ADDRESS_ANNOTATION = SimpleString.toSimpleString("x-opt-ORIG-ADDRESS");
 
    public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to";
+   public static final String X_OPT_DELIVERY_TIME = "x-opt-delivery-time";
+   public static final String X_OPT_DELIVERY_DELAY = "x-opt-delivery-delay";
 
    // Message Properties used to map AMQP to JMS and back
    /**
@@ -85,12 +87,12 @@ public final class AMQPMessageSupport {
    /**
     * Attribute used to mark the Application defined delivery time assigned to the message
     */
-   public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
+   public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol(X_OPT_DELIVERY_TIME);
 
    /**
     * Attribute used to mark the Application defined delivery time assigned to the message
     */
-   public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol("x-opt-delivery-delay");
+   public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol(X_OPT_DELIVERY_DELAY);
 
    /**
     * Attribute used to mark the Application defined delivery time assigned to the message
@@ -204,10 +206,12 @@ public final class AMQPMessageSupport {
    public static final byte TEMP_QUEUE_TYPE = 0x02;
    public static final byte TEMP_TOPIC_TYPE = 0x03;
 
+   public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+
    /**
     * Content type used to mark Data sections as containing arbitrary bytes.
     */
-   public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+   public static final Symbol OCTET_STREAM_CONTENT_TYPE_SYMBOL = Symbol.valueOf(OCTET_STREAM_CONTENT_TYPE);
 
    /**
     * Lookup and return the correct Proton Symbol instance based on the given key.
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 3eb4d23..8a450ec 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -39,12 +39,17 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@@ -54,6 +59,7 @@ import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
@@ -86,12 +92,26 @@ public class AMQPMessageTest {
 
    private static final String TEST_MESSAGE_ANNOTATION_KEY = "x-opt-test-annotation";
    private static final String TEST_MESSAGE_ANNOTATION_VALUE = "test-annotation";
+   private static final String TEST_MESSAGE_ANNOTATION_KEY2 = "x-opt-test-annotation2";
+   private static final String TEST_MESSAGE_ANNOTATION_VALUE2 = "test-annotation2";
+
+   private static final String TEST_EXTRA_PROPERTY_KEY1 = "extraPropertyKey1";
+   private static final String TEST_EXTRA_PROPERTY_VALUE1 = "extraPropertyValue1";
+   private static final String TEST_EXTRA_PROPERTY_KEY2 = "extraPropertyKey2";
+   private static final String TEST_EXTRA_PROPERTY_VALUE2 = "extraPropertyValue2";
 
    private static final String TEST_APPLICATION_PROPERTY_KEY = "key-1";
    private static final String TEST_APPLICATION_PROPERTY_VALUE = "value-1";
+   private static final String TEST_APPLICATION_PROPERTY_KEY2 = "key-2";
+   private static final String TEST_APPLICATION_PROPERTY_VALUE2 = "value-2";
 
    private static final String TEST_STRING_BODY = "test-string-body";
 
+   private static final String PROPERTY_MAP_APP_PROPERTIES_PREFIX = "applicationProperties.";
+   private static final String PROPERTY_MAP_PROPERTIES_PREFIX = "properties.";
+   private static final String PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX = "messageAnnotations.";
+   private static final String PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX = "extraProperties.";
+
    private byte[] encodedProtonMessage;
 
    @Before
@@ -2120,6 +2140,388 @@ public class AMQPMessageTest {
       assertEquals(annotationValue2, msgFromSendBuffer2.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey2)));
    }
 
+   //-----  CompositeData handling -------------------------------------------//
+
+   @Test
+   public void testToCompositeDataHeaderSectionDurable() throws Exception {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      // With section missing (defaults false)
+      AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+      CompositeData cd = decoded.toCompositeData(0, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
+      Object durableObj = cd.get(CompositeDataConstants.DURABLE);
+      assertTrue(durableObj instanceof Boolean);
+
+      assertEquals(Boolean.FALSE, durableObj);
+
+      // With section present, but value not set (defaults false)
+      Header protonHeader = new Header();
+      protonMessage.setHeader(protonHeader);
+
+      decoded = encodeAndDecodeMessage(protonMessage);
+      cd = decoded.toCompositeData(0, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
+      durableObj = cd.get(CompositeDataConstants.DURABLE);
+      assertTrue(durableObj instanceof Boolean);
+
+      assertEquals(Boolean.FALSE, durableObj);
+
+      // With section present, value set False explicitly
+      protonHeader = new Header();
+      protonHeader.setDurable(Boolean.FALSE);
+      protonMessage.setHeader(protonHeader);
+
+      decoded = encodeAndDecodeMessage(protonMessage);
+      cd = decoded.toCompositeData(0, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
+      durableObj = cd.get(CompositeDataConstants.DURABLE);
+      assertTrue(durableObj instanceof Boolean);
+
+      assertEquals(Boolean.FALSE, durableObj);
+
+      // With section present, value set True explicitly
+      protonHeader = new Header();
+      protonHeader.setDurable(Boolean.TRUE);
+      protonMessage.setHeader(protonHeader);
+
+      decoded = encodeAndDecodeMessage(protonMessage);
+      cd = decoded.toCompositeData(0, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
+      durableObj = cd.get(CompositeDataConstants.DURABLE);
+      assertTrue(durableObj instanceof Boolean);
+
+      assertEquals(Boolean.TRUE, durableObj);
+   }
+
+   @Test
+   public void testToCompositeDataHeaderSectionPriority() throws Exception {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      // With section missing (defaults 4)
+      AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+      CompositeData cd = decoded.toCompositeData(0, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
+      Object priorityObj = cd.get(CompositeDataConstants.PRIORITY);
+      assertTrue(priorityObj instanceof Byte);
+
+      assertEquals(Byte.valueOf((byte) 4), priorityObj);
+
+      // With section present, but value not set (defaults 4)
+      Header protonHeader = new Header();
+      protonMessage.setHeader(protonHeader);
+
+      decoded = encodeAndDecodeMessage(protonMessage);
+      cd = decoded.toCompositeData(0, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
+      priorityObj = cd.get(CompositeDataConstants.PRIORITY);
+      assertTrue(priorityObj instanceof Byte);
+
+      assertEquals(Byte.valueOf((byte) 4), priorityObj);
+
+      // With section present, value set to 5 explicitly
+      protonHeader = new Header();
+      protonHeader.setPriority(UnsignedByte.valueOf((byte) 5));
+      protonMessage.setHeader(protonHeader);
+
+      decoded = encodeAndDecodeMessage(protonMessage);
+      cd = decoded.toCompositeData(0, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
+      priorityObj = cd.get(CompositeDataConstants.PRIORITY);
+      assertTrue(priorityObj instanceof Byte);
+
+      assertEquals(Byte.valueOf((byte) 5), priorityObj);
+   }
+
+   @Test
+   public void testToCompositeDataPropertiesSection() throws Exception {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      String testContentEncoding = "gzip";
+      String testGroupId = "testGroupId";
+      int testGroupSequence = 45678;
+      String testReplyToGroupId = "testReplyToGroupId";
+      long testCreationTime = System.currentTimeMillis();
+      long testExpiryTime = testCreationTime + 5000;
+      String testSubject = "testSubject";
+      String testMessageId = "testMessageId";
+
+      Properties protonProperties = new Properties();
+      protonProperties.setContentType(Symbol.valueOf(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE));
+      protonProperties.setContentEncoding(Symbol.valueOf(testContentEncoding));
+      protonProperties.setGroupId(testGroupId);
+      protonProperties.setGroupSequence(UnsignedInteger.valueOf(testGroupSequence));
+      protonProperties.setReplyToGroupId(testReplyToGroupId);
+      protonProperties.setCreationTime(new Date(testCreationTime));
+      protonProperties.setAbsoluteExpiryTime(new Date(testExpiryTime));
+      protonProperties.setSubject(testSubject);
+      protonProperties.setTo(TEST_TO_ADDRESS);
+      protonProperties.setMessageId(testMessageId);
+
+      protonMessage.setProperties(protonProperties);
+
+      AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      CompositeData cd = decoded.toCompositeData(-1, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
+      Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
+      assertTrue(propsObject instanceof String);
+      String properties = (String) propsObject;
+
+      assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "contentType" + "=" + AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE));
+      assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "contentEncoding" + "=" + testContentEncoding));
+      assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "groupId" + "=" + testGroupId));
+      assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "groupSequence" + "=" + testGroupSequence));
+      assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "replyToGroupId" + "=" + testReplyToGroupId));
+      assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "creationTime" + "=" + testCreationTime));
+      assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "absoluteExpiryTime" + "=" + testExpiryTime));
+      assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "to" + "=" + TEST_TO_ADDRESS));
+      assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "subject" + "=" + testSubject));
+
+      // TODO: should these fields be included in the 'properties' string and tested above?
+      // Some are shown elsewhere in a way, others missing entirely. Eg:
+      //
+      // correlation-id: not included.
+      // message-id: included'ish, with an ID: prefix added, as the CompositeDataConstants.USER_ID.
+      // reply-to: not included, though the replyToGroupId is given as shown above.
+      // user-id: not included.
+
+      // Some fields of the properties section already align with fields given
+      // their own top level entries of the CompositeData, which remain:
+
+      // The message-id is presented via the 'user id' field, inc an added prefix.
+      assertTrue(cd.containsKey(CompositeDataConstants.USER_ID));
+      Object messageIdObj = cd.get(CompositeDataConstants.USER_ID);
+      assertTrue(messageIdObj instanceof String);
+
+      assertEquals(AMQPMessageIdHelper.JMS_ID_PREFIX + testMessageId, messageIdObj);
+
+      // The creation-time is duplicated as the 'timestamp' field
+      assertTrue(cd.containsKey(CompositeDataConstants.TIMESTAMP));
+      Object timestampObj = cd.get(CompositeDataConstants.TIMESTAMP);
+      assertTrue(timestampObj instanceof Long);
+
+      assertEquals(testCreationTime, timestampObj);
+   }
+
+   @Test
+   public void testToCompositeDataApplicationPropertiesSection() throws Exception {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      Map<String, Object> appPropsMap = new HashMap<>();
+      appPropsMap.put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
+      appPropsMap.put(TEST_APPLICATION_PROPERTY_KEY2, TEST_APPLICATION_PROPERTY_VALUE2);
+      ApplicationProperties appProps = new ApplicationProperties(appPropsMap);
+
+      protonMessage.setApplicationProperties(appProps);
+
+      AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      CompositeData cd = decoded.toCompositeData(-1, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
+      Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
+      assertTrue(propsObject instanceof String);
+      String properties = (String) propsObject;
+
+      assertTrue(properties.contains(PROPERTY_MAP_APP_PROPERTIES_PREFIX +
+            TEST_APPLICATION_PROPERTY_KEY + "=" + TEST_APPLICATION_PROPERTY_VALUE));
+      assertTrue(properties.contains(PROPERTY_MAP_APP_PROPERTIES_PREFIX +
+            TEST_APPLICATION_PROPERTY_KEY2 + "=" + TEST_APPLICATION_PROPERTY_VALUE2));
+   }
+
+   @Test
+   public void testToCompositeDataMessageAnnotationSection() throws Exception {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      Map<Symbol, Object> annotationsMap = new HashMap<>();
+      annotationsMap.put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
+      annotationsMap.put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY2), TEST_MESSAGE_ANNOTATION_VALUE2);
+      MessageAnnotations annotations = new MessageAnnotations(annotationsMap);
+
+      protonMessage.setMessageAnnotations(annotations);
+
+      AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      CompositeData cd = decoded.toCompositeData(-1, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
+      Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
+      assertTrue(propsObject instanceof String);
+      String properties = (String) propsObject;
+
+      assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
+            TEST_MESSAGE_ANNOTATION_KEY + "=" + TEST_MESSAGE_ANNOTATION_VALUE));
+      assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
+            TEST_MESSAGE_ANNOTATION_KEY2 + "=" + TEST_MESSAGE_ANNOTATION_VALUE2));
+
+      // Now try some specific annotations with their own handling
+      long testIngressTime = System.currentTimeMillis();
+      long testDeliveryTime = System.currentTimeMillis() + 5678;
+      long testDeliveryDelay = 6789;
+
+      annotationsMap = new HashMap<>();
+      annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_INGRESS_TIME), testIngressTime);
+      annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_DELIVERY_TIME), testDeliveryTime);
+      annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_DELIVERY_DELAY), testDeliveryDelay);
+      annotations = new MessageAnnotations(annotationsMap);
+      protonMessage.setMessageAnnotations(annotations);
+
+      decoded = encodeAndDecodeMessage(protonMessage);
+
+      cd = decoded.toCompositeData(-1, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
+      propsObject = cd.get(CompositeDataConstants.PROPERTIES);
+      assertTrue(propsObject instanceof String);
+      properties = (String) propsObject;
+
+      assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
+            AMQPMessageSupport.X_OPT_INGRESS_TIME + "=" + testIngressTime));
+      assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
+            AMQPMessageSupport.X_OPT_DELIVERY_TIME + "=" + testDeliveryTime));
+      assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
+            AMQPMessageSupport.X_OPT_DELIVERY_DELAY + "=" + testDeliveryDelay));
+   }
+
+   @Test
+   public void testToCompositeDataExtraProperties() throws Exception {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      TypedProperties extraProperties = new TypedProperties();
+      extraProperties.putProperty(new SimpleString(TEST_EXTRA_PROPERTY_KEY1), TEST_EXTRA_PROPERTY_VALUE1);
+      extraProperties.putProperty(new SimpleString(TEST_EXTRA_PROPERTY_KEY2), TEST_EXTRA_PROPERTY_VALUE2);
+
+      AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage, extraProperties);
+
+      CompositeData cd = decoded.toCompositeData(-1, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
+      Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
+      assertTrue(propsObject instanceof String);
+      String properties = (String) propsObject;
+
+      assertTrue(properties.contains(PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX +
+            TEST_EXTRA_PROPERTY_KEY1 + "=" + TEST_EXTRA_PROPERTY_VALUE1));
+      assertTrue(properties.contains(PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX +
+            TEST_EXTRA_PROPERTY_KEY2 + "=" + TEST_EXTRA_PROPERTY_VALUE2));
+   }
+
+   @Test
+   public void testToCompositeDataWithDataBodySectionWithoutContentType() throws Exception {
+      doToCompositeDataWithDataBodySectionTestImpl(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
+   }
+
+   @Test
+   public void testToCompositeDataWithDataBodySectionWithOctetStreamContentType() throws Exception {
+      doToCompositeDataWithDataBodySectionTestImpl(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
+   }
+
+   @Test
+   public void testToCompositeDataWithDataBodySectionWithTextPlainContentType() throws Exception {
+      doToCompositeDataWithDataBodySectionTestImpl("text/plain", org.apache.activemq.artemis.api.core.Message.TEXT_TYPE);
+   }
+
+   @Test
+   public void testToCompositeDataWithDataBodySectionWithSerializedObjectContentType() throws Exception {
+      doToCompositeDataWithDataBodySectionTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE);
+   }
+
+   private void doToCompositeDataWithDataBodySectionTestImpl(String contentType, byte expectedMessageType) throws OpenDataException {
+      Message protonMessage = Message.Factory.create();
+
+      // Not the right payload for some of the content types,but it
+      // doesnt matter, mainly checking type value and for lack of NPEs.
+      String bytesSource = "testPayloadBytes";
+      String expectedBodyText = "Data{" + bytesSource + "}";
+      Data body = new Data(new Binary(bytesSource.getBytes(StandardCharsets.UTF_8)));
+
+      protonMessage.setBody(body);
+      protonMessage.setContentType(contentType);
+
+      AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      CompositeData cd = decoded.toCompositeData(-1, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
+      assertEquals(expectedBodyText, cd.get(CompositeDataConstants.TEXT_BODY));
+
+      assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
+      assertEquals(expectedMessageType, cd.get(CompositeDataConstants.TYPE));
+   }
+
+   @Test
+   public void testToCompositeDataWithAmqpValueBinaryBodySectionWithoutContentType() throws Exception {
+      doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
+   }
+
+   @Test
+   public void testToCompositeDataWithAmqpValueBinaryBodySectionWithSerializedObjectContentType() throws Exception {
+      // Shouldnt really get in this situation, not meant to use content-type without the Data body section.
+      doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE);
+   }
+
+   private void doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(String contentType, byte expectedMessageType) throws OpenDataException {
+      Message protonMessage = Message.Factory.create();
+
+      // Not the right payload for some of the content types,but it
+      // doesnt matter, mainly checking type value and for lack of NPEs.
+      String bytesSource = "testPayloadBytes";
+      AmqpValue body = new AmqpValue(new Binary(bytesSource.getBytes(StandardCharsets.UTF_8)));
+
+      protonMessage.setBody(body);
+      protonMessage.setContentType(contentType);
+
+      AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      CompositeData cd = decoded.toCompositeData(-1, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
+      assertEquals(bytesSource, cd.get(CompositeDataConstants.TEXT_BODY));
+
+      assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
+      assertEquals(expectedMessageType, cd.get(CompositeDataConstants.TYPE));
+   }
+
+   @Test
+   public void testToCompositeDataWithStringBodyWithoutValueSizeLimit() throws Exception {
+      doToCompositeDataWithStringBodyValueSizeLimitTestImpl(-1, TEST_STRING_BODY);
+   }
+
+   @Test
+   public void testToCompositeDataWithStringBodyWithValueSizeLimit() throws Exception {
+      int limit = 11;
+      int testBodyLength = TEST_STRING_BODY.length();
+      assertTrue(testBodyLength > limit);
+
+      String expected = TEST_STRING_BODY.substring(0, limit) + ", + " + String.valueOf(testBodyLength - limit) + " more";
+
+      doToCompositeDataWithStringBodyValueSizeLimitTestImpl(limit, expected);
+   }
+
+   private void doToCompositeDataWithStringBodyValueSizeLimitTestImpl(int fieldsLimit, String expectedBodyText) throws OpenDataException {
+      Message protonMessage = Message.Factory.create();
+      protonMessage.setBody(new AmqpValue(TEST_STRING_BODY));
+
+      AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      CompositeData cd = decoded.toCompositeData(fieldsLimit, 0);
+
+      assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
+      assertEquals(expectedBodyText, cd.get(CompositeDataConstants.TEXT_BODY));
+
+      assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
+      assertEquals(org.apache.activemq.artemis.api.core.Message.TEXT_TYPE, cd.get(CompositeDataConstants.TYPE));
+   }
+
    //----- Test Support ------------------------------------------------------//
 
    private MessageImpl createProtonMessage() {
@@ -2483,13 +2885,17 @@ public class AMQPMessageTest {
       return bytes;
    }
 
-   private AMQPStandardMessage encodeAndDecodeMessage(MessageImpl message) {
+   private AMQPStandardMessage encodeAndDecodeMessage(Message message) {
+      return encodeAndDecodeMessage(message, null);
+   }
+
+   private AMQPStandardMessage encodeAndDecodeMessage(Message message, TypedProperties extraProperties) {
       ByteBuf nettyBuffer = Unpooled.buffer(1500);
 
       message.encode(new NettyWritable(nettyBuffer));
       byte[] bytes = new byte[nettyBuffer.writerIndex()];
       nettyBuffer.readBytes(bytes);
 
-      return new AMQPStandardMessage(0, bytes, null);
+      return new AMQPStandardMessage(0, bytes, extraProperties);
    }
 }