You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2018/07/02 18:59:14 UTC

activemq-artemis git commit: ARTEMIS-1941 Preserve AMQP body section type on "large" messages

Repository: activemq-artemis
Updated Branches:
  refs/heads/master da313f58b -> a63b0315c


ARTEMIS-1941 Preserve AMQP body section type on "large" messages

When "large" messages are converted to / from core in order to be stored
in the large message store, the type of the AMQP body section is being
lost and reconstituted incorrectly in some cases.  The message needs to
be annotated with the original AMQP type of the body, and that annotation
then used to manage the conversion back to AMQP from Core.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a63b0315
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a63b0315
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a63b0315

Branch: refs/heads/master
Commit: a63b0315c415f8a800c185a965e523d93861cd5a
Parents: da313f5
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jun 29 15:22:05 2018 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Mon Jul 2 13:58:45 2018 -0500

----------------------------------------------------------------------
 .../amqp/converter/AMQPMessageSupport.java      |  21 +-
 .../amqp/converter/AmqpCoreConverter.java       |  17 +
 .../amqp/converter/CoreAmqpConverter.java       |  75 ++++-
 .../JMSMappingOutboundTransformerTest.java      | 307 ++++++++-----------
 .../integration/amqp/AmqpLargeMessageTest.java  | 226 +++++++++++++-
 5 files changed, 441 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a63b0315/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index 1bac1e5..7c4f425 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -16,13 +16,21 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
+import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
+
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
 
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@@ -38,13 +46,6 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.message.Message;
 
