You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/11/01 21:08:23 UTC
[activemq-artemis] branch main updated: ARTEMIS-3461: add some
tests and resolve various issues spotted with the prior changes
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 3f9de5f ARTEMIS-3461: add some tests and resolve various issues spotted with the prior changes
3f9de5f is described below
commit 3f9de5fa304b828494a4b1ba1f380fe269d91055
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Mon Nov 1 20:00:15 2021 +0000
ARTEMIS-3461: add some tests and resolve various issues spotted with the prior changes
- Avoid blowing up on string bodies of any size if the valueSizeLimit bits are configured to disable limit
- Dont NPE if amqp-value + binary body is sent without a content-type, as it always should be.
- Include expected prefix when adding delivery delay and ingress time annotations.
- Use the actual name for ingress time annotation, as with all other annotations.
- Use correct object type when testing equality with content-type value.
- Use consistent case for 'groupId' in different properties.
---
.../apache/activemq/artemis/api/core/JsonUtil.java | 2 +-
.../activemq/artemis/api/core/JsonUtilTest.java | 43 ++-
.../artemis/protocol/amqp/broker/AMQPMessage.java | 22 +-
.../amqp/converter/AMQPMessageSupport.java | 10 +-
.../protocol/amqp/broker/AMQPMessageTest.java | 410 ++++++++++++++++++++-
5 files changed, 468 insertions(+), 19 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
index 69dfa8a..b1535bb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
@@ -326,7 +326,7 @@ public final class JsonUtil {
}
public static String truncateString(final String str, final int valueSizeLimit) {
- if (str.length() > valueSizeLimit) {
+ if (valueSizeLimit >= 0 && str.length() > valueSizeLimit) {
return new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString();
} else {
return str;
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java
index f6740dd..ab87cf2 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java
@@ -69,7 +69,8 @@ public class JsonUtilTest {
Assert.assertEquals(6, jsonObject.getJsonArray("byteArray").size());
}
- @Test public void testAddByteArrayToJsonArray() {
+ @Test
+ public void testAddByteArrayToJsonArray() {
JsonArrayBuilder jsonArrayBuilder = JsonLoader.createArrayBuilder();
byte[] bytes = {0x0a, 0x1b, 0x2c, 0x3d, 0x4e, 0x5f};
@@ -79,4 +80,44 @@ public class JsonUtilTest {
Assert.assertEquals(1, jsonArray.size());
}
+
+ @Test
+ public void testTruncateUsingStringWithValueSizeLimit() {
+ String prefix = "12345";
+ int valueSizeLimit = prefix.length();
+ String remaining = "remaining";
+
+ String truncated = (String) JsonUtil.truncate(prefix + remaining, valueSizeLimit);
+
+ String expected = prefix + ", + " + String.valueOf(remaining.length()) + " more";
+ Assert.assertEquals(expected, truncated);
+ }
+
+ @Test
+ public void testTruncateUsingStringWithoutValueSizeLimit() {
+ String input = "testTruncateUsingStringWithoutValueSizeLimit";
+ String notTruncated = (String) JsonUtil.truncate(input, -1);
+
+ Assert.assertEquals(input, notTruncated);
+ }
+
+ @Test
+ public void testTruncateStringWithValueSizeLimit() {
+ String prefix = "12345";
+ int valueSizeLimit = prefix.length();
+ String remaining = "remaining";
+
+ String truncated = JsonUtil.truncateString(prefix + remaining, valueSizeLimit);
+
+ String expected = prefix + ", + " + String.valueOf(remaining.length()) + " more";
+ Assert.assertEquals(expected, truncated);
+ }
+
+ @Test
+ public void testTruncateStringWithoutValueSizeLimit() {
+ String input = "testTruncateStringWithoutValueSizeLimit";
+ String notTruncated = JsonUtil.truncateString(input, -1);
+
+ Assert.assertEquals(input, notTruncated);
+ }
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 64da6eb..10904da 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -881,7 +881,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
map.put(propertiesPrefix + "contentEncoding", properties.getContentEncoding().toString());
}
if (properties.getGroupId() != null) {
- map.put(propertiesPrefix + "groupID", properties.getGroupId());
+ map.put(propertiesPrefix + "groupId", properties.getGroupId());
}
if (properties.getGroupSequence() != null) {
map.put(propertiesPrefix + "groupSequence", properties.getGroupSequence().intValue());
@@ -916,9 +916,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
map.put(prefix + "x-opt-delivery-time", deliveryTime);
} else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
long delay = ((Number) entry.getValue()).longValue();
- map.put("x-opt-delivery-delay", delay);
+ map.put(prefix + "x-opt-delivery-delay", delay);
} else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
- map.put("X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue());
+ map.put(prefix + AMQPMessageSupport.X_OPT_INGRESS_TIME, ((Number) entry.getValue()).longValue());
} else {
try {
map.put(prefix + key, entry.getValue());
@@ -1878,30 +1878,25 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
}
final Symbol contentType = properties != null ? properties.getContentType() : null;
- final String contentTypeString = contentType != null ? contentType.toString() : null;
if (m.getBody() instanceof Data && contentType != null) {
-
- if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+ if (AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(contentType)) {
type = OBJECT_TYPE;
- } else if (contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) {
+ } else if (AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE_SYMBOL.equals(contentType)) {
type = BYTES_TYPE;
} else {
- Charset charset = getCharsetForTextualContent(contentTypeString);
+ Charset charset = getCharsetForTextualContent(contentType.toString());
if (StandardCharsets.UTF_8.equals(charset)) {
type = TEXT_TYPE;
}
}
- } else if (m.getBody() instanceof AmqpSequence) {
- type = STREAM_TYPE;
} else if (m.getBody() instanceof AmqpValue) {
Object value = ((AmqpValue) m.getBody()).getValue();
if (value instanceof String) {
type = TEXT_TYPE;
} else if (value instanceof Binary) {
-
- if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+ if (AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(contentType)) {
type = OBJECT_TYPE;
} else {
type = BYTES_TYPE;
@@ -1911,7 +1906,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
} else if (value instanceof Map) {
type = MAP_TYPE;
}
+ } else if (m.getBody() instanceof AmqpSequence) {
+ type = STREAM_TYPE;
}
+
return type;
}
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index c8e1eba..1b47944 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -66,6 +66,8 @@ public final class AMQPMessageSupport {
public static SimpleString HDR_ORIGINAL_ADDRESS_ANNOTATION = SimpleString.toSimpleString("x-opt-ORIG-ADDRESS");
public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to";
+ public static final String X_OPT_DELIVERY_TIME = "x-opt-delivery-time";
+ public static final String X_OPT_DELIVERY_DELAY = "x-opt-delivery-delay";
// Message Properties used to map AMQP to JMS and back
/**
@@ -85,12 +87,12 @@ public final class AMQPMessageSupport {
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
- public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
+ public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol(X_OPT_DELIVERY_TIME);
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
- public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol("x-opt-delivery-delay");
+ public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol(X_OPT_DELIVERY_DELAY);
/**
* Attribute used to mark the Application defined delivery time assigned to the message
@@ -204,10 +206,12 @@ public final class AMQPMessageSupport {
public static final byte TEMP_QUEUE_TYPE = 0x02;
public static final byte TEMP_TOPIC_TYPE = 0x03;
+ public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+
/**
* Content type used to mark Data sections as containing arbitrary bytes.
*/
- public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+ public static final Symbol OCTET_STREAM_CONTENT_TYPE_SYMBOL = Symbol.valueOf(OCTET_STREAM_CONTENT_TYPE);
/**
* Lookup and return the correct Proton Symbol instance based on the given key.
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 3eb4d23..8a450ec 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -39,12 +39,17 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@@ -54,6 +59,7 @@ import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
@@ -86,12 +92,26 @@ public class AMQPMessageTest {
private static final String TEST_MESSAGE_ANNOTATION_KEY = "x-opt-test-annotation";
private static final String TEST_MESSAGE_ANNOTATION_VALUE = "test-annotation";
+ private static final String TEST_MESSAGE_ANNOTATION_KEY2 = "x-opt-test-annotation2";
+ private static final String TEST_MESSAGE_ANNOTATION_VALUE2 = "test-annotation2";
+
+ private static final String TEST_EXTRA_PROPERTY_KEY1 = "extraPropertyKey1";
+ private static final String TEST_EXTRA_PROPERTY_VALUE1 = "extraPropertyValue1";
+ private static final String TEST_EXTRA_PROPERTY_KEY2 = "extraPropertyKey2";
+ private static final String TEST_EXTRA_PROPERTY_VALUE2 = "extraPropertyValue2";
private static final String TEST_APPLICATION_PROPERTY_KEY = "key-1";
private static final String TEST_APPLICATION_PROPERTY_VALUE = "value-1";
+ private static final String TEST_APPLICATION_PROPERTY_KEY2 = "key-2";
+ private static final String TEST_APPLICATION_PROPERTY_VALUE2 = "value-2";
private static final String TEST_STRING_BODY = "test-string-body";
+ private static final String PROPERTY_MAP_APP_PROPERTIES_PREFIX = "applicationProperties.";
+ private static final String PROPERTY_MAP_PROPERTIES_PREFIX = "properties.";
+ private static final String PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX = "messageAnnotations.";
+ private static final String PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX = "extraProperties.";
+
private byte[] encodedProtonMessage;
@Before
@@ -2120,6 +2140,388 @@ public class AMQPMessageTest {
assertEquals(annotationValue2, msgFromSendBuffer2.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey2)));
}
+ //----- CompositeData handling -------------------------------------------//
+
+ @Test
+ public void testToCompositeDataHeaderSectionDurable() throws Exception {
+ MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+ // With section missing (defaults false)
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+ CompositeData cd = decoded.toCompositeData(0, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
+ Object durableObj = cd.get(CompositeDataConstants.DURABLE);
+ assertTrue(durableObj instanceof Boolean);
+
+ assertEquals(Boolean.FALSE, durableObj);
+
+ // With section present, but value not set (defaults false)
+ Header protonHeader = new Header();
+ protonMessage.setHeader(protonHeader);
+
+ decoded = encodeAndDecodeMessage(protonMessage);
+ cd = decoded.toCompositeData(0, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
+ durableObj = cd.get(CompositeDataConstants.DURABLE);
+ assertTrue(durableObj instanceof Boolean);
+
+ assertEquals(Boolean.FALSE, durableObj);
+
+ // With section present, value set False explicitly
+ protonHeader = new Header();
+ protonHeader.setDurable(Boolean.FALSE);
+ protonMessage.setHeader(protonHeader);
+
+ decoded = encodeAndDecodeMessage(protonMessage);
+ cd = decoded.toCompositeData(0, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
+ durableObj = cd.get(CompositeDataConstants.DURABLE);
+ assertTrue(durableObj instanceof Boolean);
+
+ assertEquals(Boolean.FALSE, durableObj);
+
+ // With section present, value set True explicitly
+ protonHeader = new Header();
+ protonHeader.setDurable(Boolean.TRUE);
+ protonMessage.setHeader(protonHeader);
+
+ decoded = encodeAndDecodeMessage(protonMessage);
+ cd = decoded.toCompositeData(0, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
+ durableObj = cd.get(CompositeDataConstants.DURABLE);
+ assertTrue(durableObj instanceof Boolean);
+
+ assertEquals(Boolean.TRUE, durableObj);
+ }
+
+ @Test
+ public void testToCompositeDataHeaderSectionPriority() throws Exception {
+ MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+ // With section missing (defaults 4)
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+ CompositeData cd = decoded.toCompositeData(0, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
+ Object priorityObj = cd.get(CompositeDataConstants.PRIORITY);
+ assertTrue(priorityObj instanceof Byte);
+
+ assertEquals(Byte.valueOf((byte) 4), priorityObj);
+
+ // With section present, but value not set (defaults 4)
+ Header protonHeader = new Header();
+ protonMessage.setHeader(protonHeader);
+
+ decoded = encodeAndDecodeMessage(protonMessage);
+ cd = decoded.toCompositeData(0, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
+ priorityObj = cd.get(CompositeDataConstants.PRIORITY);
+ assertTrue(priorityObj instanceof Byte);
+
+ assertEquals(Byte.valueOf((byte) 4), priorityObj);
+
+ // With section present, value set to 5 explicitly
+ protonHeader = new Header();
+ protonHeader.setPriority(UnsignedByte.valueOf((byte) 5));
+ protonMessage.setHeader(protonHeader);
+
+ decoded = encodeAndDecodeMessage(protonMessage);
+ cd = decoded.toCompositeData(0, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
+ priorityObj = cd.get(CompositeDataConstants.PRIORITY);
+ assertTrue(priorityObj instanceof Byte);
+
+ assertEquals(Byte.valueOf((byte) 5), priorityObj);
+ }
+
+ @Test
+ public void testToCompositeDataPropertiesSection() throws Exception {
+ MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+ String testContentEncoding = "gzip";
+ String testGroupId = "testGroupId";
+ int testGroupSequence = 45678;
+ String testReplyToGroupId = "testReplyToGroupId";
+ long testCreationTime = System.currentTimeMillis();
+ long testExpiryTime = testCreationTime + 5000;
+ String testSubject = "testSubject";
+ String testMessageId = "testMessageId";
+
+ Properties protonProperties = new Properties();
+ protonProperties.setContentType(Symbol.valueOf(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE));
+ protonProperties.setContentEncoding(Symbol.valueOf(testContentEncoding));
+ protonProperties.setGroupId(testGroupId);
+ protonProperties.setGroupSequence(UnsignedInteger.valueOf(testGroupSequence));
+ protonProperties.setReplyToGroupId(testReplyToGroupId);
+ protonProperties.setCreationTime(new Date(testCreationTime));
+ protonProperties.setAbsoluteExpiryTime(new Date(testExpiryTime));
+ protonProperties.setSubject(testSubject);
+ protonProperties.setTo(TEST_TO_ADDRESS);
+ protonProperties.setMessageId(testMessageId);
+
+ protonMessage.setProperties(protonProperties);
+
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+ CompositeData cd = decoded.toCompositeData(-1, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
+ Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
+ assertTrue(propsObject instanceof String);
+ String properties = (String) propsObject;
+
+ assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "contentType" + "=" + AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE));
+ assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "contentEncoding" + "=" + testContentEncoding));
+ assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "groupId" + "=" + testGroupId));
+ assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "groupSequence" + "=" + testGroupSequence));
+ assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "replyToGroupId" + "=" + testReplyToGroupId));
+ assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "creationTime" + "=" + testCreationTime));
+ assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "absoluteExpiryTime" + "=" + testExpiryTime));
+ assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "to" + "=" + TEST_TO_ADDRESS));
+ assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "subject" + "=" + testSubject));
+
+ // TODO: should these fields be included in the 'properties' string and tested above?
+ // Some are shown elsewhere in a way, others missing entirely. Eg:
+ //
+ // correlation-id: not included.
+ // message-id: included'ish, with an ID: prefix added, as the CompositeDataConstants.USER_ID.
+ // reply-to: not included, though the replyToGroupId is given as shown above.
+ // user-id: not included.
+
+ // Some fields of the properties section already align with fields given
+ // their own top level entries of the CompositeData, which remain:
+
+ // The message-id is presented via the 'user id' field, inc an added prefix.
+ assertTrue(cd.containsKey(CompositeDataConstants.USER_ID));
+ Object messageIdObj = cd.get(CompositeDataConstants.USER_ID);
+ assertTrue(messageIdObj instanceof String);
+
+ assertEquals(AMQPMessageIdHelper.JMS_ID_PREFIX + testMessageId, messageIdObj);
+
+ // The creation-time is duplicated as the 'timestamp' field
+ assertTrue(cd.containsKey(CompositeDataConstants.TIMESTAMP));
+ Object timestampObj = cd.get(CompositeDataConstants.TIMESTAMP);
+ assertTrue(timestampObj instanceof Long);
+
+ assertEquals(testCreationTime, timestampObj);
+ }
+
+ @Test
+ public void testToCompositeDataApplicationPropertiesSection() throws Exception {
+ MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+ Map<String, Object> appPropsMap = new HashMap<>();
+ appPropsMap.put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
+ appPropsMap.put(TEST_APPLICATION_PROPERTY_KEY2, TEST_APPLICATION_PROPERTY_VALUE2);
+ ApplicationProperties appProps = new ApplicationProperties(appPropsMap);
+
+ protonMessage.setApplicationProperties(appProps);
+
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+ CompositeData cd = decoded.toCompositeData(-1, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
+ Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
+ assertTrue(propsObject instanceof String);
+ String properties = (String) propsObject;
+
+ assertTrue(properties.contains(PROPERTY_MAP_APP_PROPERTIES_PREFIX +
+ TEST_APPLICATION_PROPERTY_KEY + "=" + TEST_APPLICATION_PROPERTY_VALUE));
+ assertTrue(properties.contains(PROPERTY_MAP_APP_PROPERTIES_PREFIX +
+ TEST_APPLICATION_PROPERTY_KEY2 + "=" + TEST_APPLICATION_PROPERTY_VALUE2));
+ }
+
+ @Test
+ public void testToCompositeDataMessageAnnotationSection() throws Exception {
+ MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+ Map<Symbol, Object> annotationsMap = new HashMap<>();
+ annotationsMap.put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
+ annotationsMap.put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY2), TEST_MESSAGE_ANNOTATION_VALUE2);
+ MessageAnnotations annotations = new MessageAnnotations(annotationsMap);
+
+ protonMessage.setMessageAnnotations(annotations);
+
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+ CompositeData cd = decoded.toCompositeData(-1, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
+ Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
+ assertTrue(propsObject instanceof String);
+ String properties = (String) propsObject;
+
+ assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
+ TEST_MESSAGE_ANNOTATION_KEY + "=" + TEST_MESSAGE_ANNOTATION_VALUE));
+ assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
+ TEST_MESSAGE_ANNOTATION_KEY2 + "=" + TEST_MESSAGE_ANNOTATION_VALUE2));
+
+ // Now try some specific annotations with their own handling
+ long testIngressTime = System.currentTimeMillis();
+ long testDeliveryTime = System.currentTimeMillis() + 5678;
+ long testDeliveryDelay = 6789;
+
+ annotationsMap = new HashMap<>();
+ annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_INGRESS_TIME), testIngressTime);
+ annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_DELIVERY_TIME), testDeliveryTime);
+ annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_DELIVERY_DELAY), testDeliveryDelay);
+ annotations = new MessageAnnotations(annotationsMap);
+ protonMessage.setMessageAnnotations(annotations);
+
+ decoded = encodeAndDecodeMessage(protonMessage);
+
+ cd = decoded.toCompositeData(-1, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
+ propsObject = cd.get(CompositeDataConstants.PROPERTIES);
+ assertTrue(propsObject instanceof String);
+ properties = (String) propsObject;
+
+ assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
+ AMQPMessageSupport.X_OPT_INGRESS_TIME + "=" + testIngressTime));
+ assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
+ AMQPMessageSupport.X_OPT_DELIVERY_TIME + "=" + testDeliveryTime));
+ assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
+ AMQPMessageSupport.X_OPT_DELIVERY_DELAY + "=" + testDeliveryDelay));
+ }
+
+ @Test
+ public void testToCompositeDataExtraProperties() throws Exception {
+ MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+ TypedProperties extraProperties = new TypedProperties();
+ extraProperties.putProperty(new SimpleString(TEST_EXTRA_PROPERTY_KEY1), TEST_EXTRA_PROPERTY_VALUE1);
+ extraProperties.putProperty(new SimpleString(TEST_EXTRA_PROPERTY_KEY2), TEST_EXTRA_PROPERTY_VALUE2);
+
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage, extraProperties);
+
+ CompositeData cd = decoded.toCompositeData(-1, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
+ Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
+ assertTrue(propsObject instanceof String);
+ String properties = (String) propsObject;
+
+ assertTrue(properties.contains(PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX +
+ TEST_EXTRA_PROPERTY_KEY1 + "=" + TEST_EXTRA_PROPERTY_VALUE1));
+ assertTrue(properties.contains(PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX +
+ TEST_EXTRA_PROPERTY_KEY2 + "=" + TEST_EXTRA_PROPERTY_VALUE2));
+ }
+
+ @Test
+ public void testToCompositeDataWithDataBodySectionWithoutContentType() throws Exception {
+ doToCompositeDataWithDataBodySectionTestImpl(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
+ }
+
+ @Test
+ public void testToCompositeDataWithDataBodySectionWithOctetStreamContentType() throws Exception {
+ doToCompositeDataWithDataBodySectionTestImpl(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
+ }
+
+ @Test
+ public void testToCompositeDataWithDataBodySectionWithTextPlainContentType() throws Exception {
+ doToCompositeDataWithDataBodySectionTestImpl("text/plain", org.apache.activemq.artemis.api.core.Message.TEXT_TYPE);
+ }
+
+ @Test
+ public void testToCompositeDataWithDataBodySectionWithSerializedObjectContentType() throws Exception {
+ doToCompositeDataWithDataBodySectionTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE);
+ }
+
+ private void doToCompositeDataWithDataBodySectionTestImpl(String contentType, byte expectedMessageType) throws OpenDataException {
+ Message protonMessage = Message.Factory.create();
+
+ // Not the right payload for some of the content types,but it
+ // doesnt matter, mainly checking type value and for lack of NPEs.
+ String bytesSource = "testPayloadBytes";
+ String expectedBodyText = "Data{" + bytesSource + "}";
+ Data body = new Data(new Binary(bytesSource.getBytes(StandardCharsets.UTF_8)));
+
+ protonMessage.setBody(body);
+ protonMessage.setContentType(contentType);
+
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+ CompositeData cd = decoded.toCompositeData(-1, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
+ assertEquals(expectedBodyText, cd.get(CompositeDataConstants.TEXT_BODY));
+
+ assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
+ assertEquals(expectedMessageType, cd.get(CompositeDataConstants.TYPE));
+ }
+
+ @Test
+ public void testToCompositeDataWithAmqpValueBinaryBodySectionWithoutContentType() throws Exception {
+ doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
+ }
+
+ @Test
+ public void testToCompositeDataWithAmqpValueBinaryBodySectionWithSerializedObjectContentType() throws Exception {
+ // Shouldnt really get in this situation, not meant to use content-type without the Data body section.
+ doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE);
+ }
+
+ private void doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(String contentType, byte expectedMessageType) throws OpenDataException {
+ Message protonMessage = Message.Factory.create();
+
+ // Not the right payload for some of the content types,but it
+ // doesnt matter, mainly checking type value and for lack of NPEs.
+ String bytesSource = "testPayloadBytes";
+ AmqpValue body = new AmqpValue(new Binary(bytesSource.getBytes(StandardCharsets.UTF_8)));
+
+ protonMessage.setBody(body);
+ protonMessage.setContentType(contentType);
+
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+ CompositeData cd = decoded.toCompositeData(-1, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
+ assertEquals(bytesSource, cd.get(CompositeDataConstants.TEXT_BODY));
+
+ assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
+ assertEquals(expectedMessageType, cd.get(CompositeDataConstants.TYPE));
+ }
+
+ @Test
+ public void testToCompositeDataWithStringBodyWithoutValueSizeLimit() throws Exception {
+ doToCompositeDataWithStringBodyValueSizeLimitTestImpl(-1, TEST_STRING_BODY);
+ }
+
+ @Test
+ public void testToCompositeDataWithStringBodyWithValueSizeLimit() throws Exception {
+ int limit = 11;
+ int testBodyLength = TEST_STRING_BODY.length();
+ assertTrue(testBodyLength > limit);
+
+ String expected = TEST_STRING_BODY.substring(0, limit) + ", + " + String.valueOf(testBodyLength - limit) + " more";
+
+ doToCompositeDataWithStringBodyValueSizeLimitTestImpl(limit, expected);
+ }
+
+ private void doToCompositeDataWithStringBodyValueSizeLimitTestImpl(int fieldsLimit, String expectedBodyText) throws OpenDataException {
+ Message protonMessage = Message.Factory.create();
+ protonMessage.setBody(new AmqpValue(TEST_STRING_BODY));
+
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+ CompositeData cd = decoded.toCompositeData(fieldsLimit, 0);
+
+ assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
+ assertEquals(expectedBodyText, cd.get(CompositeDataConstants.TEXT_BODY));
+
+ assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
+ assertEquals(org.apache.activemq.artemis.api.core.Message.TEXT_TYPE, cd.get(CompositeDataConstants.TYPE));
+ }
+
//----- Test Support ------------------------------------------------------//
private MessageImpl createProtonMessage() {
@@ -2483,13 +2885,17 @@ public class AMQPMessageTest {
return bytes;
}
- private AMQPStandardMessage encodeAndDecodeMessage(MessageImpl message) {
+ private AMQPStandardMessage encodeAndDecodeMessage(Message message) {
+ return encodeAndDecodeMessage(message, null);
+ }
+
+ private AMQPStandardMessage encodeAndDecodeMessage(Message message, TypedProperties extraProperties) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);
message.encode(new NettyWritable(nettyBuffer));
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
- return new AMQPStandardMessage(0, bytes, null);
+ return new AMQPStandardMessage(0, bytes, extraProperties);
}
}