You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/19 02:24:04 UTC

[activemq-artemis] branch master updated: ARTEMIS-2437 Allow extended types in annotations in AMQP to Core

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 448f727  ARTEMIS-2437 Allow extended types in annotations in AMQP to Core
     new cfdec52  This closes #2795
448f727 is described below

commit 448f72738b1b7e3f4bc0a757e3905e9c73fb2336
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Tue Aug 13 11:48:13 2019 -0400

    ARTEMIS-2437 Allow extended types in annotations in AMQP to Core
    
    When converting from AMQP to core and back again support annotations that
    aren't able to be placed into Core message properties by storing the bytes
    from encoding the types to AMQP encodings and then decoding them again
    when converting back into AMQP messages.
    
    Requires update to proton-j 0.33.2 for encoding fix
---
 .../amqp/converter/AMQPMessageSupport.java         |   4 +
 .../protocol/amqp/converter/AmqpCoreConverter.java |  37 ++++-
 .../protocol/amqp/converter/CoreAmqpConverter.java |  44 ++++-
 .../protocol/amqp/converter/TestConversions.java   | 178 ++++++++++++++++++++-
 pom.xml                                            |   2 +-
 .../integration/amqp/AmqpExpiredMessageTest.java   |   4 +-
 .../integration/amqp/AmqpLargeMessageTest.java     | 104 ++++++++++++
 7 files changed, 364 insertions(+), 9 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index 5f73950..21116a6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -155,6 +155,7 @@ public final class AMQPMessageSupport {
    public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
    public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
    public static final String FOOTER_PREFIX = "FT_";
+   public static final String ENCODED_PREFIX = "ENCODED_";
 
    public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
    public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE;
@@ -168,6 +169,9 @@ public final class AMQPMessageSupport {
    public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX;
    public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX;
    public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX;
+   public static final String JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + DELIVERY_ANNOTATION_PREFIX;
+   public static final String JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + MESSAGE_ANNOTATION_PREFIX;
+   public static final String JMS_AMQP_ENCODED_FOOTER_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + FOOTER_PREFIX;
    public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
 
    // Message body type definitions
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 739d437..3c21e08 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -28,6 +28,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_STRING;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
@@ -60,6 +62,7 @@ import java.util.UUID;
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@@ -88,6 +91,7 @@ import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.WritableBuffer;
 
 import io.netty.buffer.ByteBuf;
@@ -280,7 +284,11 @@ public class AmqpCoreConverter {
                }
             }
 
-            setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
+            try {
+               setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
+            } catch (ActiveMQPropertyConversionException e) {
+               encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
+            }
          }
       }
 
@@ -403,15 +411,38 @@ public class AmqpCoreConverter {
    @SuppressWarnings("unchecked")
    private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception {
       if (footer != null && footer.getValue() != null) {
-         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) footer.getValue().entrySet()) {
+         for (Map.Entry<Symbol, Object> entry : (Set<Map.Entry<Symbol, Object>>) footer.getValue().entrySet()) {
             String key = entry.getKey().toString();
-            setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
+            try {
+               setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
+            } catch (ActiveMQPropertyConversionException e) {
+               encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_FOOTER_PREFIX + key, entry.getValue());
+            }
          }
       }
 
       return jms;
    }
 
