You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/07/25 22:16:09 UTC

[1/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6374

Repository: activemq
Updated Branches:
  refs/heads/master 3953b9aae -> d54e21b2f


http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
new file mode 100644
index 0000000..b008f1c
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.transport.amqp.JMSInteroperabilityTest;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Some simple performance tests for the Message Transformers.
+ */
+@Ignore("Turn on to profile.")
+@RunWith(Parameterized.class)
+public class JMSTransformationSpeedComparisonTest {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JMSInteroperabilityTest.class);
+
+    private final String transformer;
+
+    private final int WARM_CYCLES = 10;
+    private final int PROFILE_CYCLES = 1000000;
+
+    public JMSTransformationSpeedComparisonTest(String transformer) {
+        this.transformer = transformer;
+    }
+
+    @Parameters(name="Transformer->{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"jms"},
+                {"native"},
+                {"raw"},
+            });
+    }
+
+    private InboundTransformer getInboundTransformer() {
+        switch (transformer) {
+            case "raw":
+                return new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            case "native":
+                return new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            default:
+                return new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+        }
+    }
+
+    private OutboundTransformer getOutboundTransformer() {
+        switch (transformer) {
+            case "raw":
+            case "native":
+                return new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            default:
+                return new JMSMappingOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+        }
+    }
+
+    @Test
+    public void testBodyOnlyMessage() throws Exception {
+
+        Message message = Proton.message();
+
+        message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+        EncodedMessage encoded = encode(message);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        // Warm up
+        for (int i = 0; i < WARM_CYCLES; ++i) {
+            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            intermediate.onSend();
+            outboundTransformer.transform(intermediate);
+        }
+
+        long totalDuration = 0;
+
+        long startTime = System.nanoTime();
+        for (int i = 0; i < PROFILE_CYCLES; ++i) {
+            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            intermediate.onSend();
+            outboundTransformer.transform(intermediate);
+        }
+        totalDuration += System.nanoTime() - startTime;
+
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
+    }
+
+    @Test
+    public void testMessageWithNoPropertiesOrAnnotations() throws Exception {
+
+        Message message = Proton.message();
+
+        message.setAddress("queue://test-queue");
+        message.setDeliveryCount(1);
+        message.setCreationTime(System.currentTimeMillis());
+        message.setContentType("text/plain");
+        message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+        EncodedMessage encoded = encode(message);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        // Warm up
+        for (int i = 0; i < WARM_CYCLES; ++i) {
+            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            intermediate.onSend();
+            outboundTransformer.transform(intermediate);
+        }
+
+        long totalDuration = 0;
+
+        long startTime = System.nanoTime();
+        for (int i = 0; i < PROFILE_CYCLES; ++i) {
+            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            intermediate.onSend();
+            outboundTransformer.transform(intermediate);
+        }
+        totalDuration += System.nanoTime() - startTime;
+
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
+    }
+
+    @Test
+    public void testTypicalQpidJMSMessage() throws Exception {
+
+        Map<String, Object> applicationProperties = new HashMap<String, Object>();
+        Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
+
+        applicationProperties.put("property-1", "string");
+        applicationProperties.put("property-2", 512);
+        applicationProperties.put("property-3", true);
+
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+
+        Message message = Proton.message();
+
+        message.setAddress("queue://test-queue");
+        message.setDeliveryCount(1);
+        message.setApplicationProperties(new ApplicationProperties(applicationProperties));
+        message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
+        message.setCreationTime(System.currentTimeMillis());
+        message.setContentType("text/plain");
+        message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+        EncodedMessage encoded = encode(message);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        // Warm up
+        for (int i = 0; i < WARM_CYCLES; ++i) {
+            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            intermediate.onSend();
+            outboundTransformer.transform(intermediate);
+        }
+
+        long totalDuration = 0;
+
+        long startTime = System.nanoTime();
+        for (int i = 0; i < PROFILE_CYCLES; ++i) {
+            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            intermediate.onSend();
+            outboundTransformer.transform(intermediate);
+        }
+        totalDuration += System.nanoTime() - startTime;
+
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
+    }
+
+    @Test
+    public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
+
+        Map<String, Object> applicationProperties = new HashMap<String, Object>();
+        Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
+
+        applicationProperties.put("property-1", "string");
+        applicationProperties.put("property-2", 512);
+        applicationProperties.put("property-3", true);
+
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+
+        Message message = Proton.message();
+
+        message.setAddress("queue://test-queue");
+        message.setDeliveryCount(1);
+        message.setApplicationProperties(new ApplicationProperties(applicationProperties));
+        message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
+        message.setCreationTime(System.currentTimeMillis());
+        message.setContentType("text/plain");
+        message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+        EncodedMessage encoded = encode(message);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+
+        // Warm up
+        for (int i = 0; i < WARM_CYCLES; ++i) {
+            inboundTransformer.transform(encoded);
+        }
+
+        long totalDuration = 0;
+
+        long startTime = System.nanoTime();
+        for (int i = 0; i < PROFILE_CYCLES; ++i) {
+            inboundTransformer.transform(encoded);
+        }
+
+        totalDuration += System.nanoTime() - startTime;
+
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
+    }
+
+    @Test
+    public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
+
+        Map<String, Object> applicationProperties = new HashMap<String, Object>();
+        Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
+
+        applicationProperties.put("property-1", "string");
+        applicationProperties.put("property-2", 512);
+        applicationProperties.put("property-3", true);
+
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+
+        Message message = Proton.message();
+
+        message.setAddress("queue://test-queue");
+        message.setDeliveryCount(1);
+        message.setApplicationProperties(new ApplicationProperties(applicationProperties));
+        message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
+        message.setCreationTime(System.currentTimeMillis());
+        message.setContentType("text/plain");
+        message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+        EncodedMessage encoded = encode(message);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        ActiveMQMessage outbound = (ActiveMQMessage) inboundTransformer.transform(encoded);
+        outbound.onSend();
+
+        // Warm up
+        for (int i = 0; i < WARM_CYCLES; ++i) {
+            outboundTransformer.transform(outbound);
+        }
+
+        long totalDuration = 0;
+
+        long startTime = System.nanoTime();
+        for (int i = 0; i < PROFILE_CYCLES; ++i) {
+            outboundTransformer.transform(outbound);
+        }
+
+        totalDuration += System.nanoTime() - startTime;
+
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
+    }
+
+    private EncodedMessage encode(Message message) {
+        ProtonJMessage amqp = (ProtonJMessage) message;
+
+        ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
+        final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+        int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+        if (overflow.position() > 0) {
+            buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
+            c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+        }
+
+        return new EncodedMessage(1, buffer.array(), 0, c);
+    }
+}


[3/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6374

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6374

Refactor transformer to better map AMQP messages to JMS message types
and better preserve the original encoding of stored messages so that
they can be sent back to an AMQP client with expected content types.
Adds additional interoperability tests. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d54e21b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d54e21b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d54e21b2

Branch: refs/heads/master
Commit: d54e21b2ff563431919e8783f701c889497b2101
Parents: 3953b9a
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 25 18:15:53 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jul 25 18:15:53 2016 -0400

----------------------------------------------------------------------
 .../message/AMQPNativeInboundTransformer.java   |  13 +-
 .../message/AMQPNativeOutboundTransformer.java  |   4 +-
 .../amqp/message/AMQPRawInboundTransformer.java |  25 +-
 .../amqp/message/ActiveMQJMSVendor.java         | 328 +++++++-
 .../amqp/message/AmqpContentTypeSupport.java    | 145 ++++
 .../amqp/message/AmqpMessageSupport.java        | 150 ++++
 .../amqp/message/AutoOutboundTransformer.java   |  15 +-
 .../transport/amqp/message/EncodedMessage.java  |   4 +-
 .../amqp/message/InboundTransformer.java        |  49 +-
 .../message/InvalidContentTypeException.java    |  26 +
 .../message/JMSMappingInboundTransformer.java   | 155 ++--
 .../message/JMSMappingOutboundTransformer.java  | 263 ++++---
 .../transport/amqp/message/JMSVendor.java       |  53 --
 .../amqp/message/OutboundTransformer.java       |  46 +-
 .../transport/amqp/protocol/AmqpReceiver.java   |  18 +-
 .../transport/amqp/JMSInteroperabilityTest.java | 127 +++-
 .../message/AmqpContentTypeSupportTest.java     | 229 ++++++
 .../amqp/message/AmqpMessageSupportTest.java    | 108 +++
 .../JMSMappingInboundTransformerTest.java       | 555 +++++++++++++-
 .../JMSMappingOutboundTransformerTest.java      | 743 ++++++++++++++++++-
 .../JMSTransformationSpeedComparisonTest.java   | 312 ++++++++
 21 files changed, 3017 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
index a28b301..65cd657 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,7 +20,7 @@ import javax.jms.Message;
 
 public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
 
-    public AMQPNativeInboundTransformer(JMSVendor vendor) {
+    public AMQPNativeInboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
     }
 
@@ -35,12 +35,13 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
     }
 
     @Override