-import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
-
 /**
  * Support class containing constant values and static methods that are used to map to / from
  * AMQP Message types being sent or received.
@@ -115,6 +116,7 @@ public final class AMQPMessageSupport {
    public static final String JMS_AMQP_PREFIX = "JMS_AMQP_";
    public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length();
 
+   public static final String ORIGINAL_ENCODING = "ORIGINAL_ENCODING";
    public static final String NATIVE = "NATIVE";
    public static final String HEADER = "HEADER";
    public static final String PROPERTIES = "PROPERTIES";
@@ -142,6 +144,7 @@ public final class AMQPMessageSupport {
    public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX;
    public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX;
    public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX;
+   public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
 
    // Message body type definitions
    public static final Binary EMPTY_BINARY = new Binary(new byte[0]);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a63b0315/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index d070579..e199d1f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -18,6 +18,14 @@
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
 import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_MAP;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_STRING;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
@@ -26,6 +34,7 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
@@ -108,6 +117,8 @@ public class AmqpCoreConverter {
                result = createMessage(message.getMessageID(), coreMessageObjectPools);
             }
          }
+
+         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
       } else if (body instanceof Data) {
          Binary payload = ((Data) body).getValue();
 
@@ -131,6 +142,7 @@ public class AmqpCoreConverter {
             }
          }
 
+         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
       } else if (body instanceof AmqpSequence) {
          AmqpSequence sequence = (AmqpSequence) body;
          ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
@@ -139,11 +151,13 @@ public class AmqpCoreConverter {
          }
 
          result = m;
+         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
       } else if (body instanceof AmqpValue) {
          Object value = ((AmqpValue) body).getValue();
          if (value == null || value instanceof String) {
             result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
 
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
          } else if (value instanceof Binary) {
             Binary payload = (Binary) value;
 
@@ -153,14 +167,17 @@ public class AmqpCoreConverter {
                result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
             }
 
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
          } else if (value instanceof List) {
             ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
             for (Object item : (List<Object>) value) {
                m.writeObject(item);
             }
             result = m;
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
          } else if (value instanceof Map) {
             result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
          } else {
             ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
             try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a63b0315/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 49372db..eb20219 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -19,6 +19,13 @@ package org.apache.activemq.artemis.protocol.amqp.converter;
 
 import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
 import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_UNKNOWN;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_STRING;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.EMPTY_BINARY;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
@@ -30,6 +37,7 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_NATIVE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PREFIX;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PROPERTIES;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
@@ -254,6 +262,9 @@ public class CoreAmqpConverter {
             } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) {
                properties.setReplyToGroupId(message.getStringProperty(key));
                continue;
+            } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) {
+               // skip..remove annotation from previous inbound transformation
+               continue;
             } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
                if (footerMap == null) {
                   footerMap = new HashMap<>();
@@ -331,6 +342,13 @@ public class CoreAmqpConverter {
    private static Section convertBody(ServerJMSMessage message, Map<Symbol, Object> maMap, Properties properties) throws JMSException {
 
       Section body = null;
+      short orignalEncoding = AMQP_UNKNOWN;
+
+      try {
+         orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING);
+      } catch (Exception ex) {
+         // Ignore and stick with UNKNOWN
+      }
 
       if (message instanceof ServerJMSBytesMessage) {
          Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message);
@@ -338,11 +356,40 @@ public class CoreAmqpConverter {
          maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_BYTES_MESSAGE);
          if (payload == null) {
             payload = EMPTY_BINARY;
-         } else {
-            body = new AmqpValue(payload);
+         }
+
+         switch (orignalEncoding) {
+            case AMQP_NULL:
+               break;
+            case AMQP_VALUE_BINARY:
+               body = new AmqpValue(payload);
+               break;
+            case AMQP_DATA:
+            case AMQP_UNKNOWN:
+            default:
+               body = new Data(payload);
+               break;
          }
       } else if (message instanceof ServerJMSTextMessage) {
-         body = new AmqpValue(((TextMessage) message).getText());
+         String text = (((TextMessage) message).getText());
+
+         switch (orignalEncoding) {
+            case AMQP_NULL:
+               break;
+            case AMQP_DATA:
+               if (text == null) {
+                  body = new Data(EMPTY_BINARY);
+               } else {
+                  body = new Data(new Binary(text.getBytes(StandardCharsets.UTF_8)));
+               }
+               break;
+            case AMQP_VALUE_STRING:
+            case AMQP_UNKNOWN:
+            default:
+               body = new AmqpValue(text);
+               break;
+         }
+
          maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_TEXT_MESSAGE);
       } else if (message instanceof ServerJMSMapMessage) {
          body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message));
@@ -358,7 +405,16 @@ public class CoreAmqpConverter {
          } catch (MessageEOFException e) {
          }
 
-         body = new AmqpSequence(list);
+         switch (orignalEncoding) {
+            case AMQP_SEQUENCE:
+               body = new AmqpSequence(list);
+               break;
+            case AMQP_VALUE_LIST:
+            case AMQP_UNKNOWN:
+            default:
+               body = new AmqpValue(list);
+               break;
+         }
       } else if (message instanceof ServerJMSObjectMessage) {
          properties.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
          maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_OBJECT_MESSAGE);
@@ -368,7 +424,16 @@ public class CoreAmqpConverter {
             payload = EMPTY_BINARY;
          }
 
-         body = new Data(payload);
+         switch (orignalEncoding) {
+            case AMQP_VALUE_BINARY:
+               body = new AmqpValue(payload);
+               break;
+            case AMQP_DATA:
+            case AMQP_UNKNOWN:
+            default:
+               body = new Data(payload);
+               break;
+         }
 
          // For a non-AMQP message we tag the outbound content type as containing
          // a serialized Java object so that an AMQP client has a hint as to what

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a63b0315/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index ccafd37..f1cd588 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -16,16 +16,32 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import javax.jms.JMSException;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_UNKNOWN;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import javax.jms.JMSException;
+
 import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
@@ -44,39 +60,47 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.message.Message;
-import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class JMSMappingOutboundTransformerTest {
 
    private final UUID TEST_OBJECT_VALUE = UUID.fromString("fee14b62-09e0-4ac6-a4c3-4206c630d844");
    private final String TEST_ADDRESS = "queue://testAddress";
 
-
    public static final byte QUEUE_TYPE = 0x00;
    public static final byte TOPIC_TYPE = 0x01;
    public static final byte TEMP_QUEUE_TYPE = 0x02;
    public static final byte TEMP_TOPIC_TYPE = 0x03;
 
-   @Before
-   public void setUp() {
+   // ----- no-body Message type tests ---------------------------------------//
+
+   @Test
+   public void testConvertMessageToAmqpMessageWithNoBody() throws Exception {
+      ServerJMSMessage outbound = createMessage();
+      outbound.encode();
+
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+
+      assertNull(amqp.getBody());
    }
 
-   // ----- no-body Message type tests ---------------------------------------//
+   @Test
+   public void testConvertTextMessageToAmqpMessageWithNoBodyOriginalEncodingWasNull() throws Exception {
+      ServerJMSMessage outbound = createMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
+      outbound.encode();
+
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+
+      assertNull(amqp.getBody());
+   }
+
+   // ----- BytesMessage type tests ---------------------------------------//
 
-   @Ignore("Compressed message body support not yet implemented.")
    @Test
-   public void testConvertCompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
-      byte[] expectedPayload = new byte[] {8, 16, 24, 32};
-      ServerJMSBytesMessage outbound = createBytesMessage(true);
+   public void testConvertBytesMessageToAmqpMessageWithDataBody() throws Exception {
+      byte[] expectedPayload = new byte[]{8, 16, 24, 32};
+      ServerJMSBytesMessage outbound = createBytesMessage();
       outbound.writeBytes(expectedPayload);
       outbound.encode();
 
@@ -96,6 +120,7 @@ public class JMSMappingOutboundTransformerTest {
    @Test
    public void testConvertEmptyBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
       ServerJMSBytesMessage outbound = createBytesMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.encode();
 
       Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
@@ -107,30 +132,10 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    @Test
-   public void testConvertUncompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
-      byte[] expectedPayload = new byte[] {8, 16, 24, 32};
+   public void testConvertBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      byte[] expectedPayload = new byte[]{8, 16, 24, 32};
       ServerJMSBytesMessage outbound = createBytesMessage();
-      outbound.writeBytes(expectedPayload);
-      outbound.encode();
-
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
-
-      assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof AmqpValue);
-      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
-      assertEquals(4, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
-
-      Binary amqpData = (Binary) ((AmqpValue) amqp.getBody()).getValue();
-      Binary inputData = new Binary(expectedPayload);
-
-      assertTrue(inputData.equals(amqpData));
-   }
-
-   @Ignore("Compressed message body support not yet implemented.")
-   @Test
-   public void testConvertCompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
-      byte[] expectedPayload = new byte[] {8, 16, 24, 32};
-      ServerJMSBytesMessage outbound = createBytesMessage(true);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.writeBytes(expectedPayload);
       outbound.encode();
 
@@ -163,7 +168,7 @@ public class JMSMappingOutboundTransformerTest {
 
    @Test
    public void testConvertMapMessageToAmqpMessageWithByteArrayValueInBody() throws Exception {
-      final byte[] byteArray = new byte[] {1, 2, 3, 4, 5};
+      final byte[] byteArray = new byte[]{1, 2, 3, 4, 5};
 
       ServerJMSMapMessage outbound = createMapMessage();
       outbound.setBytes("bytes", byteArray);
@@ -204,42 +209,32 @@ public class JMSMappingOutboundTransformerTest {
       assertTrue("string".equals(amqpMap.get("property-1")));
    }
 
+   //----- StreamMessage type tests -----------------------------------------//
+
    @Test
-   public void testConvertCompressedMapMessageToAmqpMessage() throws Exception {
-      ServerJMSMapMessage outbound = createMapMessage(true);
-      outbound.setString("property-1", "string");
-      outbound.setInt("property-2", 1);
-      outbound.setBoolean("property-3", true);
+   public void testConvertStreamMessageToAmqpMessageWithAmqpValueBodyNoPropertySet() throws Exception {
+      ServerJMSStreamMessage outbound = createStreamMessage();
+      outbound.writeBoolean(false);
+      outbound.writeString("test");
       outbound.encode();
 
       Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
-      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+      AmqpValue list = (AmqpValue) amqp.getBody();
 
       @SuppressWarnings("unchecked")
-      Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+      List<Object> amqpList = (List<Object>) list.getValue();
 
-      assertEquals(3, amqpMap.size());
-      assertTrue("string".equals(amqpMap.get("property-1")));
+      assertEquals(2, amqpList.size());
    }
 
    @Test
-   public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
+   public void testConvertStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception {
       ServerJMSStreamMessage outbound = createStreamMessage();
-      outbound.encode();
-
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
-
-      assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof AmqpSequence);
-      assertTrue(((AmqpSequence) amqp.getBody()).getValue() instanceof List);
-   }
-
-   @Test
-   public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception {
-      ServerJMSStreamMessage outbound = createStreamMessage(true);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
       outbound.writeBoolean(false);
       outbound.writeString("test");
       outbound.encode();
@@ -247,19 +242,20 @@ public class JMSMappingOutboundTransformerTest {
       Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof AmqpSequence);
+      assertTrue(amqp.getBody() instanceof AmqpValue);
 
-      AmqpSequence list = (AmqpSequence)amqp.getBody();
+      AmqpValue list = (AmqpValue) amqp.getBody();
 
       @SuppressWarnings("unchecked")
-      List<Object> amqpList = list.getValue();
+      List<Object> amqpList = (List<Object>) list.getValue();
 
       assertEquals(2, amqpList.size());
    }
 
    @Test
-   public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
-      ServerJMSStreamMessage outbound = createStreamMessage(true);
+   public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
+      ServerJMSStreamMessage outbound = createStreamMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
       outbound.writeBoolean(false);
       outbound.writeString("test");
       outbound.encode();
@@ -293,6 +289,7 @@ public class JMSMappingOutboundTransformerTest {
    @Test
    public void testConvertEmptyObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
       ServerJMSObjectMessage outbound = createObjectMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
       outbound.encode();
 
       Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
@@ -301,6 +298,7 @@ public class JMSMappingOutboundTransformerTest {
       assertTrue(amqp.getBody() instanceof Data);
       assertEquals(5, ((Data) amqp.getBody()).getValue().getLength());
    }
+
    @Test
    public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception {
       ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
@@ -320,6 +318,7 @@ public class JMSMappingOutboundTransformerTest {
    @Test
    public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
       ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
       outbound.encode();
 
       Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
@@ -334,52 +333,35 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    @Test
-   public void testConvertCompressedObjectMessageToAmqpMessageWithDataBody() throws Exception {
-      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
-      outbound.encode();
-
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
-
-      assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof Data);
-      assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
-
-      Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
-      assertNotNull(value);
-      assertTrue(value instanceof UUID);
-   }
-
-   @Test
-   public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
-      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
+   public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.encode();
 
       Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
-      assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
+      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+      assertFalse(0 == ((Binary) ((Data) amqp.getBody()).getValue()).getLength());
 
-      Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
+      Object value = deserialize((((Data) amqp.getBody()).getValue()).getArray());
       assertNotNull(value);
       assertTrue(value instanceof UUID);
    }
 
    @Test
-   public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
-      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
+   public void testConvertEmptyObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.encode();
 
       Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof Data);
-      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
-      assertFalse(0 == ((Binary) ((Data) amqp.getBody()).getValue()).getLength());
-
-      Object value = deserialize((((Data) amqp.getBody()).getValue()).getArray());
-      assertNotNull(value);
-      assertTrue(value instanceof UUID);
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
+      assertEquals(0, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
    }
 
    // ----- TextMessage type tests -------------------------------------------//
@@ -423,9 +405,9 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    @Test
-   public void testConvertCompressedTextMessageCreatesDataSectionBody() throws Exception {
+   public void testConvertTextMessageCreatesDataSectionBody() throws Exception {
       String contentString = "myTextMessageContent";
-      ServerJMSTextMessage outbound = createTextMessage(contentString, true);
+      ServerJMSTextMessage outbound = createTextMessage(contentString);
       outbound.encode();
 
       Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
@@ -433,39 +415,57 @@ public class JMSMappingOutboundTransformerTest {
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
 
-      AmqpValue value = (AmqpValue)amqp.getBody();
+      AmqpValue value = (AmqpValue) amqp.getBody();
 
       assertEquals(contentString, value.getValue());
    }
 
-   // ----- Test JMSDestination Handling -------------------------------------//
-
    @Test
-   public void testConvertMessageWithJMSDestinationNull() throws Exception {
-      doTestConvertMessageWithJMSDestination(null, null);
-   }
+   public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
+      String contentString = "myTextMessageContent";
+      ServerJMSTextMessage outbound = createTextMessage(contentString);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
+      outbound.encode();
 
-   @Test
-   public void testConvertMessageWithJMSDestinationQueue() throws Exception {
-      doTestConvertMessageWithJMSDestination(createDestination(QUEUE_TYPE), QUEUE_TYPE);
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+
+      Binary data = ((Data) amqp.getBody()).getValue();
+      String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
+      assertEquals(contentString, contents);
    }
 
-   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
    @Test
-   public void testConvertMessageWithJMSDestinationTemporaryQueue() throws Exception {
-      doTestConvertMessageWithJMSDestination(createDestination(TEMP_QUEUE_TYPE), TEMP_QUEUE_TYPE);
+   public void testConvertTextMessageContentNotStoredCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
+      String contentString = "myTextMessageContent";
+      ServerJMSTextMessage outbound = createTextMessage(contentString);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
+      outbound.encode();
+
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+
+      Binary data = ((Data) amqp.getBody()).getValue();
+      String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
+      assertEquals(contentString, contents);
    }
 
-   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
+   // ----- Test JMSDestination Handling -------------------------------------//
+
    @Test
-   public void testConvertMessageWithJMSDestinationTopic() throws Exception {
-      doTestConvertMessageWithJMSDestination(createDestination(TOPIC_TYPE), TOPIC_TYPE);
+   public void testConvertMessageWithJMSDestinationNull() throws Exception {
+      doTestConvertMessageWithJMSDestination(null, null);
    }
 
-   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
    @Test
-   public void testConvertMessageWithJMSDestinationTemporaryTopic() throws Exception {
-      doTestConvertMessageWithJMSDestination(createDestination(TEMP_TOPIC_TYPE), TEMP_TOPIC_TYPE);
+   public void testConvertMessageWithJMSDestinationQueue() throws Exception {
+      doTestConvertMessageWithJMSDestination(createDestination(QUEUE_TYPE), QUEUE_TYPE);
    }
 
    private void doTestConvertMessageWithJMSDestination(ServerDestination jmsDestination, Object expectedAnnotationValue) throws Exception {
@@ -501,24 +501,6 @@ public class JMSMappingOutboundTransformerTest {
       doTestConvertMessageWithJMSReplyTo(createDestination(QUEUE_TYPE), QUEUE_TYPE);
    }
 
-   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
-   @Test
-   public void testConvertMessageWithJMSReplyToTemporaryQueue() throws Exception {
-      doTestConvertMessageWithJMSReplyTo(createDestination(TEMP_QUEUE_TYPE), TEMP_QUEUE_TYPE);
-   }
-
-   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
-   @Test
-   public void testConvertMessageWithJMSReplyToTopic() throws Exception {
-      doTestConvertMessageWithJMSReplyTo(createDestination(TOPIC_TYPE), TOPIC_TYPE);
-   }
-
-   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
-   @Test
-   public void testConvertMessageWithJMSReplyToTemporaryTopic() throws Exception {
-      doTestConvertMessageWithJMSReplyTo(createDestination(TEMP_TOPIC_TYPE), TEMP_TOPIC_TYPE);
-   }
-
    private void doTestConvertMessageWithJMSReplyTo(ServerDestination jmsReplyTo, Object expectedAnnotationValue) throws Exception {
       ServerJMSTextMessage textMessage = createTextMessage();
       textMessage.setText("myTextMessageContent");
@@ -542,7 +524,6 @@ public class JMSMappingOutboundTransformerTest {
 
    // ----- Utility Methods used for this Test -------------------------------//
 
-
    private ServerDestination createDestination(byte destType) {
       ServerDestination destination = null;
       switch (destType) {
@@ -570,45 +551,15 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSBytesMessage createBytesMessage() {
-      return createBytesMessage(false);
-   }
-
-   private ServerJMSBytesMessage createBytesMessage(boolean compression) {
-      ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE));
-
-      if (compression) {
-         // TODO
-      }
-
-      return message;
+      return new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE));
    }
 
    private ServerJMSMapMessage createMapMessage() {
-      return createMapMessage(false);
-   }
-
-   private ServerJMSMapMessage createMapMessage(boolean compression) {
-      ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE));
-
-      if (compression) {
-         // TODO
-      }
-
-      return message;
+      return new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE));
    }
 
    private ServerJMSStreamMessage createStreamMessage() {
-      return createStreamMessage(false);
-   }
-
-   private ServerJMSStreamMessage createStreamMessage(boolean compression) {
-      ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE));
-
-      if (compression) {
-         // TODO
-      }
-
-      return message;
+      return new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE));
    }
 
    private ServerJMSObjectMessage createObjectMessage() {
@@ -616,16 +567,8 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSObjectMessage createObjectMessage(Serializable payload) {
-      return createObjectMessage(payload, false);
-   }
-
-   private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) {
       ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0, null);
 
-      if (compression) {
-         // TODO
-      }
-
       try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos);) {
 
          oos.writeObject(payload);
@@ -643,16 +586,8 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSTextMessage createTextMessage(String text) {
-      return createTextMessage(text, false);
-   }
-
-   private ServerJMSTextMessage createTextMessage(String text, boolean compression) {
       ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0, null);
 
-      if (compression) {
-         // TODO
-      }
-
       try {
          result.setText(text);
       } catch (JMSException e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a63b0315/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
index 6bd550a..8465c61 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
@@ -49,6 +50,7 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.Section;
@@ -149,8 +151,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
       session.close();
    }
 
-   private void receiveJMS(int nMsgs,
-                                ConnectionFactory factory) throws JMSException {
+   private void receiveJMS(int nMsgs, ConnectionFactory factory) throws JMSException {
       Connection connection2 = factory.createConnection();
       Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
       connection2.start();
@@ -397,9 +398,8 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
             Section body = message.getWrappedMessage().getBody();
             assertNotNull("No message body for msg " + i, body);
 
-            //TODO: ARTEMIS-1941 raised. This is wrong, test sent a Data section, it got converted in transit.
-            assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof AmqpValue);
-            assertEquals("Unexpected body content for msg", new Binary(payload, 0, payload.length), ((AmqpValue) body).getValue());
+            assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data);
+            assertEquals("Unexpected body content for msg", new Binary(payload, 0, payload.length), ((Data) body).getValue());
 
             message.accept();
          }
@@ -411,6 +411,209 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testMessageWithAmqpValueAndEmptyBinaryPreservesBody() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
+
+         AmqpMessage message = createAmqpLargeMessageWithNoBody();
+
+         message.getWrappedMessage().setBody(new AmqpValue(new Binary(new byte[0])));
+
+         sender.send(message);
+         sender.close();
+
+         AmqpReceiver receiver = session.createReceiver(getTestName());
+         receiver.flow(1);
+
+         AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull("failed to read large AMQP message", received);
+         MessageImpl wrapped = (MessageImpl) received.getWrappedMessage();
+
+         assertTrue(wrapped.getBody() instanceof AmqpValue);
+         AmqpValue body = (AmqpValue) wrapped.getBody();
+         assertTrue(body.getValue() instanceof Binary);
+         Binary payload = (Binary) body.getValue();
+         assertEquals(0, payload.getLength());
+
+         received.accept();
+         session.close();
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageWithDataAndEmptyBinaryPreservesBody() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
+
+         AmqpMessage message = createAmqpLargeMessageWithNoBody();
+
+         message.getWrappedMessage().setBody(new Data(new Binary(new byte[0])));
+
+         sender.send(message);
+         sender.close();
+
+         AmqpReceiver receiver = session.createReceiver(getTestName());
+         receiver.flow(1);
+
+         AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull("failed to read large AMQP message", received);
+         MessageImpl wrapped = (MessageImpl) received.getWrappedMessage();
+
+         assertTrue(wrapped.getBody() instanceof Data);
+         Data body = (Data) wrapped.getBody();
+         assertTrue(body.getValue() instanceof Binary);
+         Binary payload = (Binary) body.getValue();
+         assertEquals(0, payload.getLength());
+
+         received.accept();
+         session.close();
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageWithDataAndContentTypeOfTextPreservesBodyType() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
+
+         AmqpMessage message = createAmqpLargeMessageWithNoBody();
+
+         String messageText = "This text will be in a Data Section";
+
+         message.getWrappedMessage().setContentType("text/plain");
+         message.getWrappedMessage().setBody(new Data(new Binary(messageText.getBytes(StandardCharsets.UTF_8))));
+
+         sender.send(message);
+         sender.close();
+
+         AmqpReceiver receiver = session.createReceiver(getTestName());
+         receiver.flow(1);
+
+         AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+         assertNotNull("failed to read large AMQP message", received);
+         MessageImpl wrapped = (MessageImpl) received.getWrappedMessage();
+
+         assertTrue(wrapped.getBody() instanceof Data);
+         Data body = (Data) wrapped.getBody();
+         assertTrue(body.getValue() instanceof Binary);
+         Binary payload = (Binary) body.getValue();
+         String reconstitutedString = new String(
+            payload.getArray(), payload.getArrayOffset(), payload.getLength(), StandardCharsets.UTF_8);
+
+         assertEquals(messageText, reconstitutedString);
+
+         received.accept();
+         session.close();
+      } finally {
+         connection.close();
+      }
+   }
+
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   @Test(timeout = 60000)
+   public void testMessageWithAmqpValueListPreservesBodyType() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
+
+         AmqpMessage message = createAmqpLargeMessageWithNoBody();
+
+         List<String> values = new ArrayList<>();
+         values.add("1");
+         values.add("2");
+         values.add("3");
+
+         message.getWrappedMessage().setBody(new AmqpValue(values));
+
+         sender.send(message);
+         sender.close();
+
+         AmqpReceiver receiver = session.createReceiver(getTestName());
+         receiver.flow(1);
+
+         AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+         assertNotNull("failed to read large AMQP message", received);
+         MessageImpl wrapped = (MessageImpl) received.getWrappedMessage();
+
+         assertTrue(wrapped.getBody() instanceof AmqpValue);
+         AmqpValue body = (AmqpValue) wrapped.getBody();
+         assertTrue(body.getValue() instanceof List);
+         List<String> payload = (List) body.getValue();
+         assertEquals(3, payload.size());
+
+         received.accept();
+         session.close();
+      } finally {
+         connection.close();
+      }
+   }
+
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   @Test(timeout = 60000)
+   public void testMessageWithAmqpSequencePreservesBodyType() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
+
+         AmqpMessage message = createAmqpLargeMessageWithNoBody();
+
+         List<String> values = new ArrayList<>();
+         values.add("1");
+         values.add("2");
+         values.add("3");
+
+         message.getWrappedMessage().setBody(new AmqpSequence(values));
+
+         sender.send(message);
+         sender.close();
+
+         AmqpReceiver receiver = session.createReceiver(getTestName());
+         receiver.flow(1);
+
+         AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+         assertNotNull("failed to read large AMQP message", received);
+         MessageImpl wrapped = (MessageImpl) received.getWrappedMessage();
+
+         assertTrue(wrapped.getBody() instanceof AmqpSequence);
+         AmqpSequence body = (AmqpSequence) wrapped.getBody();
+         assertTrue(body.getValue() instanceof List);
+         List<String> payload = (List) body.getValue();
+         assertEquals(3, payload.size());
+
+         received.accept();
+         session.close();
+      } finally {
+         connection.close();
+      }
+   }
+
    private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception {
       try (Connection connection = factory.createConnection()) {
          Session session = connection.createSession();
@@ -483,4 +686,17 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
       message.setBytes(payload);
       return message;
    }
+
+   private AmqpMessage createAmqpLargeMessageWithNoBody() {
+      AmqpMessage message = new AmqpMessage();
+
+      byte[] payload = new byte[512 * 1024];
+      for (int i = 0; i < payload.length; i++) {
+         payload[i] = (byte) 65;
+      }
+
+      message.setMessageAnnotation("x-opt-big-blob", new String(payload, StandardCharsets.UTF_8));
+
+      return message;
+   }
 }