+   private static void encodeUnsupportedMessagePropertyType(ServerJMSMessage jms, String key, Object value) throws JMSException {
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer();
+      final EncoderImpl encoder = TLSEncode.getEncoder();
+
+      try {
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(value);
+
+         final byte[] encodedBytes = new byte[buffer.writerIndex()];
+
+         buffer.readBytes(encodedBytes);
+
+         setProperty(jms, key, encodedBytes);
+      } finally {
+         encoder.setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
    private static void setProperty(javax.jms.Message msg, String key, Object value) throws JMSException {
       if (value instanceof UnsignedLong) {
          long v = ((UnsignedLong) value).longValue();
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 453b7ec..5c6a8f3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -31,6 +31,9 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
@@ -91,7 +94,9 @@ import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.jboss.logging.Logger;
 
@@ -131,7 +136,7 @@ public class CoreAmqpConverter {
       Map<Symbol, Object> daMap = null;
       final Map<Symbol, Object> maMap = new HashMap<>();
       Map<String, Object> apMap = null;
-      Map<Object, Object> footerMap = null;
+      Map<Symbol, Object> footerMap = null;
 
       Section body = convertBody(message, maMap, properties);
 
@@ -261,10 +266,21 @@ public class CoreAmqpConverter {
                String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
                daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
                continue;
+            } else if (key.startsWith(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX)) {
+               if (daMap == null) {
+                  daMap = new HashMap<>();
+               }
+               String name = key.substring(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX.length());
+               daMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
+               continue;
             } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) {
                String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
                maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
                continue;
+            } else if (key.startsWith(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX)) {
+               String name = key.substring(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX.length());
+               maMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
+               continue;
             } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) {
                properties.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
                continue;
@@ -277,12 +293,19 @@ public class CoreAmqpConverter {
             } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) {
                // skip..remove annotation from previous inbound transformation
                continue;
+            } else if (key.startsWith(JMS_AMQP_ENCODED_FOOTER_PREFIX)) {
+               if (footerMap == null) {
+                  footerMap = new HashMap<>();
+               }
+               String name = key.substring(JMS_AMQP_ENCODED_FOOTER_PREFIX.length());
+               footerMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
+               continue;
             } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
                if (footerMap == null) {
                   footerMap = new HashMap<>();
                }
                String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
-               footerMap.put(name, message.getObjectProperty(key));
+               footerMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
                continue;
             }
          } else if (key.equals(Message.HDR_GROUP_ID.toString())) {
@@ -351,6 +374,23 @@ public class CoreAmqpConverter {
       }
    }
 
+   private static Object decodeEmbeddedAMQPType(Object payload) {
+      final byte[] encodedType = (byte[]) payload;
+
+      final DecoderImpl decoder = TLSEncode.getDecoder();
+      Object decodedType = null;
+
+      decoder.setBuffer(ByteBufferReader.wrap(encodedType));
+
+      try {
+         decodedType = decoder.readObject();
+      } finally {
+         decoder.setBuffer(null);
+      }
+
+      return decodedType;
+   }
+
    private static Section convertBody(ServerJMSMessage message, Map<Symbol, Object> maMap, Properties properties) throws JMSException {
 
       Section body = null;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index d3976a9..37b1103 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -17,14 +17,22 @@
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
 import java.nio.ByteBuffer;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -33,19 +41,25 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMe
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Footer;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.jboss.logging.Logger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
 
@@ -232,6 +246,157 @@ public class TestConversions extends Assert {
       assertEquals(text, textMessage.getText());
    }
 
+   @SuppressWarnings("unchecked")
+   @Test
+   public void testConvertMessageWithMapInMessageAnnotations() throws Exception {
+      Map<String, Object> mapprop = createPropertiesMap();
+      ApplicationProperties properties = new ApplicationProperties(mapprop);
+      MessageImpl message = (MessageImpl) Message.Factory.create();
+      message.setApplicationProperties(properties);
+
+      final String annotationName = "x-opt-test-annotation";
+      final Symbol annotationNameSymbol = Symbol.valueOf(annotationName);
+
+      Map<String, String> embeddedMap = new LinkedHashMap<>();
+      embeddedMap.put("key1", "value1");
+      embeddedMap.put("key2", "value2");
+      embeddedMap.put("key3", "value3");
+      Map<Symbol, Object> annotationsMap = new LinkedHashMap<>();
+      annotationsMap.put(annotationNameSymbol, embeddedMap);
+      MessageAnnotations messageAnnotations = new MessageAnnotations(annotationsMap);
+      byte[] encodedEmbeddedMap = encodeObject(embeddedMap);
+
+      Map<String, Object> mapValues = new HashMap<>();
+      mapValues.put("somestr", "value");
+      mapValues.put("someint", Integer.valueOf(1));
+
+      message.setMessageAnnotations(messageAnnotations);
+      message.setBody(new AmqpValue(mapValues));
+
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
+
+      ICoreMessage serverMessage = encodedMessage.toCore();
+      serverMessage.getReadOnlyBodyBuffer();
+
+      ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
+      mapMessage.decode();
+
+      verifyProperties(mapMessage);
+
+      assertEquals(1, mapMessage.getInt("someint"));
+      assertEquals("value", mapMessage.getString("somestr"));
+      assertTrue(mapMessage.propertyExists(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + annotationName));
+      assertArrayEquals(encodedEmbeddedMap, (byte[]) mapMessage.getObjectProperty(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + annotationName));
+
+      AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
+      assertNotNull(newAMQP.getBody());
+      assertNotNull(newAMQP.getMessageAnnotations());
+      assertNotNull(newAMQP.getMessageAnnotations().getValue());
+      assertTrue(newAMQP.getMessageAnnotations().getValue().containsKey(annotationNameSymbol));
+      Object result = newAMQP.getMessageAnnotations().getValue().get(annotationNameSymbol);
+      assertTrue(result instanceof Map);
+      assertEquals(embeddedMap, (Map<String, String>) result);
+   }
+
+   @SuppressWarnings("unchecked")
+   @Test
+   public void testConvertMessageWithMapInFooter() throws Exception {
+      Map<String, Object> mapprop = createPropertiesMap();
+      ApplicationProperties properties = new ApplicationProperties(mapprop);
+      MessageImpl message = (MessageImpl) Message.Factory.create();
+      message.setApplicationProperties(properties);
+
+      final String footerName = "test-footer";
+      final Symbol footerNameSymbol = Symbol.valueOf(footerName);
+
+      Map<String, String> embeddedMap = new LinkedHashMap<>();
+      embeddedMap.put("key1", "value1");
+      embeddedMap.put("key2", "value2");
+      embeddedMap.put("key3", "value3");
+      Map<Symbol, Object> footerMap = new LinkedHashMap<>();
+      footerMap.put(footerNameSymbol, embeddedMap);
+      Footer messageFooter = new Footer(footerMap);
+      byte[] encodedEmbeddedMap = encodeObject(embeddedMap);
+
+      Map<String, Object> mapValues = new HashMap<>();
+      mapValues.put("somestr", "value");
+      mapValues.put("someint", Integer.valueOf(1));
+
+      message.setFooter(messageFooter);
+      message.setBody(new AmqpValue(mapValues));
+
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
+
+      ICoreMessage serverMessage = encodedMessage.toCore();
+      serverMessage.getReadOnlyBodyBuffer();
+
+      ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
+      mapMessage.decode();
+
+      verifyProperties(mapMessage);
+
+      assertEquals(1, mapMessage.getInt("someint"));
+      assertEquals("value", mapMessage.getString("somestr"));
+      assertTrue(mapMessage.propertyExists(JMS_AMQP_ENCODED_FOOTER_PREFIX + footerName));
+      assertArrayEquals(encodedEmbeddedMap, (byte[]) mapMessage.getObjectProperty(JMS_AMQP_ENCODED_FOOTER_PREFIX + footerName));
+
+      AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
+      assertNotNull(newAMQP.getBody());
+      assertNotNull(newAMQP.getFooter());
+      assertNotNull(newAMQP.getFooter().getValue());
+      assertTrue(newAMQP.getFooter().getValue().containsKey(footerNameSymbol));
+      Object result = newAMQP.getFooter().getValue().get(footerNameSymbol);
+      assertTrue(result instanceof Map);
+      assertEquals(embeddedMap, (Map<String, String>) result);
+   }
+
+   @SuppressWarnings("unchecked")
+   @Test
+   public void testConvertFromCoreWithEncodedDeliveryAnnotationProperty() throws Exception {
+
+      final String annotationName = "x-opt-test-annotation";
+      final Symbol annotationNameSymbol = Symbol.valueOf(annotationName);
+
+      Map<String, String> embeddedMap = new LinkedHashMap<>();
+      embeddedMap.put("key1", "value1");
+      embeddedMap.put("key2", "value2");
+      embeddedMap.put("key3", "value3");
+
+      byte[] encodedEmbeddedMap = encodeObject(embeddedMap);
+
+      ServerJMSMessage serverMessage = createMessage();
+
+      serverMessage.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
+      serverMessage.setObjectProperty(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX + annotationName, encodedEmbeddedMap);
+      serverMessage.encode();
+
+      AMQPMessage newAMQP = CoreAmqpConverter.fromCore(serverMessage.getInnerMessage());
+      assertNull(newAMQP.getBody());
+      assertNotNull(newAMQP.getDeliveryAnnotations());
+      assertNotNull(newAMQP.getDeliveryAnnotations().getValue());
+      assertTrue(newAMQP.getDeliveryAnnotations().getValue().containsKey(annotationNameSymbol));
+      Object result = newAMQP.getDeliveryAnnotations().getValue().get(annotationNameSymbol);
+      assertTrue(result instanceof Map);
+      assertEquals(embeddedMap, (Map<String, String>) result);
+   }
+
+   private byte[] encodeObject(Object toEncode) {
+      ByteBuf scratch = Unpooled.buffer();
+      EncoderImpl encoder = TLSEncode.getEncoder();
+      encoder.setByteBuffer(new NettyWritable(scratch));
+
+      try {
+         encoder.writeObject(toEncode);
+      } finally {
+         encoder.setByteBuffer((WritableBuffer) null);
+      }
+
+      byte[] result = new byte[scratch.writerIndex()];
+      scratch.readBytes(result);
+
+      return result;
+   }
+
    @Test
    public void testEditAndConvert() throws Exception {
 
@@ -323,4 +488,15 @@ public class TestConversions extends Assert {
 
       return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
    }
+
+   private ServerJMSMessage createMessage() {
+      return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE));
+   }
+
+   private CoreMessage newMessage(byte messageType) {
+      CoreMessage message = new CoreMessage(0, 512);
+      message.setType(messageType);
+      ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
+      return message;
+   }
 }