-    public Message transform(EncodedMessage amqpMessage) throws Exception {
+    protected Message doTransform(EncodedMessage amqpMessage) throws Exception {
         org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
 
-        Message rc = super.transform(amqpMessage);
+        Message result = super.doTransform(amqpMessage);
 
-        populateMessage(rc, amqp);
-        return rc;
+        populateMessage(result, amqp);
+
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
index c1dc976..620b79b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -32,7 +32,7 @@ import org.apache.qpid.proton.message.ProtonJMessage;
 
 public class AMQPNativeOutboundTransformer extends OutboundTransformer {
 
-    public AMQPNativeOutboundTransformer(JMSVendor vendor) {
+    public AMQPNativeOutboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
index e1414df..c534709 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,7 +22,7 @@ import javax.jms.Message;
 
 public class AMQPRawInboundTransformer extends InboundTransformer {
 
-    public AMQPRawInboundTransformer(JMSVendor vendor) {
+    public AMQPRawInboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
     }
 
@@ -33,28 +33,27 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
 
     @Override
     public InboundTransformer getFallbackTransformer() {
-        return null;  // No fallback from full raw transform
+        return null;  // No fallback from full raw transform, message likely dropped.
     }
 
     @Override
-    public Message transform(EncodedMessage amqpMessage) throws Exception {
-        BytesMessage rc = vendor.createBytesMessage();
-        rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+    protected Message doTransform(EncodedMessage amqpMessage) throws Exception {
+        BytesMessage result = vendor.createBytesMessage(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
 
         // We cannot decode the message headers to check so err on the side of caution
         // and mark all messages as persistent.
-        rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
-        rc.setJMSPriority(defaultPriority);
+        result.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+        result.setJMSPriority(defaultPriority);
 
         final long now = System.currentTimeMillis();
-        rc.setJMSTimestamp(now);
+        result.setJMSTimestamp(now);
         if (defaultTtl > 0) {
-            rc.setJMSExpiration(now + defaultTtl);
+            result.setJMSExpiration(now + defaultTtl);
         }
 
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+        result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+        result.setBooleanProperty(prefixVendor + "NATIVE", true);
 
-        return rc;
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
index 216daa9..efd5017 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
@@ -16,10 +16,18 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
+import java.io.DataInputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.InflaterInputStream;
+
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
+import javax.jms.MessageNotWriteableException;
 import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
@@ -31,71 +39,357 @@ import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.apache.activemq.command.ActiveMQStreamMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.qpid.proton.amqp.Binary;
 
-public class ActiveMQJMSVendor implements JMSVendor {
+public class ActiveMQJMSVendor {
 
     final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
 
     private ActiveMQJMSVendor() {
     }
 
-    @Override
+    /**
+     * @return a new vendor specific Message instance.
+     */
+    public Message createMessage() {
+        return new ActiveMQMessage();
+    }
+
+    /**
+     * @return a new vendor specific BytesMessage instance.
+     */
     public BytesMessage createBytesMessage() {
         return new ActiveMQBytesMessage();
     }
 
-    @Override
-    public StreamMessage createStreamMessage() {
-        return new ActiveMQStreamMessage();
+    /**
+     * @return a new vendor specific BytesMessage instance with the given payload.
+     */
+    public BytesMessage createBytesMessage(byte[] content, int offset, int length) {
+        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+        message.setContent(new ByteSequence(content, offset, length));
+        return message;
     }
 
-    @Override
-    public Message createMessage() {
-        return new ActiveMQMessage();
+    /**
+     * @return a new vendor specific StreamMessage instance.
+     */
+    public StreamMessage createStreamMessage() {
+        return new ActiveMQStreamMessage();
     }
 
-    @Override
+    /**
+     * @return a new vendor specific TextMessage instance.
+     */
     public TextMessage createTextMessage() {
         return new ActiveMQTextMessage();
     }
 
-    @Override
+    /**
+     * @return a new vendor specific TextMessage instance with the given string in the body.
+     */
+    public TextMessage createTextMessage(String text) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        try {
+            message.setText(text);
+        } catch (MessageNotWriteableException ex) {}
+
+        return message;
+    }
+
+    /**
+     * @return a new vendor specific ObjectMessage instance.
+     */
     public ObjectMessage createObjectMessage() {
         return new ActiveMQObjectMessage();
     }
 
-    @Override
+    /**
+     * @return a new vendor specific ObjectMessage instance with the serialized form given.
+     */
+    public ObjectMessage createObjectMessage(byte[] content, int offset, int length) {
+        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+        message.setContent(new ByteSequence(content, offset, length));
+        return message;
+    }
+
+    /**
+     * @return a new vendor specific MapMessage instance.
+     */
     public MapMessage createMapMessage() {
         return new ActiveMQMapMessage();
     }
 
-    @Override
+    /**
+     * @return a new vendor specific MapMessage instance with the given map as its content.
+     */
+    public MapMessage createMapMessage(Map<String, Object> content) throws JMSException {
+        ActiveMQMapMessage message = new ActiveMQMapMessage();
+        final Set<Map.Entry<String, Object>> set = content.entrySet();
+        for (Map.Entry<String, Object> entry : set) {
+            message.setObject(entry.getKey(), entry.getValue());
+        }
+        return message;
+    }
+
+    /**
+     * Creates a new JMS Destination instance from the given name.
+     *
+     * @param name
+     *      the name to use to construct the new Destination
+     *
+     * @return a new JMS Destination object derived from the given name.
+     */
     public Destination createDestination(String name) {
         return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
     }
 
-    @Override
+    /**
+     * Set the given value as the JMSXUserID on the message instance.
+     *
+     * @param message
+     *      the message to be updated.
+     * @param value
+     *      the value to apply to the message.
+     */
     public void setJMSXUserID(Message msg, String value) {
         ((ActiveMQMessage) msg).setUserID(value);
     }
 
-    @Override
+    /**
+     * Set the given value as the JMSXGroupID on the message instance.
+     *
+     * @param message
+     *      the message to be updated.
+     * @param value
+     *      the value to apply to the message.
+     */
     public void setJMSXGroupID(Message msg, String value) {
         ((ActiveMQMessage) msg).setGroupID(value);
     }
 
-    @Override
+    /**
+     * Set the given value as the JMSXGroupSequence on the message instance.
+     *
+     * @param message
+     *      the message to be updated.
+     * @param value
+     *      the value to apply to the message.
+     */
     public void setJMSXGroupSequence(Message msg, int value) {
         ((ActiveMQMessage) msg).setGroupSequence(value);
     }
 
-    @Override
+    /**
+     * Set the given value as the JMSXDeliveryCount on the message instance.
+     *
+     * @param message
+     *      the message to be updated.
+     * @param value
+     *      the value to apply to the message.
+     */
     public void setJMSXDeliveryCount(Message msg, long value) {
         ((ActiveMQMessage) msg).setRedeliveryCounter((int) value);
     }
 
-    @Override
+    /**
+     * Convert the given JMS Destination into the appropriate AMQP address string
+     * for assignment to the 'to' or 'replyTo' field of an AMQP message.
+     *
+     * @param destination
+     *      the JMS Destination instance to be converted.
+     *
+     * @return the converted string address to assign to the message.
+     */
     public String toAddress(Destination dest) {
         return ((ActiveMQDestination) dest).getQualifiedName();
     }
+
+    /**
+     * Given an Message instance return the original Message ID that was assigned the
+     * Message when it was first processed by the broker.  For an AMQP message this
+     * should be the original value of the message's MessageId field with the correct
+     * type preserved.
+     *
+     * @param message
+     *      the message which is being accessed.
+     *
+     * @return the original MessageId assigned to this Message instance.
+     */
+    public Object getOriginalMessageId(Message message) {
+        Object result;
+        MessageId msgId = ((ActiveMQMessage)message).getMessageId();
+        if (msgId.getTextView() != null) {
+            try {
+                result = AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView());
+            } catch (AmqpProtocolException e) {
+                result = msgId.getTextView().toString();
+            }
+        } else {
+            result = msgId.toString();
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the encoded form of the BytesMessage as an AMQP Binary instance.
+     *
+     * @param message
+     *      the Message whose binary encoded body is needed.
+     *
+     * @return a Binary instance containing the encoded message body.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public Binary getBinaryFromMessageBody(BytesMessage message) throws JMSException {
+        ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) message;
+        Binary result = null;
+
+        if (bytesMessage.getContent() != null) {
+            ByteSequence contents = bytesMessage.getContent();
+
+            if (bytesMessage.isCompressed()) {
+                int length = (int) bytesMessage.getBodyLength();
+                byte[] uncompressed = new byte[length];
+                bytesMessage.readBytes(uncompressed);
+
+                result = new Binary(uncompressed);
+            } else {
+                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the encoded form of the BytesMessage as an AMQP Binary instance.
+     *
+     * @param message
+     *      the Message whose binary encoded body is needed.
+     *
+     * @return a Binary instance containing the encoded message body.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public Binary getBinaryFromMessageBody(ObjectMessage message) throws JMSException {
+        ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) message;
+        Binary result = null;
+
+        if (objectMessage.getContent() != null) {
+            ByteSequence contents = objectMessage.getContent();
+
+            if (objectMessage.isCompressed()) {
+                try (ByteArrayOutputStream os = new ByteArrayOutputStream();
+                     ByteArrayInputStream is = new ByteArrayInputStream(contents);
+                     InflaterInputStream iis = new InflaterInputStream(is);) {
+
+                    byte value;
+                    while ((value = (byte) iis.read()) != -1) {
+                        os.write(value);
+                    }
+
+                    ByteSequence expanded = os.toByteSequence();
+                    result = new Binary(expanded.getData(), expanded.getOffset(), expanded.getLength());
+                } catch (Exception cause) {
+                   throw JMSExceptionSupport.create(cause);
+               }
+            } else {
+                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the encoded form of the Message as an AMQP Binary instance.
+     *
+     * @param message
+     *      the Message whose binary encoded body is needed.
+     *
+     * @return a Binary instance containing the encoded message body.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public Binary getBinaryFromMessageBody(TextMessage message) throws JMSException {
+        ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
+        Binary result = null;
+
+        if (textMessage.getContent() != null) {
+            ByteSequence contents = textMessage.getContent();
+
+            if (textMessage.isCompressed()) {
+                try (ByteArrayInputStream is = new ByteArrayInputStream(contents);
+                     InflaterInputStream iis = new InflaterInputStream(is);
+                     DataInputStream dis = new DataInputStream(iis);) {
+
+                    int size = dis.readInt();
+                    byte[] uncompressed = new byte[size];
+                    dis.readFully(uncompressed);
+
+                    result = new Binary(uncompressed);
+                } catch (Exception cause) {
+                    throw JMSExceptionSupport.create(cause);
+                }
+            } else {
+                // Message includes a size prefix of four bytes for the OpenWire marshaler
+                result = new Binary(contents.getData(), contents.getOffset() + 4, contents.getLength() - 4);
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the underlying Map from the JMS MapMessage instance.
+     *
+     * @param message
+     *      the MapMessage whose underlying Map is requested.
+     *
+     * @return the underlying Map used to store the value in the given MapMessage.
+     *
+     * @throws JMSException if an error occurs in constructing or fetching the Map.
+     */
+    public Map<String, Object> getMapFromMessageBody(MapMessage message) throws JMSException {
+        final HashMap<String, Object> map = new HashMap<String, Object>();
+        final ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) message;
+
+        final Map<String, Object> contentMap = mapMessage.getContentMap();
+        if (contentMap != null) {
+            map.putAll(contentMap);
+        }
+
+        return contentMap;
+    }
+
+    /**
+     * Sets the given Message Property on the given message overriding any read-only
+     * state on the Message long enough for the property to be added.
+     *
+     * @param message
+     *      the message to set the property on.
+     * @param key
+     *      the String key for the new Message property
+     * @param value
+     *      the Object to assign to the new Message property.
+     *
+     * @throws JMSException if an error occurs while setting the property.
+     */
+    public void setMessageProperty(Message message, String key, Object value) throws JMSException {
+        final ActiveMQMessage amqMessage = (ActiveMQMessage) message;
+
+        boolean oldValue = amqMessage.isReadOnlyProperties();
+
+        amqMessage.setReadOnlyProperties(false);
+        amqMessage.setObjectProperty(key, value);
+        amqMessage.setReadOnlyProperties(oldValue);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupport.java
new file mode 100644
index 0000000..114ade7
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupport.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.StandardCharsets;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.StringTokenizer;
+
+public final class AmqpContentTypeSupport {
+
+    private static final String UTF_8 = "UTF-8";
+    private static final String CHARSET = "charset";
+    private static final String TEXT = "text";
+    private static final String APPLICATION = "application";
+    private static final String JAVASCRIPT = "javascript";
+    private static final String XML = "xml";
+    private static final String XML_VARIANT = "+xml";
+    private static final String JSON = "json";
+    private static final String JSON_VARIANT = "+json";
+    private static final String XML_DTD = "xml-dtd";
+    private static final String ECMASCRIPT = "ecmascript";
+
+    /**
+     * @param contentType
+     *        the contentType of the received message
+     * @return the character set to use, or null if not to treat the message as
+     *         text
+     * @throws InvalidContentTypeException
+     *         if the content-type is invalid in some way.
+     */
+    public static Charset parseContentTypeForTextualCharset(final String contentType) throws InvalidContentTypeException {
+        if (contentType == null || contentType.trim().isEmpty()) {
+            throw new InvalidContentTypeException("Content type can't be null or empty");
+        }
+
+        int subTypeSeparator = contentType.indexOf("/");
+        if (subTypeSeparator == -1) {
+            throw new InvalidContentTypeException("Content type has no '/' separator: " + contentType);
+        }
+
+        final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim();
+
+        String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim();
+
+        String parameterPart = null;
+        int parameterSeparator = subTypePart.indexOf(";");
+        if (parameterSeparator != -1) {
+            if (parameterSeparator < subTypePart.length() - 1) {
+                parameterPart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim();
+            }
+            subTypePart = subTypePart.substring(0, parameterSeparator).trim();
+        }
+
+        if (subTypePart.isEmpty()) {
+            throw new InvalidContentTypeException("Content type has no subtype after '/'" + contentType);
+        }
+
+        final String subType = subTypePart;
+
+        if (isTextual(type, subType)) {
+            String charset = findCharset(parameterPart);
+            if (charset == null) {
+                charset = UTF_8;
+            }
+
+            if (UTF_8.equals(charset)) {
+                return StandardCharsets.UTF_8;
+            } else {
+                try {
+                    return Charset.forName(charset);
+                } catch (IllegalCharsetNameException icne) {
+                    throw new InvalidContentTypeException("Illegal charset: " + charset);
+                } catch (UnsupportedCharsetException uce) {
+                    throw new InvalidContentTypeException("Unsupported charset: " + charset);
+                }
+            }
+        }
+
+        return null;
+    }
+
+    //----- Internal Content Type utilities ----------------------------------//
+
+    private static boolean isTextual(String type, String subType) {
+        if (TEXT.equals(type)) {
+            return true;
+        }
+
+        if (APPLICATION.equals(type)) {
+            if (XML.equals(subType) || JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT)
+                || XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static String findCharset(String paramaterPart) {
+        String charset = null;
+
+        if (paramaterPart != null) {
+            StringTokenizer tokenizer = new StringTokenizer(paramaterPart, ";");
+            while (tokenizer.hasMoreTokens()) {
+                String parameter = tokenizer.nextToken().trim();
+                int eqIndex = parameter.indexOf('=');
+                if (eqIndex != -1) {
+                    String name = parameter.substring(0, eqIndex);
+                    if (CHARSET.equalsIgnoreCase(name.trim())) {
+                        String value = unquote(parameter.substring(eqIndex + 1));
+
+                        charset = value.toUpperCase();
+                        break;
+                    }
+                }
+            }
+        }
+
+        return charset;
+    }
+
+    private static String unquote(String s) {
+        if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) {
+            return s.substring(1, s.length() - 1);
+        } else {
+            return s;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
new file mode 100644
index 0000000..3e7a60e
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Support class containing constant values and static methods that are
+ * used to map to / from AMQP Message types being sent or received.
+ */
+public final class AmqpMessageSupport {
+
+    public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
+    public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);
+    public static final Data NULL_OBJECT_BODY;
+
+    public static final String AMQP_ORIGINAL_ENCODING_KEY = "JMS_AMQP_ORIGINAL_ENCODING";
+
+    public static final short AMQP_UNKNOWN = 0;
+    public static final short AMQP_NULL = 1;
+    public static final short AMQP_DATA = 2;
+    public static final short AMQP_SEQUENCE = 3;
+    public static final short AMQP_VALUE_NULL = 4;
+    public static final short AMQP_VALUE_STRING = 5;
+    public static final short AMQP_VALUE_BINARY = 6;
+    public static final short AMQP_VALUE_MAP = 7;
+    public static final short AMQP_VALUE_LIST = 8;
+
+    static {
+        byte[] bytes;
+        try {
+            bytes = getSerializedBytes(null);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to initialise null object body", e);
+        }
+
+        NULL_OBJECT_BODY = new Data(new Binary(bytes));
+    }
+
+    /**
+     * Content type used to mark Data sections as containing a serialized java object.
+     */
+    public static final String SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = "application/x-java-serialized-object";
+
+    /**
+     * Content type used to mark Data sections as containing arbitrary bytes.
+     */
+    public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+
+    /**
+     * Lookup and return the correct Proton Symbol instance based on the given key.
+     *
+     * @param key
+     *        the String value name of the Symbol to locate.
+     *
+     * @return the Symbol value that matches the given key.
+     */
+    public static Symbol getSymbol(String key) {
+        return Symbol.valueOf(key);
+    }
+
+    /**
+     * Safe way to access message annotations which will check internal structure and
+     * either return the annotation if it exists or null if the annotation or any annotations
+     * are present.
+     *
+     * @param key
+     *        the String key to use to lookup an annotation.
+     * @param message
+     *        the AMQP message object that is being examined.
+     *
+     * @return the given annotation value or null if not present in the message.
+     */
+    public static Object getMessageAnnotation(String key, Message message) {
+        if (message != null && message.getMessageAnnotations() != null) {
+            Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
+            return annotations.get(AmqpMessageSupport.getSymbol(key));
+        }
+
+        return null;
+    }
+
+    /**
+     * Check whether the content-type field of the properties section (if present) in
+     * the given message matches the provided string (where null matches if there is
+     * no content type present.
+     *
+     * @param contentType
+     *        content type string to compare against, or null if none
+     * @param message
+     *        the AMQP message object that is being examined.
+     *
+     * @return true if content type matches
+     */
+    public static boolean isContentType(String contentType, Message message) {
+        if (contentType == null) {
+            return message.getContentType() == null;
+        } else {
+            return contentType.equals(message.getContentType());
+        }
+    }
+
+    /**
+     * @param contentType the contentType of the received message
+     * @return the character set to use, or null if not to treat the message as text
+     */
+    public static Charset getCharsetForTextualContent(String contentType) {
+        try {
+            return AmqpContentTypeSupport.parseContentTypeForTextualCharset(contentType);
+        } catch (InvalidContentTypeException e) {
+            return null;
+        }
+    }
+
+    private static byte[] getSerializedBytes(Serializable value) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+
+            oos.writeObject(value);
+            oos.flush();
+            oos.close();
+
+            return baos.toByteArray();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
index f30d4c4..f0f71a8 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -23,18 +23,21 @@ public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
 
     private final JMSMappingOutboundTransformer transformer;
 
-    public AutoOutboundTransformer(JMSVendor vendor) {
+    public AutoOutboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
+
         transformer = new JMSMappingOutboundTransformer(vendor);
     }
 
     @Override
     public EncodedMessage transform(Message msg) throws Exception {
-        if( msg == null )
+        if (msg == null) {
             return null;
-        if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
-            if( msg instanceof BytesMessage ) {
-                return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
+        }
+
+        if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
+            if (msg instanceof BytesMessage) {
+                return AMQPNativeOutboundTransformer.transform(this, (BytesMessage) msg);
             } else {
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
index 733c0ec0..0b25cfa 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,7 +22,7 @@ import org.apache.qpid.proton.message.Message;
 public class EncodedMessage {
 
     private final Binary data;
-    final long messageFormat;
+    private final long messageFormat;
 
     public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
         this.data = new Binary(data, offset, length);

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index e883bcf..e6b7a0f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Decimal128;
 import org.apache.qpid.proton.amqp.Decimal32;
@@ -41,31 +42,51 @@ import org.apache.qpid.proton.amqp.messaging.Properties;
 
 public abstract class InboundTransformer {
 
-    JMSVendor vendor;
+    protected final ActiveMQJMSVendor vendor;
 
     public static final String TRANSFORMER_NATIVE = "native";
     public static final String TRANSFORMER_RAW = "raw";
     public static final String TRANSFORMER_JMS = "jms";
 
-    String prefixVendor = "JMS_AMQP_";
-    String prefixDeliveryAnnotations = "DA_";
-    String prefixMessageAnnotations = "MA_";
-    String prefixFooter = "FT_";
+    protected String prefixVendor = "JMS_AMQP_";
+    protected String prefixDeliveryAnnotations = "DA_";
+    protected String prefixMessageAnnotations = "MA_";
+    protected String prefixFooter = "FT_";
 
-    int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
-    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
-    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+    protected int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
+    protected int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
+    protected long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
 
-    public InboundTransformer(JMSVendor vendor) {
+    public InboundTransformer(ActiveMQJMSVendor vendor) {
         this.vendor = vendor;
     }
 
-    public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
-
     public abstract String getTransformerName();
 
     public abstract InboundTransformer getFallbackTransformer();
 
+    public final Message transform(EncodedMessage amqpMessage) throws Exception {
+        InboundTransformer transformer = this;
+        Message message = null;
+
+        while (transformer != null) {
+            try {
+                message = transformer.doTransform(amqpMessage);
+                break;
+            } catch (Exception e) {
+                transformer = transformer.getFallbackTransformer();
+            }
+        }
+
+        if (message == null) {
+            throw new AmqpProtocolException("Failed to transform incoming delivery, skipping.", false);
+        }
+
+        return message;
+    }
+
+    protected abstract Message doTransform(EncodedMessage amqpMessage) throws Exception;
+
     public int getDefaultDeliveryMode() {
         return defaultDeliveryMode;
     }
@@ -98,14 +119,10 @@ public abstract class InboundTransformer {
         this.prefixVendor = prefixVendor;
     }
 
-    public JMSVendor getVendor() {
+    public ActiveMQJMSVendor getVendor() {
         return vendor;
     }
 
-    public void setVendor(JMSVendor vendor) {
-        this.vendor = vendor;
-    }
-
     @SuppressWarnings("unchecked")
     protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
         Header header = amqp.getHeader();

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InvalidContentTypeException.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InvalidContentTypeException.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InvalidContentTypeException.java
new file mode 100644
index 0000000..7251b94
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InvalidContentTypeException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+public class InvalidContentTypeException extends Exception {
+
+    private static final long serialVersionUID = 1260362376856866687L;
+
+    public InvalidContentTypeException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
index 55a4db9..707e5da 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,27 +16,41 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import java.io.Serializable;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_MAP;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_NULL;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getCharsetForTextualContent;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.isContentType;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import javax.jms.BytesMessage;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
 
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
 
 public class JMSMappingInboundTransformer extends InboundTransformer {
 
-    public JMSMappingInboundTransformer(JMSVendor vendor) {
+    public JMSMappingInboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
     }
 
@@ -50,70 +64,113 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
         return new AMQPNativeInboundTransformer(getVendor());
     }
 
-    @SuppressWarnings({ "unchecked" })
     @Override
-    public Message transform(EncodedMessage amqpMessage) throws Exception {
-        org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+    protected javax.jms.Message doTransform(EncodedMessage amqpMessage) throws Exception {
+        Message amqp = amqpMessage.decode();
+
+        javax.jms.Message result = createMessage(amqp, amqpMessage);
+
+        result.setJMSDeliveryMode(defaultDeliveryMode);
+        result.setJMSPriority(defaultPriority);
+        result.setJMSExpiration(defaultTtl);
+
+        populateMessage(result, amqp);
+
+        result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+        result.setBooleanProperty(prefixVendor + "NATIVE", false);
+
+        return result;
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    private javax.jms.Message createMessage(Message message, EncodedMessage original) throws Exception {
+
+        Section body = message.getBody();
+        javax.jms.Message result;
 
-        Message rc;
-        final Section body = amqp.getBody();
         if (body == null) {
-            rc = vendor.createMessage();
+            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+                result = vendor.createObjectMessage();
+            } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) {
+                result = vendor.createBytesMessage();
+            } else {
+                Charset charset = getCharsetForTextualContent(message.getContentType());
+                if (charset != null) {
+                    result = vendor.createTextMessage();
+                } else {
+                    result = vendor.createMessage();
+                }
+            }
+
+            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_NULL);
         } else if (body instanceof Data) {
-            Binary d = ((Data) body).getValue();
-            BytesMessage m = vendor.createBytesMessage();
-            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
-            rc = m;
+            Binary payload = ((Data) body).getValue();
+
+            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+                result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) {
+                result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            } else {
+                Charset charset = getCharsetForTextualContent(message.getContentType());
+                if (StandardCharsets.UTF_8.equals(charset)) {
+                    ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+
+                    try {
+                        CharBuffer chars = charset.newDecoder().decode(buf);
+                        result = vendor.createTextMessage(String.valueOf(chars));
+                    } catch (CharacterCodingException e) {
+                        result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                    }
+                } else {
+                    result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                }
+            }
+
+            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
         } else if (body instanceof AmqpSequence) {
             AmqpSequence sequence = (AmqpSequence) body;
             StreamMessage m = vendor.createStreamMessage();
             for (Object item : sequence.getValue()) {
                 m.writeObject(item);
             }
-            rc = m;
+
+            result = m;
+            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
         } else if (body instanceof AmqpValue) {
             Object value = ((AmqpValue) body).getValue();
-            if (value == null) {
-                rc = vendor.createObjectMessage();
-            }
-            if (value instanceof String) {
-                TextMessage m = vendor.createTextMessage();
-                m.setText((String) value);
-                rc = m;
+            if (value == null || value instanceof String) {
+                result = vendor.createTextMessage((String) value);
+
+                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
             } else if (value instanceof Binary) {
-                Binary d = (Binary) value;
-                BytesMessage m = vendor.createBytesMessage();
-                m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
-                rc = m;
+                Binary payload = (Binary) value;
+
+                if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+                    result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                } else {
+                    result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                }
+
+                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
             } else if (value instanceof List) {
                 StreamMessage m = vendor.createStreamMessage();
                 for (Object item : (List<Object>) value) {
                     m.writeObject(item);
                 }
-                rc = m;
+                result = m;
+                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_LIST);
             } else if (value instanceof Map) {
-                MapMessage m = vendor.createMapMessage();
-                final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
-                for (Map.Entry<String, Object> entry : set) {
-                    m.setObject(entry.getKey(), entry.getValue());
-                }
-                rc = m;
+                result = vendor.createMapMessage((Map<String, Object>) value);
+                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_MAP);
             } else {
-                ObjectMessage m = vendor.createObjectMessage();
-                m.setObject((Serializable) value);
-                rc = m;
+                // Trigger fall-back to native encoder which generates BytesMessage with the
+                // original message stored in the message body.
+                throw new AmqpProtocolException("Unable to encode to ActiveMQ JMS Message", false);
             }
         } else {
             throw new RuntimeException("Unexpected body type: " + body.getClass());
         }
-        rc.setJMSDeliveryMode(defaultDeliveryMode);
-        rc.setJMSPriority(defaultPriority);
-        rc.setJMSExpiration(defaultTtl);
-
-        populateMessage(rc, amqp);
 
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        rc.setBooleanProperty(prefixVendor + "NATIVE", false);
-        return rc;
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index c9a94fa..59c306f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -16,12 +16,24 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.EMPTY_BINARY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.Map;
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
@@ -39,8 +51,6 @@ import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.MessageId;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -81,7 +91,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
     public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
     public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
 
-    public JMSMappingOutboundTransformer(JMSVendor vendor) {
+    public JMSMappingOutboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
     }
 
@@ -121,145 +131,107 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
     /**
      * Perform the conversion between JMS Message and Proton Message without
      * re-encoding it to array. This is needed because some frameworks may elect
-     * to do this on their own way (Netty for instance using Nettybuffers)
+     * to do this on their own way.
+     *
+     * @param message
+     *      The message to transform into an AMQP version for dispatch.
      *
-     * @param msg
-     * @return
-     * @throws Exception
+     * @return an AMQP Message that represents the given JMS Message.
+     *
+     * @throws Exception if an error occurs during the conversion.
      */
-    public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
+    public ProtonJMessage convert(Message message) throws JMSException, UnsupportedEncodingException {
         Header header = new Header();
         Properties props = new Properties();
-        HashMap<Symbol, Object> daMap = null;
-        HashMap<Symbol, Object> maMap = null;
-        HashMap apMap = null;
+
+        Map<Symbol, Object> daMap = null;
+        Map<Symbol, Object> maMap = null;
+        Map<String,Object> apMap = null;
+        Map<Object, Object> footerMap = null;
         Section body = null;
-        HashMap footerMap = null;
-        if (msg instanceof BytesMessage) {
-            BytesMessage m = (BytesMessage) msg;
-            byte data[] = new byte[(int) m.getBodyLength()];
-            m.readBytes(data);
-            m.reset(); // Need to reset after readBytes or future readBytes
-                       // calls (ex: redeliveries) will fail and return -1
-            body = new Data(new Binary(data));
-        }
-        if (msg instanceof TextMessage) {
-            body = new AmqpValue(((TextMessage) msg).getText());
-        }
-        if (msg instanceof MapMessage) {
-            final HashMap<String, Object> map = new HashMap<String, Object>();
-            final MapMessage m = (MapMessage) msg;
-            final Enumeration<String> names = m.getMapNames();
-            while (names.hasMoreElements()) {
-                String key = names.nextElement();
-                map.put(key, m.getObject(key));
-            }
-            body = new AmqpValue(map);
-        }
-        if (msg instanceof StreamMessage) {
-            ArrayList<Object> list = new ArrayList<Object>();
-            final StreamMessage m = (StreamMessage) msg;
-            try {
-                while (true) {
-                    list.add(m.readObject());
-                }
-            } catch (MessageEOFException e) {
-            }
-            body = new AmqpSequence(list);
-        }
-        if (msg instanceof ObjectMessage) {
-            body = new AmqpValue(((ObjectMessage) msg).getObject());
-        }
 
-        header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
-        header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
-        if (msg.getJMSType() != null) {
-            props.setSubject(msg.getJMSType());
+        body = convertBody(message);
+
+        header.setDurable(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
+        header.setPriority(new UnsignedByte((byte) message.getJMSPriority()));
+        if (message.getJMSType() != null) {
+            props.setSubject(message.getJMSType());
         }
-        if (msg.getJMSMessageID() != null) {
-            ActiveMQMessage amqMsg = (ActiveMQMessage) msg;
-
-            MessageId msgId = amqMsg.getMessageId();
-            if (msgId.getTextView() != null) {
-                try {
-                    props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView()));
-                } catch (AmqpProtocolException e) {
-                    props.setMessageId(msgId.getTextView().toString());
-                }
-            } else {
-                props.setMessageId(msgId.toString());
-            }
+        if (message.getJMSMessageID() != null) {
+            props.setMessageId(vendor.getOriginalMessageId(message));
         }
-        if (msg.getJMSDestination() != null) {
-            props.setTo(vendor.toAddress(msg.getJMSDestination()));
+        if (message.getJMSDestination() != null) {
+            props.setTo(vendor.toAddress(message.getJMSDestination()));
             if (maMap == null) {
                 maMap = new HashMap<Symbol, Object>();
             }
-            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
+            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(message.getJMSDestination()));
 
             // Deprecated: used by legacy QPid AMQP 1.0 JMS client
-            maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination()));
+            maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSDestination()));
         }
-        if (msg.getJMSReplyTo() != null) {
-            props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
+        if (message.getJMSReplyTo() != null) {
+            props.setReplyTo(vendor.toAddress(message.getJMSReplyTo()));
             if (maMap == null) {
                 maMap = new HashMap<Symbol, Object>();
             }
-            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
+            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(message.getJMSReplyTo()));
 
             // Deprecated: used by legacy QPid AMQP 1.0 JMS client
-            maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
+            maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSReplyTo()));
         }
-        if (msg.getJMSCorrelationID() != null) {
-            String correlationId = msg.getJMSCorrelationID();
+        if (message.getJMSCorrelationID() != null) {
+            String correlationId = message.getJMSCorrelationID();
             try {
                 props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
             } catch (AmqpProtocolException e) {
                 props.setCorrelationId(correlationId);
             }
         }
-        if (msg.getJMSExpiration() != 0) {
-            long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
+        if (message.getJMSExpiration() != 0) {
+            long ttl = message.getJMSExpiration() - System.currentTimeMillis();
             if (ttl < 0) {
                 ttl = 1;
             }
             header.setTtl(new UnsignedInteger((int) ttl));
 
-            props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
+            props.setAbsoluteExpiryTime(new Date(message.getJMSExpiration()));
         }
-        if (msg.getJMSTimestamp() != 0) {
-            props.setCreationTime(new Date(msg.getJMSTimestamp()));
+        if (message.getJMSTimestamp() != 0) {
+            props.setCreationTime(new Date(message.getJMSTimestamp()));
         }
 
-        final Enumeration<String> keys = msg.getPropertyNames();
+        @SuppressWarnings("unchecked")
+        final Enumeration<String> keys = message.getPropertyNames();
+
         while (keys.hasMoreElements()) {
             String key = keys.nextElement();
-            if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
-                // skip..
+            if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(AMQP_ORIGINAL_ENCODING_KEY)) {
+                // skip transformer appended properties
             } else if (key.equals(firstAcquirerKey)) {
-                header.setFirstAcquirer(msg.getBooleanProperty(key));
+                header.setFirstAcquirer(message.getBooleanProperty(key));
             } else if (key.startsWith("JMSXDeliveryCount")) {
                 // The AMQP delivery-count field only includes prior failed delivery attempts,
                 // whereas JMSXDeliveryCount includes the first/current delivery attempt.
-                int amqpDeliveryCount = msg.getIntProperty(key) - 1;
+                int amqpDeliveryCount = message.getIntProperty(key) - 1;
                 if (amqpDeliveryCount > 0) {
                     header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
                 }
             } else if (key.startsWith("JMSXUserID")) {
-                String value = msg.getStringProperty(key);
+                String value = message.getStringProperty(key);
                 props.setUserId(new Binary(value.getBytes("UTF-8")));
             } else if (key.startsWith("JMSXGroupID")) {
-                String value = msg.getStringProperty(key);
+                String value = message.getStringProperty(key);
                 props.setGroupId(value);
                 if (apMap == null) {
-                    apMap = new HashMap();
+                    apMap = new HashMap<String, Object>();
                 }
                 apMap.put(key, value);
             } else if (key.startsWith("JMSXGroupSeq")) {
-                UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
+                UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
                 props.setGroupSequence(value);
                 if (apMap == null) {
-                    apMap = new HashMap();
+                    apMap = new HashMap<String, Object>();
                 }
                 apMap.put(key, value);
             } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
@@ -267,30 +239,30 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                     daMap = new HashMap<Symbol, Object>();
                 }
                 String name = key.substring(prefixDeliveryAnnotationsKey.length());
-                daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
+                daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
             } else if (key.startsWith(prefixMessageAnnotationsKey)) {
                 if (maMap == null) {
                     maMap = new HashMap<Symbol, Object>();
                 }
                 String name = key.substring(prefixMessageAnnotationsKey.length());
-                maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
+                maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
             } else if (key.equals(contentTypeKey)) {
-                props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
+                props.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
             } else if (key.equals(contentEncodingKey)) {
-                props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
+                props.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
             } else if (key.equals(replyToGroupIDKey)) {
-                props.setReplyToGroupId(msg.getStringProperty(key));
+                props.setReplyToGroupId(message.getStringProperty(key));
             } else if (key.startsWith(prefixFooterKey)) {
                 if (footerMap == null) {
-                    footerMap = new HashMap();
+                    footerMap = new HashMap<Object, Object>();
                 }
                 String name = key.substring(prefixFooterKey.length());
-                footerMap.put(name, msg.getObjectProperty(key));
+                footerMap.put(name, message.getObjectProperty(key));
             } else {
                 if (apMap == null) {
-                    apMap = new HashMap();
+                    apMap = new HashMap<String, Object>();
                 }
-                apMap.put(key, msg.getObjectProperty(key));
+                apMap.put(key, message.getObjectProperty(key));
             }
         }
 
@@ -314,6 +286,101 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
         return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
     }
 
+    private Section convertBody(Message message) throws JMSException {
+
+        Section body = null;
+        short orignalEncoding = AMQP_UNKNOWN;
+
+        if (message.propertyExists(AMQP_ORIGINAL_ENCODING_KEY)) {
+            try {
+                orignalEncoding = message.getShortProperty(AMQP_ORIGINAL_ENCODING_KEY);
+            } catch (Exception ex) {
+            }
+        }
+
+        if (message instanceof BytesMessage) {
+            Binary payload = vendor.getBinaryFromMessageBody((BytesMessage) message);
+
+            if (payload == null) {
+                payload = EMPTY_BINARY;
+            }
+
+            switch (orignalEncoding) {
+                case AMQP_NULL:
+                    break;
+                case AMQP_VALUE_BINARY:
+                    body = new AmqpValue(payload);
+                    break;
+                case AMQP_DATA:
+                case AMQP_UNKNOWN:
+                default:
+                    body = new Data(payload);
+                    break;
+            }
+        } else if (message instanceof TextMessage) {
+            switch (orignalEncoding) {
+                case AMQP_NULL:
+                    break;
+                case AMQP_DATA:
+                    body = new Data(vendor.getBinaryFromMessageBody((TextMessage) message));
+                    break;
+                case AMQP_VALUE_STRING:
+                case AMQP_UNKNOWN:
+                default:
+                    body = new AmqpValue(((TextMessage) message).getText());
+                    break;
+            }
+        } else if (message instanceof MapMessage) {
+            body = new AmqpValue(vendor.getMapFromMessageBody((MapMessage) message));
+        } else if (message instanceof StreamMessage) {
+            ArrayList<Object> list = new ArrayList<Object>();
+            final StreamMessage m = (StreamMessage) message;
+            try {
+                while (true) {
+                    list.add(m.readObject());
+                }
+            } catch (MessageEOFException e) {
+            }
+
+            switch (orignalEncoding) {
+                case AMQP_SEQUENCE:
+                    body = new AmqpSequence(list);
+                    break;
+                case AMQP_VALUE_LIST:
+                case AMQP_UNKNOWN:
+                default:
+                    body = new AmqpValue(list);
+                    break;
+            }
+        } else if (message instanceof ObjectMessage) {
+            Binary payload = vendor.getBinaryFromMessageBody((ObjectMessage) message);
+
+            if (payload == null) {
+                payload = EMPTY_BINARY;
+            }
+
+            switch (orignalEncoding) {
+                case AMQP_VALUE_BINARY:
+                    body = new AmqpValue(payload);
+                    break;
+                case AMQP_DATA:
+                case AMQP_UNKNOWN:
+                default:
+                    body = new Data(payload);
+                    break;
+            }
+
+            // For a non-AMQP message we tag the outbound content type as containing
+            // a serialized Java object so that an AMQP client has a hint as to what
+            // we are sending it.
+            if (!message.propertyExists(contentTypeKey)) {
+                vendor.setMessageProperty(message, contentTypeKey, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+            }
+        }
+
+        return body;
+    }
+
     private static byte destinationType(Destination destination) {
         if (destination instanceof Queue) {
             if (destination instanceof TemporaryQueue) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
deleted file mode 100644
index f9169ec..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.message;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-public interface JMSVendor {
-
-    public abstract BytesMessage createBytesMessage();
-
-    public abstract StreamMessage createStreamMessage();
-
-    public abstract Message createMessage();
-
-    public abstract TextMessage createTextMessage();
-
-    public abstract ObjectMessage createObjectMessage();
-
-    public abstract MapMessage createMapMessage();
-
-    public abstract void setJMSXUserID(Message message, String value);
-
-    public Destination createDestination(String name);
-
-    public abstract void setJMSXGroupID(Message message, String groupId);
-
-    public abstract void setJMSXGroupSequence(Message message, int value);
-
-    public abstract void setJMSXDeliveryCount(Message message, long value);
-
-    public abstract String toAddress(Destination destination);
-
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
index 1d28a07..2eefa50 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,24 +20,25 @@ import javax.jms.Message;
 
 public abstract class OutboundTransformer {
 
-    JMSVendor vendor;
-    String prefixVendor;
+    protected final ActiveMQJMSVendor vendor;
 
-    String prefixDeliveryAnnotations = "DA_";
-    String prefixMessageAnnotations= "MA_";
-    String prefixFooter = "FT_";
+    protected String prefixVendor;
 
-    String messageFormatKey;
-    String nativeKey;
-    String firstAcquirerKey;
-    String prefixDeliveryAnnotationsKey;
-    String prefixMessageAnnotationsKey;
-    String contentTypeKey;
-    String contentEncodingKey;
-    String replyToGroupIDKey;
-    String prefixFooterKey;
+    protected String prefixDeliveryAnnotations = "DA_";
+    protected String prefixMessageAnnotations= "MA_";
+    protected String prefixFooter = "FT_";
 
-    public OutboundTransformer(JMSVendor vendor) {
+    protected String messageFormatKey;
+    protected String nativeKey;
+    protected String firstAcquirerKey;
+    protected String prefixDeliveryAnnotationsKey;
+    protected String prefixMessageAnnotationsKey;
+    protected String contentTypeKey;
+    protected String contentEncodingKey;
+    protected String replyToGroupIDKey;
+    protected String prefixFooterKey;
+
+    public OutboundTransformer(ActiveMQJMSVendor vendor) {
         this.vendor = vendor;
         this.setPrefixVendor("JMS_AMQP_");
     }
@@ -56,18 +57,13 @@ public abstract class OutboundTransformer {
         firstAcquirerKey = prefixVendor + "FirstAcquirer";
         prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
         prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
-        contentTypeKey = prefixVendor +"ContentType";
-        contentEncodingKey = prefixVendor +"ContentEncoding";
-        replyToGroupIDKey = prefixVendor +"ReplyToGroupID";
+        contentTypeKey = prefixVendor + "ContentType";
+        contentEncodingKey = prefixVendor + "ContentEncoding";
+        replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
         prefixFooterKey = prefixVendor + prefixFooter;
-
     }
 
-    public JMSVendor getVendor() {
+    public ActiveMQJMSVendor getVendor() {
         return vendor;
     }
-
-    public void setVendor(JMSVendor vendor) {
-        this.vendor = vendor;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 3ae018e..503a05e 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -157,23 +157,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
             EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
 
             InboundTransformer transformer = getTransformer();
-            ActiveMQMessage message = null;
-
-            while (transformer != null) {
-                try {
-                    message = (ActiveMQMessage) transformer.transform(em);
-                    break;
-                } catch (Exception e) {
-                    LOG.debug("Transform of message using [{}] transformer, failed", getTransformer().getTransformerName());
-                    LOG.trace("Transformation error:", e);
-
-                    transformer = transformer.getFallbackTransformer();
-                }
-            }
-
-            if (message == null) {
-                throw new IOException("Failed to transform incoming delivery, skipping.");
-            }
+            ActiveMQMessage message = (ActiveMQMessage) transformer.transform(em);
 
             current = null;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
index 3ce5ef6..84d5864 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -29,6 +29,7 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -39,6 +40,7 @@ import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -166,7 +168,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
     @Test(timeout = 60000)
     public void testMessagePropertiesArePreservedAMQPToOpenWire() throws Exception {
 
-        // Raw Transformer doesn't expand message propeties.
+        // Raw Transformer doesn't expand message properties.
         assumeFalse(transformer.equals("raw"));
 
         boolean bool = true;
@@ -284,7 +286,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
         // Now consumer the ObjectMessage
         Message received = amqpConsumer.receive(2000);
         assertNotNull(received);
-        assertTrue(received instanceof ObjectMessage);
+        assertTrue("Expected ObjectMessage but got " + received, received instanceof ObjectMessage);
         ObjectMessage incoming = (ObjectMessage) received;
 
         Object incomingObject = incoming.getObject();
@@ -297,7 +299,126 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
         openwire.close();
     }
 
-    //----- Tests for OpenWire to Qpid JMS using ObjectMessage ---------------//
+    //----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------//
+
+    @Test
+    public void testQpidToOpenWireObjectMessage() throws Exception {
+
+        // Raw Transformer doesn't expand message properties.
+        assumeFalse(!transformer.equals("jms"));
+
+        Connection openwire = createJMSConnection();
+        Connection amqp = createConnection();
+
+        openwire.start();
+        amqp.start();
+
+        Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination queue = openwireSession.createQueue(getDestinationName());
+
+        MessageProducer amqpProducer = amqpSession.createProducer(queue);
+        MessageConsumer openwireConsumer = openwireSession.createConsumer(queue);
+
+        // Create and send the Message
+        ObjectMessage outgoing = amqpSession.createObjectMessage();
+        outgoing.setObject(UUID.randomUUID());
+        amqpProducer.send(outgoing);
+
+        // Now consumer the ObjectMessage
+        Message received = openwireConsumer.receive(2000);
+        assertNotNull(received);
+        LOG.info("Read new message: {}", received);
+        assertTrue(received instanceof ObjectMessage);
+        ObjectMessage incoming = (ObjectMessage) received;
+        Object payload = incoming.getObject();
+        assertNotNull(payload);
+        assertTrue(payload instanceof UUID);
+
+        amqp.close();
+        openwire.close();
+    }
+
+    @Test
+    public void testOpenWireToQpidObjectMessage() throws Exception {
+
+        // Raw Transformer doesn't expand message properties.
+        assumeFalse(!transformer.equals("jms"));
+
+        Connection openwire = createJMSConnection();
+        Connection amqp = createConnection();
+
+        openwire.start();
+        amqp.start();
+
+        Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination queue = openwireSession.createQueue(getDestinationName());
+
+        MessageProducer openwireProducer = openwireSession.createProducer(queue);
+        MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
+
+        // Create and send the Message
+        ObjectMessage outgoing = amqpSession.createObjectMessage();
+        outgoing.setObject(UUID.randomUUID());
+        openwireProducer.send(outgoing);
+
+        // Now consumer the ObjectMessage
+        Message received = amqpConsumer.receive(2000);
+        assertNotNull(received);
+        LOG.info("Read new message: {}", received);
+        assertTrue(received instanceof ObjectMessage);
+        ObjectMessage incoming = (ObjectMessage) received;
+        Object payload = incoming.getObject();
+        assertNotNull(payload);
+        assertTrue(payload instanceof UUID);
+
+        amqp.close();
+        openwire.close();
+    }
+
+    @Test
+    public void testOpenWireToQpidObjectMessageWithOpenWireCompression() throws Exception {
+
+        // Raw Transformer doesn't expand message properties.
+        assumeFalse(!transformer.equals("jms"));
+
+        Connection openwire = createJMSConnection();
+        ((ActiveMQConnection) openwire).setUseCompression(true);
+
+        Connection amqp = createConnection();
+
+        openwire.start();
+        amqp.start();
+
+        Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination queue = openwireSession.createQueue(getDestinationName());
+
+        MessageProducer openwireProducer = openwireSession.createProducer(queue);
+        MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
+
+        // Create and send the Message
+        ObjectMessage outgoing = amqpSession.createObjectMessage();
+        outgoing.setObject(UUID.randomUUID());
+        openwireProducer.send(outgoing);
+
+        // Now consumer the ObjectMessage
+        Message received = amqpConsumer.receive(2000);
+        assertNotNull(received);
+        LOG.info("Read new message: {}", received);
+        assertTrue(received instanceof ObjectMessage);
+        ObjectMessage incoming = (ObjectMessage) received;
+        Object payload = incoming.getObject();
+        assertNotNull(payload);
+        assertTrue(payload instanceof UUID);
+
+        amqp.close();
+        openwire.close();
+    }
 
     @SuppressWarnings("unchecked")
     @Test


[2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6374

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupportTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupportTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupportTest.java
new file mode 100644
index 0000000..e98dfc9
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupportTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Test;
+
+public class AmqpContentTypeSupportTest {
+
+    @Test (expected = InvalidContentTypeException.class)
+    public void testParseContentTypeWithOnlyType() throws Exception {
+        doParseContentTypeTestImpl("type", null);
+    }
+
+    @Test (expected = InvalidContentTypeException.class)
+    public void testParseContentTypeEndsWithSlash() throws Exception {
+        doParseContentTypeTestImpl("type/", null);
+    }
+
+    @Test (expected = InvalidContentTypeException.class)
+    public void testParseContentTypeMissingSubtype() throws Exception {
+        doParseContentTypeTestImpl("type/;", null);
+    }
+
+    @Test (expected = InvalidContentTypeException.class)
+    public void testParseContentTypeEmptyString() throws Exception {
+        doParseContentTypeTestImpl("", null);
+    }
+
+    @Test (expected = InvalidContentTypeException.class)
+    public void testParseContentTypeNullString() throws Exception {
+        doParseContentTypeTestImpl(null, null);
+    }
+
+    @Test
+    public void testParseContentTypeNoParamsAfterSeparatorNonTextual() throws Exception {
+        // Expect null as this is not a textual type
+        doParseContentTypeTestImpl("type/subtype;", null);
+    }
+
+    @Test
+    public void testParseContentTypeNoParamsAfterSeparatorTextualType() throws Exception {
+        doParseContentTypeTestImpl("text/plain;", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeEmptyParamsAfterSeparator() throws Exception {
+        doParseContentTypeTestImpl("text/plain;;", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeNoParams() throws Exception {
+        doParseContentTypeTestImpl("text/plain", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithCharsetUtf8() throws Exception {
+        doParseContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithCharsetAscii() throws Exception {
+        doParseContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII);
+    }
+
+    @Test
+    public void testParseContentTypeWithMultipleParams() throws Exception {
+        doParseContentTypeTestImpl("text/plain; param=value; charset=us-ascii", StandardCharsets.US_ASCII);
+    }
+
+    @Test
+    public void testParseContentTypeWithCharsetQuoted() throws Exception {
+        doParseContentTypeTestImpl("text/plain;charset=\"us-ascii\"", StandardCharsets.US_ASCII);
+    }
+
+    @Test (expected = InvalidContentTypeException.class)
+    public void testParseContentTypeWithCharsetQuotedEmpty() throws Exception {
+        doParseContentTypeTestImpl("text/plain;charset=\"\"", null);
+    }
+
+    @Test (expected = InvalidContentTypeException.class)
+    public void testParseContentTypeWithCharsetQuoteNotClosed() throws Exception {
+        doParseContentTypeTestImpl("text/plain;charset=\"unclosed", null);
+    }
+
+    @Test (expected = InvalidContentTypeException.class)
+    public void testParseContentTypeWithCharsetQuoteNotClosedEmpty() throws Exception {
+        doParseContentTypeTestImpl("text/plain;charset=\"", null);
+    }
+
+    @Test (expected = InvalidContentTypeException.class)
+    public void testParseContentTypeWithNoCharsetValue() throws Exception {
+        doParseContentTypeTestImpl("text/plain;charset=", null);
+    }
+
+    @Test
+    public void testParseContentTypeWithTextPlain() throws Exception {
+        doParseContentTypeTestImpl("text/plain;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("text/plain", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithTextJson() throws Exception {
+        doParseContentTypeTestImpl("text/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("text/json;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("text/json;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("text/json", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithTextHtml() throws Exception {
+        doParseContentTypeTestImpl("text/html;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("text/html;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("text/html;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("text/html", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithTextFoo() throws Exception {
+        doParseContentTypeTestImpl("text/foo;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("text/foo;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("text/foo;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("text/foo", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithApplicationJson() throws Exception {
+        doParseContentTypeTestImpl("application/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("application/json;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("application/json;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("application/json", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithApplicationJsonVariant() throws Exception {
+        doParseContentTypeTestImpl("application/something+json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("application/something+json;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("application/something+json;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("application/something+json", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithApplicationJavascript() throws Exception {
+        doParseContentTypeTestImpl("application/javascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("application/javascript;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("application/javascript;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("application/javascript", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithApplicationEcmascript() throws Exception {
+        doParseContentTypeTestImpl("application/ecmascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("application/ecmascript;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("application/ecmascript;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("application/ecmascript", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithApplicationXml() throws Exception {
+        doParseContentTypeTestImpl("application/xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("application/xml;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("application/xml;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("application/xml", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithApplicationXmlVariant() throws Exception {
+        doParseContentTypeTestImpl("application/something+xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("application/something+xml;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("application/something+xml;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("application/something+xml", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithApplicationXmlDtd() throws Exception {
+        doParseContentTypeTestImpl("application/xml-dtd;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doParseContentTypeTestImpl("application/xml-dtd;charset=us-ascii", StandardCharsets.US_ASCII);
+        doParseContentTypeTestImpl("application/xml-dtd;charset=utf-8", StandardCharsets.UTF_8);
+        doParseContentTypeTestImpl("application/xml-dtd", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testParseContentTypeWithApplicationOtherNotTextual() throws Exception {
+        // Expect null as this is not a textual type
+        doParseContentTypeTestImpl("application/other", null);
+    }
+
+    @Test
+    public void testParseContentTypeWithApplicationOctetStream() throws Exception {
+        // Expect null as this is not a textual type
+        doParseContentTypeTestImpl(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE, null);
+    }
+
+    @Test
+    public void testParseContentTypeWithApplicationJavaSerialized() throws Exception {
+        // Expect null as this is not a textual type
+        doParseContentTypeTestImpl(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, null);
+    }
+
+    private void doParseContentTypeTestImpl(String contentType, Charset expected) throws InvalidContentTypeException {
+        Charset charset = AmqpContentTypeSupport.parseContentTypeForTextualCharset(contentType);
+        if (expected == null) {
+            assertNull("Expected no charset, but got:" + charset, charset);
+        } else {
+            assertEquals("Charset not as expected", expected, charset);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupportTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupportTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupportTest.java
new file mode 100644
index 0000000..c346d71
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupportTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Test;
+
+public class AmqpMessageSupportTest {
+
+    //---------- getSymbol ---------------------------------------------------//
+
+    @Test
+    public void testGetSymbol() {
+        assertNotNull(AmqpMessageSupport.getSymbol("x-opt-something-or-other"));
+    }
+
+    //---------- getMessageAnnotation ----------------------------------------//
+
+    @Test
+    public void testGetMessageAnnotationWhenMessageHasAnnotationsMap() {
+        Map<Symbol, Object> messageAnnotationsMap = new HashMap<Symbol,Object>();
+        messageAnnotationsMap.put(Symbol.valueOf("x-opt-test"), Boolean.TRUE);
+        Message message = Proton.message();
+        message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
+
+        assertNotNull(AmqpMessageSupport.getMessageAnnotation("x-opt-test", message));
+    }
+
+    @Test
+    public void testGetMessageAnnotationWhenMessageHasEmptyAnnotationsMap() {
+        Map<Symbol, Object> messageAnnotationsMap = new HashMap<Symbol,Object>();
+        Message message = Proton.message();
+        message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
+
+        assertNull(AmqpMessageSupport.getMessageAnnotation("x-opt-test", message));
+    }
+
+    @Test
+    public void testGetMessageAnnotationWhenMessageHasNoAnnotationsMap() {
+        Message message = Proton.message();
+        assertNull(AmqpMessageSupport.getMessageAnnotation("x-opt-test", message));
+    }
+
+    @Test
+    public void testGetMessageAnnotationWhenMessageIsNull() {
+        assertNull(AmqpMessageSupport.getMessageAnnotation("x-opt-test", null));
+    }
+
+    //---------- isContentType -----------------------------------------------//
+
+    @Test
+    public void testIsContentTypeWithNullStringValueAndNullMessageContentType() {
+        Message message = Proton.message();
+        assertTrue(AmqpMessageSupport.isContentType(null, message));
+    }
+
+    @Test
+    public void testIsContentTypeWithNonNullStringValueAndNullMessageContentType() {
+        Message message = Proton.message();
+        assertFalse(AmqpMessageSupport.isContentType("test", message));
+    }
+
+    @Test
+    public void testIsContentTypeWithNonNullStringValueAndNonNullMessageContentTypeNotEqual() {
+        Message message = Proton.message();
+        message.setContentType("fails");
+        assertFalse(AmqpMessageSupport.isContentType("test", message));
+    }
+
+    @Test
+    public void testIsContentTypeWithNonNullStringValueAndNonNullMessageContentTypeEqual() {
+        Message message = Proton.message();
+        message.setContentType("test");
+        assertTrue(AmqpMessageSupport.isContentType("test", message));
+    }
+
+    @Test
+    public void testIsContentTypeWithNullStringValueAndNonNullMessageContentType() {
+        Message message = Proton.message();
+        message.setContentType("test");
+        assertFalse(AmqpMessageSupport.isContentType(null, message));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
index 7633e5c..ba0f014 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,11 +16,18 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import javax.jms.Destination;
 import javax.jms.Queue;
@@ -29,8 +36,18 @@ import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
@@ -38,26 +55,513 @@ import org.mockito.Mockito;
 
 public class JMSMappingInboundTransformerTest {
 
+    //----- Null Body Section ------------------------------------------------//
+
+    /**
+     * Test that a message with no body section, but with the content type set to
+     * {@value AmqpMessageSupport#OCTET_STREAM_CONTENT_TYPE} results in a BytesMessage
+     *
+     * @throws Exception if an error occurs during the test.
+     */
     @Test
-    public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception {
-        TextMessage mockTextMessage = createMockTextMessage();
-        JMSVendor mockVendor = createMockVendor(mockTextMessage);
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
+    public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+
+        Message message = Message.Factory.create();
+        message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE);
+
+        EncodedMessage em = encodeMessage(message);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that a message with no body section, and no content-type results in a BytesMessage
+     * when not otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+
+        Message message = Message.Factory.create();
+
+        EncodedMessage em = encodeMessage(message);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that a message with no body section, but with the content type set to
+     * {@value AmqpMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} results in an ObjectMessage
+     * when not otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+    */
+    @Test
+    public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception {
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+
+        Message message = Message.Factory.create();
+        message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+
+        EncodedMessage em = encodeMessage(message);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQObjectMessage.class, jmsMessage.getClass());
+    }
+
+    @Test
+    public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+
+        Message message = Message.Factory.create();
+        message.setContentType("text/plain");
+
+        EncodedMessage em = encodeMessage(message);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQTextMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that a message with no body section, and with the content type set to
+     * an unknown value results in a plain Message when not otherwise annotated to
+     * indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception {
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+
+        Message message = Message.Factory.create();
+        message.setContentType("unknown-content-type");
+
+        EncodedMessage em = encodeMessage(message);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQMessage.class, jmsMessage.getClass());
+    }
+
+    //----- Data Body Section ------------------------------------------------//
+
+    /**
+     * Test that a data body containing nothing, but with the content type set to
+     * {@value AmqpMessageSupport#OCTET_STREAM_CONTENT_TYPE} results in a BytesMessage when not
+     * otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateBytesMessageFromDataWithEmptyBinaryAndContentType() throws Exception {
+        Message message = Proton.message();
+        Binary binary = new Binary(new byte[0]);
+        message.setBody(new Data(binary));
+        message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE);
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that a message with an empty data body section, and with the content type
+     * set to an unknown value results in a BytesMessage when not otherwise annotated
+     * to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    public void testCreateBytesMessageFromDataWithUnknownContentType() throws Exception {
+        Message message = Proton.message();
+        Binary binary = new Binary(new byte[0]);
+        message.setBody(new Data(binary));
+        message.setContentType("unknown-content-type");
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that a receiving a data body containing nothing and no content type being set
+     * results in a BytesMessage when not otherwise annotated to indicate the type of
+     * JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateBytesMessageFromDataWithEmptyBinaryAndNoContentType() throws Exception {
+        Message message = Proton.message();
+        Binary binary = new Binary(new byte[0]);
+        message.setBody(new Data(binary));
+
+        assertNull(message.getContentType());
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that receiving a data body containing nothing, but with the content type set to
+     * {@value AmqpMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} results in an ObjectMessage
+     * when not otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateObjectMessageFromDataWithContentTypeAndEmptyBinary() throws Exception {
+        Message message = Proton.message();
+        Binary binary = new Binary(new byte[0]);
+        message.setBody(new Data(binary));
+        message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQObjectMessage.class, jmsMessage.getClass());
+    }
+
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeTextPlain() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/plain;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/plain", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeTextJson() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/json;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/json;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/json", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeTextHtml() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/html;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/html;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/html;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/html", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeTextFoo() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/foo;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/foo;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/foo;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("text/foo", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeApplicationJson() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/json;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/json;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/json", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeApplicationJsonVariant() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+json;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+json;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+json", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeApplicationJavascript() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/javascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/javascript;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/javascript;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/javascript", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeApplicationEcmascript() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/ecmascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/ecmascript;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/ecmascript;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/ecmascript", StandardCharsets.UTF_8);
+    }
 
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeApplicationXml() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeApplicationXmlVariant() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+xml;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+xml;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+xml", StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCreateTextMessageFromDataWithContentTypeApplicationXmlDtd() throws Exception {
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml-dtd;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml-dtd;charset=us-ascii", StandardCharsets.US_ASCII);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml-dtd;charset=utf-8", StandardCharsets.UTF_8);
+        doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml-dtd", StandardCharsets.UTF_8);
+    }
+
+    private void doCreateTextMessageFromDataWithContentTypeTestImpl(String contentType, Charset expectedCharset) throws Exception {
+        Message message = Proton.message();
+        Binary binary = new Binary(new byte[0]);
+        message.setBody(new Data(binary));
+        message.setContentType(contentType);
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        if (StandardCharsets.UTF_8.equals(expectedCharset)) {
+            assertEquals("Unexpected message class type", ActiveMQTextMessage.class, jmsMessage.getClass());
+        } else {
+            assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
+        }
+    }
+
+    //----- AmqpValue transformations ----------------------------------------//
+
+    /**
+     * Test that an amqp-value body containing a string results in a TextMessage
+     * when not otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateTextMessageFromAmqpValueWithString() throws Exception {
+        Message message = Proton.message();
+        message.setBody(new AmqpValue("content"));
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQTextMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that an amqp-value body containing a null results in an TextMessage
+     * when not otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateTextMessageFromAmqpValueWithNull() throws Exception {
+        Message message = Proton.message();
+        message.setBody(new AmqpValue(null));
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQTextMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that a message with an AmqpValue section containing a Binary, but with the content type
+     * set to {@value AmqpMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} results in an ObjectMessage
+     * when not otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+    */
+    @Test
+    public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+
+        Message message = Message.Factory.create();
+        message.setBody(new AmqpValue(new Binary(new byte[0])));
+        message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+
+        EncodedMessage em = encodeMessage(message);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQObjectMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that an amqp-value body containing a map results in an MapMessage
+     * when not otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateAmqpMapMessageFromAmqpValueWithMap() throws Exception {
+        Message message = Proton.message();
+        Map<String, String> map = new HashMap<String,String>();
+        message.setBody(new AmqpValue(map));
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQMapMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that an amqp-value body containing a list results in an StreamMessage
+     * when not otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateAmqpStreamMessageFromAmqpValueWithList() throws Exception {
+        Message message = Proton.message();
+        List<String> list = new ArrayList<String>();
+        message.setBody(new AmqpValue(list));
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQStreamMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that an amqp-sequence body containing a list results in an StreamMessage
+     * when not otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateAmqpStreamMessageFromAmqpSequence() throws Exception {
+        Message message = Proton.message();
+        List<String> list = new ArrayList<String>();
+        message.setBody(new AmqpSequence(list));
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQStreamMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that an amqp-value body containing a binary value results in BytesMessage
+     * when not otherwise annotated to indicate the type of JMS message it is.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateAmqpBytesMessageFromAmqpValueWithBinary() throws Exception {
+        Message message = Proton.message();
+        Binary binary = new Binary(new byte[0]);
+        message.setBody(new AmqpValue(binary));
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
+    }
+
+    /**
+     * Test that an amqp-value body containing a value which can't be categorized results in
+     * an exception from the transformer and then try the transformer's own fallback transformer
+     * to result in an BytesMessage.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateBytesMessageFromAmqpValueWithUncategorisedContent() throws Exception {
+        Message message = Proton.message();
+        message.setBody(new AmqpValue(UUID.randomUUID()));
+
+        EncodedMessage em = encodeMessage(message);
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
+    }
+
+    @Test
+    public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception {
         String contentString = "myTextMessageContent";
-        Message amqp = Message.Factory.create();
-        amqp.setBody(new AmqpValue(contentString));
+        Message message = Message.Factory.create();
+        message.setBody(new AmqpValue(contentString));
 
-        EncodedMessage em = encodeMessage(amqp);
+        EncodedMessage em = encodeMessage(message);
 
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
-        Mockito.verify(mockTextMessage).setText(contentString);
-        assertSame("Expected provided mock message, got a different one", mockTextMessage, jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQTextMessage.class, jmsMessage.getClass());
+
+        TextMessage textMessage = (TextMessage) jmsMessage;
+
+        assertNotNull(textMessage.getText());
+        assertEquals(contentString, textMessage.getText());
     }
 
-    // ======= JMSDestination Handling =========
+    //----- Destination Conversions ------------------------------------------//
 
     @Test
     public void testTransformWithNoToTypeDestinationTypeAnnotation() throws Exception {
@@ -85,8 +589,8 @@ public class JMSMappingInboundTransformerTest {
     }
 
     private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception {
-        TextMessage mockTextMessage = createMockTextMessage();
-        JMSVendor mockVendor = createMockVendor(mockTextMessage);
+        ActiveMQTextMessage mockTextMessage = createMockTextMessage();
+        ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage);
         JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
 
         String toAddress = "toAddress";
@@ -111,7 +615,7 @@ public class JMSMappingInboundTransformerTest {
         // Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
     }
 
-    // ======= JMSReplyTo Handling =========
+    //----- ReplyTo Conversions ----------------------------------------------//
 
     @Test
     public void testTransformWithNoReplyToTypeDestinationTypeAnnotation() throws Exception {
@@ -139,8 +643,8 @@ public class JMSMappingInboundTransformerTest {
     }
 
     private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception {
-        TextMessage mockTextMessage = createMockTextMessage();
-        JMSVendor mockVendor = createMockVendor(mockTextMessage);
+        ActiveMQTextMessage mockTextMessage = createMockTextMessage();
+        ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage);
         JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
 
         String replyToAddress = "replyToAddress";
@@ -165,21 +669,24 @@ public class JMSMappingInboundTransformerTest {
         // Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
     }
 
-    // ======= Utility Methods =========
-
-    private TextMessage createMockTextMessage() {
-        TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
+    //----- Utility Methods --------------------------------------------------//
 
-        return mockTextMessage;
+    private ActiveMQTextMessage createMockTextMessage() {
+        return Mockito.mock(ActiveMQTextMessage.class);
     }
 
-    private JMSVendor createMockVendor(TextMessage mockTextMessage) {
-        JMSVendor mockVendor = Mockito.mock(JMSVendor.class);
+    private ActiveMQJMSVendor createMockVendor(ActiveMQTextMessage mockTextMessage) {
+        ActiveMQJMSVendor mockVendor = Mockito.mock(ActiveMQJMSVendor.class);
         Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage);
+        Mockito.when(mockVendor.createTextMessage(Mockito.any(String.class))).thenReturn(mockTextMessage);
 
         return mockVendor;
     }
 
+    private ActiveMQJMSVendor createVendor() {
+        return ActiveMQJMSVendor.INSTANCE;
+    }
+
     private EncodedMessage encodeMessage(Message message) {
         byte[] encodeBuffer = new byte[1024 * 8];
         int encodedSize;

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index 15beb31..9184101 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
 import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION;
 import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION;
 import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.QUEUE_TYPE;
@@ -23,22 +29,40 @@ import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTrans
 import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.TEMP_TOPIC_TYPE;
 import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.TOPIC_TYPE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
@@ -46,23 +70,581 @@ import org.mockito.Mockito;
 
 public class JMSMappingOutboundTransformerTest {
 
+    //----- no-body Message type tests ---------------------------------------//
+
+    @Test
+    public void testConvertMessageToAmqpMessageWithNoBody() throws Exception {
+        ActiveMQMessage outbound = createMessage();
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNull(amqp.getBody());
+    }
+
+    @Test
+    public void testConvertTextMessageToAmqpMessageWithNoBodyOriginalEncodingWasNull() throws Exception {
+        ActiveMQTextMessage outbound = createTextMessage();
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_NULL);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNull(amqp.getBody());
+    }
+
+    //----- BytesMessage type tests ---------------------------------------//
+
+    @Test
+    public void testConvertEmptyBytesMessageToAmqpMessageWithDataBody() throws Exception {
+        ActiveMQBytesMessage outbound = createBytesMessage();
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+        assertEquals(0, ((Data) amqp.getBody()).getValue().getLength());
+    }
+
+    @Test
+    public void testConvertUncompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
+        byte[] expectedPayload = new byte[] { 8, 16, 24, 32 };
+        ActiveMQBytesMessage outbound = createBytesMessage();
+        outbound.writeBytes(expectedPayload);
+        outbound.storeContent();
+        outbound.onSend();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+        assertEquals(4, ((Data) amqp.getBody()).getValue().getLength());
+
+        Binary amqpData = ((Data) amqp.getBody()).getValue();
+        Binary inputData = new Binary(expectedPayload);
+
+        assertTrue(inputData.equals(amqpData));
+    }
+
+    @Test
+    public void testConvertCompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
+        byte[] expectedPayload = new byte[] { 8, 16, 24, 32 };
+        ActiveMQBytesMessage outbound = createBytesMessage(true);
+        outbound.writeBytes(expectedPayload);
+        outbound.storeContent();
+        outbound.onSend();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+        assertEquals(4, ((Data) amqp.getBody()).getValue().getLength());
+
+        Binary amqpData = ((Data) amqp.getBody()).getValue();
+        Binary inputData = new Binary(expectedPayload);
+
+        assertTrue(inputData.equals(amqpData));
+    }
+
+    @Test
+    public void testConvertEmptyBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+        ActiveMQBytesMessage outbound = createBytesMessage();
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
+        assertEquals(0, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+    }
+
+    @Test
+    public void testConvertUncompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+        byte[] expectedPayload = new byte[] { 8, 16, 24, 32 };
+        ActiveMQBytesMessage outbound = createBytesMessage();
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.writeBytes(expectedPayload);
+        outbound.storeContent();
+        outbound.onSend();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
+        assertEquals(4, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+
+        Binary amqpData = (Binary) ((AmqpValue) amqp.getBody()).getValue();
+        Binary inputData = new Binary(expectedPayload);
+
+        assertTrue(inputData.equals(amqpData));
+    }
+
+    @Test
+    public void testConvertCompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+        byte[] expectedPayload = new byte[] { 8, 16, 24, 32 };
+        ActiveMQBytesMessage outbound = createBytesMessage(true);
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.writeBytes(expectedPayload);
+        outbound.storeContent();
+        outbound.onSend();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
+        assertEquals(4, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+
+        Binary amqpData = (Binary) ((AmqpValue) amqp.getBody()).getValue();
+        Binary inputData = new Binary(expectedPayload);
+
+        assertTrue(inputData.equals(amqpData));
+    }
+
+    //----- MapMessage type tests --------------------------------------------//
+
+    @Test
+    public void testConvertMapMessageToAmqpMessageWithNoBody() throws Exception {
+        ActiveMQMapMessage outbound = createMapMessage();
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+    }
+
+    @Test
+    public void testConvertMapMessageToAmqpMessage() throws Exception {
+        ActiveMQMapMessage outbound = createMapMessage();
+        outbound.setString("property-1", "string");
+        outbound.setInt("property-2", 1);
+        outbound.setBoolean("property-3", true);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+        @SuppressWarnings("unchecked")
+        Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+
+        assertEquals(3, amqpMap.size());
+        assertTrue("string".equals(amqpMap.get("property-1")));
+    }
+
+    @Test
+    public void testConvertCompressedMapMessageToAmqpMessage() throws Exception {
+        ActiveMQMapMessage outbound = createMapMessage(true);
+        outbound.setString("property-1", "string");
+        outbound.setInt("property-2", 1);
+        outbound.setBoolean("property-3", true);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+        @SuppressWarnings("unchecked")
+        Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+
+        assertEquals(3, amqpMap.size());
+        assertTrue("string".equals(amqpMap.get("property-1")));
+    }
+
+    //----- StreamMessage type tests -----------------------------------------//
+
+    @Test
+    public void testConvertStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+        ActiveMQStreamMessage outbound = createStreamMessage();
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List);
+    }
+
+    @Test
+    public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
+        ActiveMQStreamMessage outbound = createStreamMessage();
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpSequence);
+        assertTrue(((AmqpSequence) amqp.getBody()).getValue() instanceof List);
+    }
+
+    @Test
+    public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+        ActiveMQStreamMessage outbound = createStreamMessage(true);
+        outbound.writeBoolean(false);
+        outbound.writeString("test");
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List);
+
+        @SuppressWarnings("unchecked")
+        List<Object> amqpList = (List<Object>) ((AmqpValue) amqp.getBody()).getValue();
+
+        assertEquals(2, amqpList.size());
+    }
+
+    @Test
+    public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
+        ActiveMQStreamMessage outbound = createStreamMessage(true);
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
+        outbound.writeBoolean(false);
+        outbound.writeString("test");
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpSequence);
+        assertTrue(((AmqpSequence) amqp.getBody()).getValue() instanceof List);
+
+        @SuppressWarnings("unchecked")
+        List<Object> amqpList = ((AmqpSequence) amqp.getBody()).getValue();
+
+        assertEquals(2, amqpList.size());
+    }
+
+    //----- ObjectMessage type tests -----------------------------------------//
+
+    @Test
+    public void testConvertEmptyObjectMessageToAmqpMessageWithDataBody() throws Exception {
+        ActiveMQObjectMessage outbound = createObjectMessage();
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertEquals(0, ((Data) amqp.getBody()).getValue().getLength());
+    }
+
+    @Test
+    public void testConvertEmptyObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
+        ActiveMQObjectMessage outbound = createObjectMessage();
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertEquals(0, ((Data) amqp.getBody()).getValue().getLength());
+    }
+
+    @Test
+    public void testConvertEmptyObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+        ActiveMQObjectMessage outbound = createObjectMessage();
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue)amqp.getBody()).getValue() instanceof Binary);
+        assertEquals(0, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+    }
+
+    @Test
+    public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception {
+        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
+
+        Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
+        assertNotNull(value);
+        assertTrue(value instanceof UUID);
+    }
+
+    @Test
+    public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
+        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
+
+        Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
+        assertNotNull(value);
+        assertTrue(value instanceof UUID);
+    }
+
+    @Test
+    public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue)amqp.getBody()).getValue() instanceof Binary);
+        assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+
+        Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray());
+        assertNotNull(value);
+        assertTrue(value instanceof UUID);
+    }
+
     @Test
-    public void testConvertMessageWithTextMessageCreatesAmqpValueStringBody() throws Exception {
+    public void testConvertCompressedObjectMessageToAmqpMessageWithDataBody() throws Exception {
+        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
+
+        Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
+        assertNotNull(value);
+        assertTrue(value instanceof UUID);
+    }
+
+    @Test
+    public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
+        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
+
+        Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
+        assertNotNull(value);
+        assertTrue(value instanceof UUID);
+    }
+
+    @Test
+    public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue)amqp.getBody()).getValue() instanceof Binary);
+        assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+
+        Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray());
+        assertNotNull(value);
+        assertTrue(value instanceof UUID);
+    }
+
+    //----- TextMessage type tests -------------------------------------------//
+
+    @Test
+    public void testConvertTextMessageToAmqpMessageWithNoBody() throws Exception {
+        ActiveMQTextMessage outbound = createTextMessage();
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertNull(((AmqpValue) amqp.getBody()).getValue());
+    }
+
+    @Test
+    public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
         String contentString = "myTextMessageContent";
-        TextMessage mockTextMessage = createMockTextMessage();
-        Mockito.when(mockTextMessage.getText()).thenReturn(contentString);
-        JMSVendor mockVendor = createMockVendor();
+        ActiveMQTextMessage outbound = createTextMessage(contentString);
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
+        outbound.onSend();
+        outbound.storeContent();
 
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
 
-        Message amqp = transformer.convert(mockTextMessage);
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+
+        Binary data = ((Data) amqp.getBody()).getValue();
+        String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
+        assertEquals(contentString, contents);
+    }
+
+    @Test
+    public void testConvertTextMessageCreatesAmqpValueStringBody() throws Exception {
+        String contentString = "myTextMessageContent";
+        ActiveMQTextMessage outbound = createTextMessage(contentString);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
         assertEquals(contentString, ((AmqpValue) amqp.getBody()).getValue());
     }
 
-    // ======= JMSDestination Handling =========
+    @Test
+    public void testConvertCompressedTextMessageCreatesDataSectionBody() throws Exception {
+        String contentString = "myTextMessageContent";
+        ActiveMQTextMessage outbound = createTextMessage(contentString, true);
+        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
+        outbound.onSend();
+        outbound.storeContent();
+
+        ActiveMQJMSVendor vendor = createVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+
+        Message amqp = transformer.convert(outbound);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof Data);
+        assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+
+        Binary data = ((Data) amqp.getBody()).getValue();
+        String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
+        assertEquals(contentString, contents);
+    }
+
+    //----- Test JMSDestination Handling -------------------------------------//
 
     @Test
     public void testConvertMessageWithJMSDestinationNull() throws Exception {
@@ -98,10 +680,10 @@ public class JMSMappingOutboundTransformerTest {
     }
 
     private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue) throws Exception {
-        TextMessage mockTextMessage = createMockTextMessage();
+        ActiveMQTextMessage mockTextMessage = createMockTextMessage();
         Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
         Mockito.when(mockTextMessage.getJMSDestination()).thenReturn(jmsDestination);
-        JMSVendor mockVendor = createMockVendor();
+        ActiveMQJMSVendor mockVendor = createMockVendor();
         String toAddress = "someToAddress";
         if (jmsDestination != null) {
             Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(toAddress);
@@ -125,7 +707,7 @@ public class JMSMappingOutboundTransformerTest {
         }
     }
 
-    // ======= JMSReplyTo Handling =========
+    //----- Test JMSReplyTo Handling -----------------------------------------//
 
     @Test
     public void testConvertMessageWithJMSReplyToNull() throws Exception {
@@ -161,10 +743,10 @@ public class JMSMappingOutboundTransformerTest {
     }
 
     private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue) throws Exception {
-        TextMessage mockTextMessage = createMockTextMessage();
+        ActiveMQTextMessage mockTextMessage = createMockTextMessage();
         Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
         Mockito.when(mockTextMessage.getJMSReplyTo()).thenReturn(jmsReplyTo);
-        JMSVendor mockVendor = createMockVendor();
+        ActiveMQJMSVendor mockVendor = createMockVendor();
         String replyToAddress = "someReplyToAddress";
         if (jmsReplyTo != null) {
             Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(replyToAddress);
@@ -188,18 +770,139 @@ public class JMSMappingOutboundTransformerTest {
         }
     }
 
-    // ======= Utility Methods =========
+    //----- Utility Methods used for this Test -------------------------------//
 
-    private TextMessage createMockTextMessage() throws Exception {
-        TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
+    private ActiveMQTextMessage createMockTextMessage() throws Exception {
+        ActiveMQTextMessage mockTextMessage = Mockito.mock(ActiveMQTextMessage.class);
         Mockito.when(mockTextMessage.getPropertyNames()).thenReturn(Collections.enumeration(Collections.emptySet()));
 
         return mockTextMessage;
     }
 
-    private JMSVendor createMockVendor() {
-        JMSVendor mockVendor = Mockito.mock(JMSVendor.class);
+    private ActiveMQJMSVendor createVendor() {
+        return ActiveMQJMSVendor.INSTANCE;
+    }
+
+    private ActiveMQJMSVendor createMockVendor() {
+        return Mockito.mock(ActiveMQJMSVendor.class);
+    }
+
+    private ActiveMQMessage createMessage() {
+        return new ActiveMQMessage();
+    }
+
+    private ActiveMQBytesMessage createBytesMessage() {
+        return createBytesMessage(false);
+    }
+
+    private ActiveMQBytesMessage createBytesMessage(boolean compression) {
+        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+
+        if (compression) {
+            ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
+            Mockito.when(connection.isUseCompression()).thenReturn(true);
+            message.setConnection(connection);
+        }
+
+        return message;
+    }
+
+    private ActiveMQMapMessage createMapMessage() {
+        return createMapMessage(false);
+    }
 
-        return mockVendor;
+    private ActiveMQMapMessage createMapMessage(boolean compression) {
+        ActiveMQMapMessage message = new ActiveMQMapMessage();
+
+        if (compression) {
+            ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
+            Mockito.when(connection.isUseCompression()).thenReturn(true);
+            message.setConnection(connection);
+        }
+
+        return message;
+    }
+
+    private ActiveMQStreamMessage createStreamMessage() {
+        return createStreamMessage(false);
+    }
+
+    private ActiveMQStreamMessage createStreamMessage(boolean compression) {
+        ActiveMQStreamMessage message = new ActiveMQStreamMessage();
+
+        if (compression) {
+            ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
+            Mockito.when(connection.isUseCompression()).thenReturn(true);
+            message.setConnection(connection);
+        }
+
+        return message;
+    }
+
+    private ActiveMQObjectMessage createObjectMessage() {
+        return createObjectMessage(null);
+    }
+
+    private ActiveMQObjectMessage createObjectMessage(Serializable payload) {
+        return createObjectMessage(payload, false);
+    }
+
+    private ActiveMQObjectMessage createObjectMessage(Serializable payload, boolean compression) {
+        ActiveMQObjectMessage result = new ActiveMQObjectMessage();
+
+        if (compression) {
+            ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
+            Mockito.when(connection.isUseCompression()).thenReturn(true);
+            result.setConnection(connection);
+        }
+
+        try {
+            result.setObject(payload);
+        } catch (JMSException ex) {
+            throw new AssertionError("Should not fail to setObject in this test");
+        }
+
+        result = Mockito.spy(result);
+
+        try {
+            Mockito.doThrow(new AssertionError("invalid setObject")).when(result).setObject(Mockito.any(Serializable.class));
+            Mockito.doThrow(new AssertionError("invalid getObject")).when(result).getObject();
+        } catch (JMSException e) {
+        }
+
+        return result;
+    }
+
+    private ActiveMQTextMessage createTextMessage() {
+        return createTextMessage(null);
+    }
+
+    private ActiveMQTextMessage createTextMessage(String text) {
+        return createTextMessage(text, false);
+    }
+
+    private ActiveMQTextMessage createTextMessage(String text, boolean compression) {
+        ActiveMQTextMessage result = new ActiveMQTextMessage();
+
+        if (compression) {
+            ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
+            Mockito.when(connection.isUseCompression()).thenReturn(true);
+            result.setConnection(connection);
+        }
+
+        try {
+            result.setText(text);
+        } catch (JMSException e) {
+        }
+
+        return result;
+    }
+
+    private Object deserialize(byte[] payload) throws Exception {
+        try (ByteArrayInputStream bis = new ByteArrayInputStream(payload);
+             ObjectInputStream ois = new ObjectInputStream(bis);) {
+
+            return ois.readObject();
+        }
     }
 }