diff --git a/pom.xml b/pom.xml
index 23570df..67ed992 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,7 +92,7 @@
       <mockito.version>2.25.0</mockito.version>
       <netty.version>4.1.34.Final</netty.version>
       <netty-tcnative-version>2.0.22.Final</netty-tcnative-version>
-      <proton.version>0.33.1</proton.version>
+      <proton.version>0.33.2</proton.version>
       <resteasy.version>3.0.19.Final</resteasy.version>
       <slf4j.version>1.7.21</slf4j.version>
       <qpid.jms.version>0.43.0</qpid.jms.version>
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index cfcbc20..759a854 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -171,7 +171,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       sender.send(message);
       sender.close();
 
-      assertEquals(1, queueView.getMessageCount());
+      Wait.assertEquals(1, queueView::getMessageCount);
 
       // Now try and get the message
       AmqpReceiver receiver = session.createReceiver(getQueueName());
@@ -204,7 +204,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       sender.send(message);
       sender.close();
 
-      assertEquals(1, queueView.getMessageCount());
+      Wait.assertEquals(1, queueView::getMessageCount);
 
       Thread.sleep(1000);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
index 15bfb21..10ccc95 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -51,6 +52,7 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
@@ -116,6 +118,52 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
+   public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         connection.connect();
+
+         String annotation = "x-opt-embedded-map";
+         Map<String, String> embeddedMap = new LinkedHashMap<>();
+         embeddedMap.put("test-key-1", "value-1");
+         embeddedMap.put("test-key-2", "value-2");
+         embeddedMap.put("test-key-3", "value-3");
+
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(testQueueName);
+         AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
+
+         message.setApplicationProperty("IntProperty", (Integer) 42);
+         message.setDurable(true);
+         message.setMessageAnnotation(annotation, embeddedMap);
+         sender.send(message);
+
+         session.close();
+
+         Wait.assertEquals(1, () -> getMessageCount(server.getPostOffice(), testQueueName));
+
+         try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+              Connection connection2 = factory.createConnection()) {
+
+            Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            connection2.start();
+            MessageConsumer consumer = session2.createConsumer(session2.createQueue(testQueueName));
+
+            Message received = consumer.receive(5000);
+            Assert.assertNotNull(received);
+            Assert.assertEquals(42, received.getIntProperty("IntProperty"));
+
+            connection2.close();
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
    public void testSendAMQPReceiveOpenWire() throws Exception {
       server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
 
@@ -206,6 +254,62 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
+   public void testSendAMQPMessageWithComplexAnnotationsReceiveAMQP() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      String testQueueName = "ConnectionFrameSize";
+      int nMsgs = 200;
+
+      AmqpClient client = createAmqpClient();
+
+      Symbol annotation = Symbol.valueOf("x-opt-embedded-map");
+      Map<String, String> embeddedMap = new LinkedHashMap<>();
+      embeddedMap.put("test-key-1", "value-1");
+      embeddedMap.put("test-key-2", "value-2");
+      embeddedMap.put("test-key-3", "value-3");
+
+      {
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(testQueueName);
+         AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
+
+         message.setApplicationProperty("IntProperty", (Integer) 42);
+         message.setDurable(true);
+         message.setMessageAnnotation(annotation.toString(), embeddedMap);
+         sender.send(message);
+         session.close();
+         connection.close();
+      }
+
+      Wait.assertEquals(1, () -> getMessageCount(server.getPostOffice(), testQueueName));
+
+      {
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         AmqpReceiver receiver = session.createReceiver(testQueueName);
+         receiver.flow(nMsgs);
+
+         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull("Failed to read message with embedded map in annotations", message);
+         MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
+         if (wrapped.getBody() instanceof Data) {
+            Data data = (Data) wrapped.getBody();
+            System.out.println("received : message: " + data.getValue().getLength());
+            assertEquals(PAYLOAD, data.getValue().getLength());
+         }
+
+         assertNotNull(message.getWrappedMessage().getMessageAnnotations());
+         assertNotNull(message.getWrappedMessage().getMessageAnnotations().getValue());
+         assertEquals(embeddedMap, message.getWrappedMessage().getMessageAnnotations().getValue().get(annotation));
+
+         message.accept();
+         session.close();
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
    public void testSendAMQPReceiveAMQPViaJMSObjectMessage() throws Exception {
       server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));