You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/28 19:26:12 UTC

[1/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6438

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 9ab56d59c -> aa32a0f79


https://issues.apache.org/jira/browse/AMQ-6438

Makes some improvements to the profiling test for the transformers.
(cherry picked from commit d4c7cce7d733df9fcdcd0daec4ce64fb3844ce64)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/14c553a8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/14c553a8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/14c553a8

Branch: refs/heads/activemq-5.14.x
Commit: 14c553a8ad10b6212d764a4b9f6d6e6175d638e3
Parents: 9ab56d5
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 21 18:18:41 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 28 15:07:56 2016 -0400

----------------------------------------------------------------------
 .../JMSTransformationSpeedComparisonTest.java   | 158 +++++++++++--------
 1 file changed, 88 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/14c553a8/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
index b008f1c..e1b6a46 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
@@ -36,7 +39,9 @@ import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.ProtonJMessage;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -52,9 +57,12 @@ public class JMSTransformationSpeedComparisonTest {
 
     protected static final Logger LOG = LoggerFactory.getLogger(JMSInteroperabilityTest.class);
 
+    @Rule
+    public TestName test = new TestName();
+
     private final String transformer;
 
-    private final int WARM_CYCLES = 10;
+    private final int WARM_CYCLES = 50;
     private final int PROFILE_CYCLES = 1000000;
 
     public JMSTransformationSpeedComparisonTest(String transformer) {
@@ -119,8 +127,8 @@ public class JMSTransformationSpeedComparisonTest {
         }
         totalDuration += System.nanoTime() - startTime;
 
-        LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
-            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms  -> [{}]",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration), test.getMethodName());
     }
 
     @Test
@@ -155,33 +163,14 @@ public class JMSTransformationSpeedComparisonTest {
         }
         totalDuration += System.nanoTime() - startTime;
 
-        LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
-            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms  -> [{}]",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration), test.getMethodName());
     }
 
     @Test
     public void testTypicalQpidJMSMessage() throws Exception {
 
-        Map<String, Object> applicationProperties = new HashMap<String, Object>();
-        Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
-
-        applicationProperties.put("property-1", "string");
-        applicationProperties.put("property-2", 512);
-        applicationProperties.put("property-3", true);
-
-        messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
-
-        Message message = Proton.message();
-
-        message.setAddress("queue://test-queue");
-        message.setDeliveryCount(1);
-        message.setApplicationProperties(new ApplicationProperties(applicationProperties));
-        message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
-        message.setCreationTime(System.currentTimeMillis());
-        message.setContentType("text/plain");
-        message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
-
-        EncodedMessage encoded = encode(message);
+        EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
         InboundTransformer inboundTransformer = getInboundTransformer();
         OutboundTransformer outboundTransformer = getOutboundTransformer();
 
@@ -202,56 +191,106 @@ public class JMSTransformationSpeedComparisonTest {
         }
         totalDuration += System.nanoTime() - startTime;
 
-        LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
-            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms  -> [{}]",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration), test.getMethodName());
     }
 
     @Test
     public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
 
-        Map<String, Object> applicationProperties = new HashMap<String, Object>();
-        Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
+        EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
+        InboundTransformer inboundTransformer = getInboundTransformer();
 
-        applicationProperties.put("property-1", "string");
-        applicationProperties.put("property-2", 512);
-        applicationProperties.put("property-3", true);
+        // Warm up
+        for (int i = 0; i < WARM_CYCLES; ++i) {
+            inboundTransformer.transform(encoded);
+        }
 
-        messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+        long totalDuration = 0;
 
-        Message message = Proton.message();
+        long startTime = System.nanoTime();
+        for (int i = 0; i < PROFILE_CYCLES; ++i) {
+            inboundTransformer.transform(encoded);
+        }
 
-        message.setAddress("queue://test-queue");
-        message.setDeliveryCount(1);
-        message.setApplicationProperties(new ApplicationProperties(applicationProperties));
-        message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
-        message.setCreationTime(System.currentTimeMillis());
-        message.setContentType("text/plain");
-        message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+        totalDuration += System.nanoTime() - startTime;
 
-        EncodedMessage encoded = encode(message);
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms  -> [{}]",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration), test.getMethodName());
+    }
+
+    @Test
+    public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
+
+        EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
         InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        ActiveMQMessage outbound = (ActiveMQMessage) inboundTransformer.transform(encoded);
+        outbound.onSend();
 
         // Warm up
         for (int i = 0; i < WARM_CYCLES; ++i) {
-            inboundTransformer.transform(encoded);
+            outboundTransformer.transform(outbound);
         }
 
         long totalDuration = 0;
 
         long startTime = System.nanoTime();
         for (int i = 0; i < PROFILE_CYCLES; ++i) {
-            inboundTransformer.transform(encoded);
+            outboundTransformer.transform(outbound);
         }
 
         totalDuration += System.nanoTime() - startTime;
 
-        LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
-            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms  -> [{}]",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration), test.getMethodName());
     }
 
+    @Ignore
     @Test
-    public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
+    public void testEncodeDecodeIsWorking() throws Exception {
+        Message incomingMessage = createTypicalQpidJMSMessage();
+        EncodedMessage encoded = encode(incomingMessage);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        ActiveMQMessage outbound = (ActiveMQMessage) inboundTransformer.transform(encoded);
+        outbound.onSend();
+        Message outboudMessage = outboundTransformer.transform(outbound).decode();
+
+        // Test that message details are equal
+        assertEquals(incomingMessage.getAddress(), outboudMessage.getAddress());
+        assertEquals(incomingMessage.getDeliveryCount(), outboudMessage.getDeliveryCount());
+        assertEquals(incomingMessage.getCreationTime(), outboudMessage.getCreationTime());
+        assertEquals(incomingMessage.getContentType(), outboudMessage.getContentType());
+
+        // Test application properties
+        ApplicationProperties incomingApplicationProperties = incomingMessage.getApplicationProperties();
+        ApplicationProperties outgoingApplicationProperties = outboudMessage.getApplicationProperties();
+
+        assertEquals(incomingApplicationProperties.getValue(), outgoingApplicationProperties.getValue());
+
+        // Test message annotations
+        MessageAnnotations incomingMessageAnnotations = incomingMessage.getMessageAnnotations();
+        MessageAnnotations outgoingMessageAnnotations = outboudMessage.getMessageAnnotations();
+
+        assertEquals(incomingMessageAnnotations.getValue(), outgoingMessageAnnotations.getValue());
+
+        // Test that bodies are equal
+        assertTrue(incomingMessage.getBody() instanceof AmqpValue);
+        assertTrue(outboudMessage.getBody() instanceof AmqpValue);
+
+        AmqpValue incomingBody = (AmqpValue) incomingMessage.getBody();
+        AmqpValue outgoingBody = (AmqpValue) outboudMessage.getBody();
 
+        assertTrue(incomingBody.getValue() instanceof String);
+        assertTrue(outgoingBody.getValue() instanceof String);
+
+        assertEquals(incomingBody.getValue(), outgoingBody.getValue());
+    }
+
+    private Message createTypicalQpidJMSMessage() {
         Map<String, Object> applicationProperties = new HashMap<String, Object>();
         Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
 
@@ -260,6 +299,7 @@ public class JMSTransformationSpeedComparisonTest {
         applicationProperties.put("property-3", true);
 
         messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
 
         Message message = Proton.message();
 
@@ -271,29 +311,7 @@ public class JMSTransformationSpeedComparisonTest {
         message.setContentType("text/plain");
         message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
 
-        EncodedMessage encoded = encode(message);
-        InboundTransformer inboundTransformer = getInboundTransformer();
-        OutboundTransformer outboundTransformer = getOutboundTransformer();
-
-        ActiveMQMessage outbound = (ActiveMQMessage) inboundTransformer.transform(encoded);
-        outbound.onSend();
-
-        // Warm up
-        for (int i = 0; i < WARM_CYCLES; ++i) {
-            outboundTransformer.transform(outbound);
-        }
-
-        long totalDuration = 0;
-
-        long startTime = System.nanoTime();
-        for (int i = 0; i < PROFILE_CYCLES; ++i) {
-            outboundTransformer.transform(outbound);
-        }
-
-        totalDuration += System.nanoTime() - startTime;
-
-        LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
-            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
+        return message;
     }
 
     private EncodedMessage encode(Message message) {


[3/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6438

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index 021ab85..d0d31cc 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -18,10 +18,10 @@ package org.apache.activemq.transport.amqp.message;
 
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
-import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
 import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION;
 import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION;
 import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.QUEUE_TYPE;
@@ -43,20 +43,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMapMessage;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -70,6 +70,9 @@ import org.mockito.Mockito;
 
 public class JMSMappingOutboundTransformerTest {
 
+    private final UUID TEST_OBJECT_VALUE = UUID.fromString("fee14b62-09e0-4ac6-a4c3-4206c630d844");
+    private final String TEST_ADDRESS = "queue://testAddress";
+
     //----- no-body Message type tests ---------------------------------------//
 
     @Test
@@ -78,10 +81,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNull(amqp.getBody());
     }
@@ -89,14 +94,16 @@ public class JMSMappingOutboundTransformerTest {
     @Test
     public void testConvertTextMessageToAmqpMessageWithNoBodyOriginalEncodingWasNull() throws Exception {
         ActiveMQTextMessage outbound = createTextMessage();
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_NULL);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNull(amqp.getBody());
     }
@@ -109,10 +116,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(outbound);
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -128,10 +137,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.storeContent();
         outbound.onSend();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -152,10 +163,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.storeContent();
         outbound.onSend();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -171,14 +184,16 @@ public class JMSMappingOutboundTransformerTest {
     @Test
     public void testConvertEmptyBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
         ActiveMQBytesMessage outbound = createBytesMessage();
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(outbound);
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -190,15 +205,17 @@ public class JMSMappingOutboundTransformerTest {
     public void testConvertUncompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
         byte[] expectedPayload = new byte[] { 8, 16, 24, 32 };
         ActiveMQBytesMessage outbound = createBytesMessage();
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         outbound.writeBytes(expectedPayload);
         outbound.storeContent();
         outbound.onSend();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -215,15 +232,17 @@ public class JMSMappingOutboundTransformerTest {
     public void testConvertCompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
         byte[] expectedPayload = new byte[] { 8, 16, 24, 32 };
         ActiveMQBytesMessage outbound = createBytesMessage(true);
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         outbound.writeBytes(expectedPayload);
         outbound.storeContent();
         outbound.onSend();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -244,10 +263,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(outbound);
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -263,10 +284,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -288,10 +311,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -312,10 +337,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(outbound);
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -325,14 +352,16 @@ public class JMSMappingOutboundTransformerTest {
     @Test
     public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
         ActiveMQStreamMessage outbound = createStreamMessage();
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpSequence);
@@ -347,10 +376,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -365,16 +396,18 @@ public class JMSMappingOutboundTransformerTest {
     @Test
     public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
         ActiveMQStreamMessage outbound = createStreamMessage(true);
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
         outbound.writeBoolean(false);
         outbound.writeString("test");
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(outbound);
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpSequence);
@@ -394,10 +427,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -407,14 +442,16 @@ public class JMSMappingOutboundTransformerTest {
     @Test
     public void testConvertEmptyObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
         ActiveMQObjectMessage outbound = createObjectMessage();
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -424,14 +461,16 @@ public class JMSMappingOutboundTransformerTest {
     @Test
     public void testConvertEmptyObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
         ActiveMQObjectMessage outbound = createObjectMessage();
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(outbound);
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -445,10 +484,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -462,14 +503,16 @@ public class JMSMappingOutboundTransformerTest {
     @Test
     public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
         ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -483,14 +526,16 @@ public class JMSMappingOutboundTransformerTest {
     @Test
     public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
         ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(outbound);
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -504,14 +549,16 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertCompressedObjectMessageToAmqpMessageWithDataBody() throws Exception {
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -525,14 +572,16 @@ public class JMSMappingOutboundTransformerTest {
     @Test
     public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
         ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(outbound);
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -546,14 +595,16 @@ public class JMSMappingOutboundTransformerTest {
     @Test
     public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
         ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -573,10 +624,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -587,14 +640,16 @@ public class JMSMappingOutboundTransformerTest {
     public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
         String contentString = "myTextMessageContent";
         ActiveMQTextMessage outbound = createTextMessage(contentString);
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(outbound);
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -609,13 +664,15 @@ public class JMSMappingOutboundTransformerTest {
     public void testConvertTextMessageContentNotStoredCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
         String contentString = "myTextMessageContent";
         ActiveMQTextMessage outbound = createTextMessage(contentString);
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
         outbound.onSend();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -633,10 +690,12 @@ public class JMSMappingOutboundTransformerTest {
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -649,10 +708,12 @@ public class JMSMappingOutboundTransformerTest {
         ActiveMQTextMessage outbound = createTextMessage(contentString);
         outbound.onSend();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(outbound);
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -663,14 +724,16 @@ public class JMSMappingOutboundTransformerTest {
     public void testConvertCompressedTextMessageCreatesDataSectionBody() throws Exception {
         String contentString = "myTextMessageContent";
         ActiveMQTextMessage outbound = createTextMessage(contentString, true);
-        outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
+        outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
         outbound.onSend();
         outbound.storeContent();
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(outbound);
+        Message amqp = encoded.decode();
 
         assertNotNull(amqp.getBody());
         assertTrue(amqp.getBody() instanceof Data);
@@ -690,45 +753,35 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertMessageWithJMSDestinationQueue() throws Exception {
-        Queue mockDest = Mockito.mock(Queue.class);
-
-        doTestConvertMessageWithJMSDestination(mockDest, QUEUE_TYPE);
+        doTestConvertMessageWithJMSDestination(createMockDestination(QUEUE_TYPE), QUEUE_TYPE);
     }
 
     @Test
     public void testConvertMessageWithJMSDestinationTemporaryQueue() throws Exception {
-        TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
-
-        doTestConvertMessageWithJMSDestination(mockDest, TEMP_QUEUE_TYPE);
+        doTestConvertMessageWithJMSDestination(createMockDestination(TEMP_QUEUE_TYPE), TEMP_QUEUE_TYPE);
     }
 
     @Test
     public void testConvertMessageWithJMSDestinationTopic() throws Exception {
-        Topic mockDest = Mockito.mock(Topic.class);
-
-        doTestConvertMessageWithJMSDestination(mockDest, TOPIC_TYPE);
+        doTestConvertMessageWithJMSDestination(createMockDestination(TOPIC_TYPE), TOPIC_TYPE);
     }
 
     @Test
     public void testConvertMessageWithJMSDestinationTemporaryTopic() throws Exception {
-        TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
-
-        doTestConvertMessageWithJMSDestination(mockDest, TEMP_TOPIC_TYPE);
+        doTestConvertMessageWithJMSDestination(createMockDestination(TEMP_TOPIC_TYPE), TEMP_TOPIC_TYPE);
     }
 
-    private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue) throws Exception {
+    private void doTestConvertMessageWithJMSDestination(ActiveMQDestination jmsDestination, Object expectedAnnotationValue) throws Exception {
         ActiveMQTextMessage mockTextMessage = createMockTextMessage();
         Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
-        Mockito.when(mockTextMessage.getJMSDestination()).thenReturn(jmsDestination);
-        ActiveMQJMSVendor mockVendor = createMockVendor();
-        String toAddress = "someToAddress";
-        if (jmsDestination != null) {
-            Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(toAddress);
-        }
+        Mockito.when(mockTextMessage.getDestination()).thenReturn(jmsDestination);
+
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
+        EncodedMessage encoded = transformer.transform(mockTextMessage);
+        assertNotNull(encoded);
 
-        Message amqp = transformer.convert(mockTextMessage);
+        Message amqp = encoded.decode();
 
         MessageAnnotations ma = amqp.getMessageAnnotations();
         Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
@@ -740,7 +793,7 @@ public class JMSMappingOutboundTransformerTest {
         }
 
         if (jmsDestination != null) {
-            assertEquals("Unexpected 'to' address", toAddress, amqp.getAddress());
+            assertEquals("Unexpected 'to' address", jmsDestination.getQualifiedName(), amqp.getAddress());
         }
     }
 
@@ -753,45 +806,35 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertMessageWithJMSReplyToQueue() throws Exception {
-        Queue mockDest = Mockito.mock(Queue.class);
-
-        doTestConvertMessageWithJMSReplyTo(mockDest, QUEUE_TYPE);
+        doTestConvertMessageWithJMSReplyTo(createMockDestination(QUEUE_TYPE), QUEUE_TYPE);
     }
 
     @Test
     public void testConvertMessageWithJMSReplyToTemporaryQueue() throws Exception {
-        TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
-
-        doTestConvertMessageWithJMSReplyTo(mockDest, TEMP_QUEUE_TYPE);
+        doTestConvertMessageWithJMSReplyTo(createMockDestination(TEMP_QUEUE_TYPE), TEMP_QUEUE_TYPE);
     }
 
     @Test
     public void testConvertMessageWithJMSReplyToTopic() throws Exception {
-        Topic mockDest = Mockito.mock(Topic.class);
-
-        doTestConvertMessageWithJMSReplyTo(mockDest, TOPIC_TYPE);
+        doTestConvertMessageWithJMSReplyTo(createMockDestination(TOPIC_TYPE), TOPIC_TYPE);
     }
 
     @Test
     public void testConvertMessageWithJMSReplyToTemporaryTopic() throws Exception {
-        TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
-
-        doTestConvertMessageWithJMSReplyTo(mockDest, TEMP_TOPIC_TYPE);
+        doTestConvertMessageWithJMSReplyTo(createMockDestination(TEMP_TOPIC_TYPE), TEMP_TOPIC_TYPE);
     }
 
-    private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue) throws Exception {
+    private void doTestConvertMessageWithJMSReplyTo(ActiveMQDestination jmsReplyTo, Object expectedAnnotationValue) throws Exception {
         ActiveMQTextMessage mockTextMessage = createMockTextMessage();
         Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
-        Mockito.when(mockTextMessage.getJMSReplyTo()).thenReturn(jmsReplyTo);
-        ActiveMQJMSVendor mockVendor = createMockVendor();
-        String replyToAddress = "someReplyToAddress";
-        if (jmsReplyTo != null) {
-            Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(replyToAddress);
-        }
+        Mockito.when(mockTextMessage.getReplyTo()).thenReturn(jmsReplyTo);
 
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
-        Message amqp = transformer.convert(mockTextMessage);
+        EncodedMessage encoded = transformer.transform(mockTextMessage);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
 
         MessageAnnotations ma = amqp.getMessageAnnotations();
         Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
@@ -803,7 +846,7 @@ public class JMSMappingOutboundTransformerTest {
         }
 
         if (jmsReplyTo != null) {
-            assertEquals("Unexpected 'reply-to' address", replyToAddress, amqp.getReplyTo());
+            assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getQualifiedName(), amqp.getReplyTo());
         }
     }
 
@@ -812,16 +855,35 @@ public class JMSMappingOutboundTransformerTest {
     private ActiveMQTextMessage createMockTextMessage() throws Exception {
         ActiveMQTextMessage mockTextMessage = Mockito.mock(ActiveMQTextMessage.class);
         Mockito.when(mockTextMessage.getPropertyNames()).thenReturn(Collections.enumeration(Collections.emptySet()));
+        Mockito.when(mockTextMessage.getPriority()).thenReturn((byte) Message.DEFAULT_PRIORITY);
 
         return mockTextMessage;
     }
 
-    private ActiveMQJMSVendor createVendor() {
-        return ActiveMQJMSVendor.INSTANCE;
-    }
+    private ActiveMQDestination createMockDestination(byte destType) {
+        ActiveMQDestination mockDestination = null;
+        switch (destType) {
+            case QUEUE_TYPE:
+                mockDestination = Mockito.mock(ActiveMQQueue.class);
+                Mockito.when(mockDestination.getQualifiedName()).thenReturn("queue://" + TEST_ADDRESS);
+                break;
+            case TOPIC_TYPE:
+                mockDestination = Mockito.mock(ActiveMQTopic.class);
+                Mockito.when(mockDestination.getQualifiedName()).thenReturn("topic://" + TEST_ADDRESS);
+                break;
+            case TEMP_QUEUE_TYPE:
+                mockDestination = Mockito.mock(ActiveMQTempQueue.class);
+                Mockito.when(mockDestination.getQualifiedName()).thenReturn("tempQueue://" + TEST_ADDRESS);
+                break;
+            case TEMP_TOPIC_TYPE:
+                mockDestination = Mockito.mock(ActiveMQTempTopic.class);
+                Mockito.when(mockDestination.getQualifiedName()).thenReturn("tempTopic://" + TEST_ADDRESS);
+                break;
+            default:
+                throw new IllegalArgumentException("Invliad Destination Type given/");
+        }
 
-    private ActiveMQJMSVendor createMockVendor() {
-        return Mockito.mock(ActiveMQJMSVendor.class);
+        return mockDestination;
     }
 
     private ActiveMQMessage createMessage() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
index 98d6722..204c652 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.message;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
@@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Some simple performance tests for the Message Transformers.
  */
-@Ignore("Turn on to profile.")
+@Ignore("Enable for profiling")
 @RunWith(Parameterized.class)
 public class JMSTransformationSpeedComparisonTest {
 
@@ -63,7 +64,7 @@ public class JMSTransformationSpeedComparisonTest {
 
     private final String transformer;
 
-    private final int WARM_CYCLES = 50;
+    private final int WARM_CYCLES = 1000;
     private final int PROFILE_CYCLES = 1000000;
 
     public JMSTransformationSpeedComparisonTest(String transformer) {
@@ -82,11 +83,11 @@ public class JMSTransformationSpeedComparisonTest {
     private InboundTransformer getInboundTransformer() {
         switch (transformer) {
             case "raw":
-                return new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                return new AMQPRawInboundTransformer();
             case "native":
-                return new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                return new AMQPNativeInboundTransformer();
             default:
-                return new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                return new JMSMappingInboundTransformer();
         }
     }
 
@@ -94,9 +95,9 @@ public class JMSTransformationSpeedComparisonTest {
         switch (transformer) {
             case "raw":
             case "native":
-                return new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                return new AMQPNativeOutboundTransformer();
             default:
-                return new JMSMappingOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                return new JMSMappingOutboundTransformer();
         }
     }
 
@@ -113,7 +114,7 @@ public class JMSTransformationSpeedComparisonTest {
 
         // Warm up
         for (int i = 0; i < WARM_CYCLES; ++i) {
-            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
             intermediate.onSend();
             outboundTransformer.transform(intermediate);
         }
@@ -122,7 +123,7 @@ public class JMSTransformationSpeedComparisonTest {
 
         long startTime = System.nanoTime();
         for (int i = 0; i < PROFILE_CYCLES; ++i) {
-            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
             intermediate.onSend();
             outboundTransformer.transform(intermediate);
         }
@@ -149,7 +150,7 @@ public class JMSTransformationSpeedComparisonTest {
 
         // Warm up
         for (int i = 0; i < WARM_CYCLES; ++i) {
-            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
             intermediate.onSend();
             outboundTransformer.transform(intermediate);
         }
@@ -158,7 +159,7 @@ public class JMSTransformationSpeedComparisonTest {
 
         long startTime = System.nanoTime();
         for (int i = 0; i < PROFILE_CYCLES; ++i) {
-            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
             intermediate.onSend();
             outboundTransformer.transform(intermediate);
         }
@@ -177,7 +178,7 @@ public class JMSTransformationSpeedComparisonTest {
 
         // Warm up
         for (int i = 0; i < WARM_CYCLES; ++i) {
-            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
             intermediate.onSend();
             outboundTransformer.transform(intermediate);
         }
@@ -186,7 +187,7 @@ public class JMSTransformationSpeedComparisonTest {
 
         long startTime = System.nanoTime();
         for (int i = 0; i < PROFILE_CYCLES; ++i) {
-            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
             intermediate.onSend();
             outboundTransformer.transform(intermediate);
         }
@@ -205,7 +206,7 @@ public class JMSTransformationSpeedComparisonTest {
 
         // Warm up
         for (int i = 0; i < WARM_CYCLES; ++i) {
-            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
             intermediate.onSend();
             outboundTransformer.transform(intermediate);
         }
@@ -214,7 +215,7 @@ public class JMSTransformationSpeedComparisonTest {
 
         long startTime = System.nanoTime();
         for (int i = 0; i < PROFILE_CYCLES; ++i) {
-            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
             intermediate.onSend();
             outboundTransformer.transform(intermediate);
         }
@@ -255,7 +256,7 @@ public class JMSTransformationSpeedComparisonTest {
         InboundTransformer inboundTransformer = getInboundTransformer();
         OutboundTransformer outboundTransformer = getOutboundTransformer();
 
-        ActiveMQMessage outbound = (ActiveMQMessage) inboundTransformer.transform(encoded);
+        ActiveMQMessage outbound = inboundTransformer.transform(encoded);
         outbound.onSend();
 
         // Warm up
@@ -276,7 +277,6 @@ public class JMSTransformationSpeedComparisonTest {
             transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration), test.getMethodName());
     }
 
-    @Ignore
     @Test
     public void testEncodeDecodeIsWorking() throws Exception {
         Message incomingMessage = createTypicalQpidJMSMessage();
@@ -284,7 +284,7 @@ public class JMSTransformationSpeedComparisonTest {
         InboundTransformer inboundTransformer = getInboundTransformer();
         OutboundTransformer outboundTransformer = getOutboundTransformer();
 
-        ActiveMQMessage outbound = (ActiveMQMessage) inboundTransformer.transform(encoded);
+        ActiveMQMessage outbound = inboundTransformer.transform(encoded);
         outbound.onSend();
         Message outboudMessage = outboundTransformer.transform(outbound).decode();
 
@@ -319,6 +319,25 @@ public class JMSTransformationSpeedComparisonTest {
         assertEquals(incomingBody.getValue(), outgoingBody.getValue());
     }
 
+    @Test
+    public void testBodyOnlyEncodeDecode() throws Exception {
+
+        Message incomingMessage = Proton.message();
+
+        incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+        EncodedMessage encoded = encode(incomingMessage);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
+        intermediate.onSend();
+        Message outboudMessage = outboundTransformer.transform(intermediate).decode();
+
+        assertNull(outboudMessage.getHeader());
+        assertNull(outboudMessage.getProperties());
+    }
+
     private Message createTypicalQpidJMSMessage() {
         Map<String, Object> applicationProperties = new HashMap<String, Object>();
         Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();


[7/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6438

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6438

Remove redundant tests and clean up a few small nits.
(cherry picked from commit 45f60e4133f64603d5f1a5161c363eb185d2804d)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/aebb365a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/aebb365a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/aebb365a

Branch: refs/heads/activemq-5.14.x
Commit: aebb365ad438933dc72208ba19f6dc434ce0c3ef
Parents: 4575bee
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Sep 27 17:08:28 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 28 15:08:39 2016 -0400

----------------------------------------------------------------------
 .../message/JMSMappingOutboundTransformer.java  |  5 +-
 .../JMSTransformationSpeedComparisonTest.java   | 65 --------------------
 2 files changed, 2 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/aebb365a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index fa915e8..69039ea 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -220,7 +220,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
             if (header == null) {
                 header = new Header();
             }
-            header.setDeliveryCount(new UnsignedInteger(deliveryCount));
+            header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount));
         }
         String userId = message.getUserID();
         if (userId != null) {
@@ -238,11 +238,10 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
         }
         int groupSequence = message.getGroupSequence();
         if (groupSequence > 0) {
-            UnsignedInteger value = new UnsignedInteger(groupSequence);
             if (properties == null) {
                 properties = new Properties();
             }
-            properties.setGroupSequence(value);
+            properties.setGroupSequence(UnsignedInteger.valueOf(groupSequence));
         }
 
         final Map<String, Object> entries;

http://git-wip-us.apache.org/repos/asf/activemq/blob/aebb365a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
index 204c652..cd93af7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
@@ -16,10 +16,6 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -277,67 +273,6 @@ public class JMSTransformationSpeedComparisonTest {
             transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration), test.getMethodName());
     }
 
-    @Test
-    public void testEncodeDecodeIsWorking() throws Exception {
-        Message incomingMessage = createTypicalQpidJMSMessage();
-        EncodedMessage encoded = encode(incomingMessage);
-        InboundTransformer inboundTransformer = getInboundTransformer();
-        OutboundTransformer outboundTransformer = getOutboundTransformer();
-
-        ActiveMQMessage outbound = inboundTransformer.transform(encoded);
-        outbound.onSend();
-        Message outboudMessage = outboundTransformer.transform(outbound).decode();
-
-        // Test that message details are equal
-        assertEquals(incomingMessage.getAddress(), outboudMessage.getAddress());
-        assertEquals(incomingMessage.getDeliveryCount(), outboudMessage.getDeliveryCount());
-        assertEquals(incomingMessage.getCreationTime(), outboudMessage.getCreationTime());
-        assertEquals(incomingMessage.getContentType(), outboudMessage.getContentType());
-
-        // Test Message annotations
-        ApplicationProperties incomingApplicationProperties = incomingMessage.getApplicationProperties();
-        ApplicationProperties outgoingApplicationProperties = outboudMessage.getApplicationProperties();
-
-        assertEquals(incomingApplicationProperties.getValue(), outgoingApplicationProperties.getValue());
-
-        // Test Message properties
-        MessageAnnotations incomingMessageAnnotations = incomingMessage.getMessageAnnotations();
-        MessageAnnotations outgoingMessageAnnotations = outboudMessage.getMessageAnnotations();
-
-        assertEquals(incomingMessageAnnotations.getValue(), outgoingMessageAnnotations.getValue());
-
-        // Test that bodies are equal
-        assertTrue(incomingMessage.getBody() instanceof AmqpValue);
-        assertTrue(outboudMessage.getBody() instanceof AmqpValue);
-
-        AmqpValue incomingBody = (AmqpValue) incomingMessage.getBody();
-        AmqpValue outgoingBody = (AmqpValue) outboudMessage.getBody();
-
-        assertTrue(incomingBody.getValue() instanceof String);
-        assertTrue(outgoingBody.getValue() instanceof String);
-
-        assertEquals(incomingBody.getValue(), outgoingBody.getValue());
-    }
-
-    @Test
-    public void testBodyOnlyEncodeDecode() throws Exception {
-
-        Message incomingMessage = Proton.message();
-
-        incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
-
-        EncodedMessage encoded = encode(incomingMessage);
-        InboundTransformer inboundTransformer = getInboundTransformer();
-        OutboundTransformer outboundTransformer = getOutboundTransformer();
-
-        ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
-        intermediate.onSend();
-        Message outboudMessage = outboundTransformer.transform(intermediate).decode();
-
-        assertNull(outboudMessage.getHeader());
-        assertNull(outboudMessage.getProperties());
-    }
-
     private Message createTypicalQpidJMSMessage() {
         Map<String, Object> applicationProperties = new HashMap<String, Object>();
         Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();


[2/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6438

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6438

Add additional test for larger more complex AMQP message
(cherry picked from commit 5702ec8d7ca330c94d6e4b59cea4606285b2c196)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/95faf0d8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/95faf0d8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/95faf0d8

Branch: refs/heads/activemq-5.14.x
Commit: 95faf0d87cf37715c5d163925fbab84c69576ee1
Parents: 14c553a
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Sep 26 17:19:42 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 28 15:08:09 2016 -0400

----------------------------------------------------------------------
 .../JMSTransformationSpeedComparisonTest.java   | 73 ++++++++++++++++++++
 1 file changed, 73 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/95faf0d8/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
index e1b6a46..98d6722 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -196,6 +197,34 @@ public class JMSTransformationSpeedComparisonTest {
     }
 
     @Test
+    public void testComplexQpidJMSMessage() throws Exception {
+
+        EncodedMessage encoded = encode(createComplexQpidJMSMessage());
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        // Warm up
+        for (int i = 0; i < WARM_CYCLES; ++i) {
+            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            intermediate.onSend();
+            outboundTransformer.transform(intermediate);
+        }
+
+        long totalDuration = 0;
+
+        long startTime = System.nanoTime();
+        for (int i = 0; i < PROFILE_CYCLES; ++i) {
+            ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
+            intermediate.onSend();
+            outboundTransformer.transform(intermediate);
+        }
+        totalDuration += System.nanoTime() - startTime;
+
+        LOG.info("[{}] Total time for {} cycles of transforms = {} ms  -> [{}]",
+            transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration), test.getMethodName());
+    }
+
+    @Test
     public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
 
         EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
@@ -314,6 +343,50 @@ public class JMSTransformationSpeedComparisonTest {
         return message;
     }
 
+    private Message createComplexQpidJMSMessage() {
+        Map<String, Object> applicationProperties = new HashMap<String, Object>();
+        Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
+
+        applicationProperties.put("property-1", "string-1");
+        applicationProperties.put("property-2", 512);
+        applicationProperties.put("property-3", true);
+        applicationProperties.put("property-4", "string-2");
+        applicationProperties.put("property-5", 512);
+        applicationProperties.put("property-6", true);
+        applicationProperties.put("property-7", "string-3");
+        applicationProperties.put("property-8", 512);
+        applicationProperties.put("property-9", true);
+
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
+
+        Message message = Proton.message();
+
+        // Header Values
+        message.setPriority((short) 9);
+        message.setDurable(true);
+        message.setDeliveryCount(2);
+        message.setTtl(5000);
+
+        // Properties
+        message.setMessageId("ID:SomeQualifier:0:0:1");
+        message.setGroupId("Group-ID-1");
+        message.setGroupSequence(15);
+        message.setAddress("queue://test-queue");
+        message.setReplyTo("queue://reply-queue");
+        message.setCreationTime(System.currentTimeMillis());
+        message.setContentType("text/plain");
+        message.setCorrelationId("ID:SomeQualifier:0:7:9");
+        message.setUserId("username".getBytes(StandardCharsets.UTF_8));
+
+        // Application Properties / Message Annotations / Body
+        message.setApplicationProperties(new ApplicationProperties(applicationProperties));
+        message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
+        message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+        return message;
+    }
+
     private EncodedMessage encode(Message message) {
         ProtonJMessage amqp = (ProtonJMessage) message;
 


[6/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6438

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6438

Add a new test for encode / decode validation.  Fix issue where the
internal scheduled message properties were escaping into the outbound
message.
(cherry picked from commit b1a9a9382b837fb8cff7f9586c7f9516675bf78a)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4575bee6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4575bee6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4575bee6

Branch: refs/heads/activemq-5.14.x
Commit: 4575bee6708cfd8f024ff1e183e910ca622ce9c1
Parents: 9cb92a2
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Sep 27 15:56:27 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 28 15:08:29 2016 -0400

----------------------------------------------------------------------
 .../message/JMSMappingOutboundTransformer.java  |   5 +
 .../amqp/message/MessageTransformationTest.java | 296 +++++++++++++++++++
 2 files changed, 301 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4575bee6/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 985f4f5..fa915e8 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -99,6 +99,8 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
     public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
     public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
 
+    private static final String AMQ_SCHEDULED_MESSAGE_PREFIX = "AMQ_SCHEDULED_";
+
     public static final byte QUEUE_TYPE = 0x00;
     public static final byte TOPIC_TYPE = 0x01;
     public static final byte TEMP_QUEUE_TYPE = 0x02;
@@ -323,6 +325,9 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
                     footerMap.put(name, value);
                     continue;
                 }
+            } else if (key.startsWith(AMQ_SCHEDULED_MESSAGE_PREFIX )) {
+                // strip off the scheduled message properties
+                continue;
             }
 
             // The property didn't map into any other slot so we store it in the

http://git-wip-us.apache.org/repos/asf/activemq/blob/4575bee6/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/MessageTransformationTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/MessageTransformationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/MessageTransformationTest.java
new file mode 100644
index 0000000..c92e9ea
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/MessageTransformationTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.transport.amqp.JMSInteroperabilityTest;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests some basic encode / decode functionality on the transformers.
+ */
+@RunWith(Parameterized.class)
+public class MessageTransformationTest {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JMSInteroperabilityTest.class);
+
+    @Rule
+    public TestName test = new TestName();
+
+    private final String transformer;
+
+    public MessageTransformationTest(String transformer) {
+        this.transformer = transformer;
+    }
+
+    @Parameters(name="Transformer->{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"jms"},
+                {"native"},
+                {"raw"},
+            });
+    }
+
+    private InboundTransformer getInboundTransformer() {
+        switch (transformer) {
+            case "raw":
+                return new AMQPRawInboundTransformer();
+            case "native":
+                return new AMQPNativeInboundTransformer();
+            default:
+                return new JMSMappingInboundTransformer();
+        }
+    }
+
+    private OutboundTransformer getOutboundTransformer() {
+        switch (transformer) {
+            case "raw":
+            case "native":
+                return new AMQPNativeOutboundTransformer();
+            default:
+                return new JMSMappingOutboundTransformer();
+        }
+    }
+
+    @Test
+    public void testEncodeDecodeFidelity() throws Exception {
+        Map<String, Object> applicationProperties = new HashMap<String, Object>();
+        Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
+
+        applicationProperties.put("property-1", "string");
+        applicationProperties.put("property-2", 512);
+        applicationProperties.put("property-3", true);
+
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
+
+        Message incomingMessage = Proton.message();
+
+        incomingMessage.setAddress("queue://test-queue");
+        incomingMessage.setDeliveryCount(1);
+        incomingMessage.setApplicationProperties(new ApplicationProperties(applicationProperties));
+        incomingMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
+        incomingMessage.setCreationTime(System.currentTimeMillis());
+        incomingMessage.setContentType("text/plain");
+        incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+        EncodedMessage encoded = encode(incomingMessage);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        ActiveMQMessage outbound = inboundTransformer.transform(encoded);
+        outbound.onSend();
+        Message outboudMessage = outboundTransformer.transform(outbound).decode();
+
+        // Test that message details are equal
+        assertEquals(incomingMessage.getAddress(), outboudMessage.getAddress());
+        assertEquals(incomingMessage.getDeliveryCount(), outboudMessage.getDeliveryCount());
+        assertEquals(incomingMessage.getCreationTime(), outboudMessage.getCreationTime());
+        assertEquals(incomingMessage.getContentType(), outboudMessage.getContentType());
+
+        // Test application properties
+        ApplicationProperties incomingApplicationProperties = incomingMessage.getApplicationProperties();
+        ApplicationProperties outgoingApplicationProperties = outboudMessage.getApplicationProperties();
+
+        assertEquals(incomingApplicationProperties.getValue(), outgoingApplicationProperties.getValue());
+
+        // Test message annotations
+        MessageAnnotations incomingMessageAnnotations = incomingMessage.getMessageAnnotations();
+        MessageAnnotations outgoingMessageAnnotations = outboudMessage.getMessageAnnotations();
+
+        assertEquals(incomingMessageAnnotations.getValue(), outgoingMessageAnnotations.getValue());
+
+        // Test that bodies are equal
+        assertTrue(incomingMessage.getBody() instanceof AmqpValue);
+        assertTrue(outboudMessage.getBody() instanceof AmqpValue);
+
+        AmqpValue incomingBody = (AmqpValue) incomingMessage.getBody();
+        AmqpValue outgoingBody = (AmqpValue) outboudMessage.getBody();
+
+        assertTrue(incomingBody.getValue() instanceof String);
+        assertTrue(outgoingBody.getValue() instanceof String);
+
+        assertEquals(incomingBody.getValue(), outgoingBody.getValue());
+    }
+
+    @Test
+    public void testBodyOnlyEncodeDecode() throws Exception {
+
+        Message incomingMessage = Proton.message();
+
+        incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+        EncodedMessage encoded = encode(incomingMessage);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
+        intermediate.onSend();
+        Message outboudMessage = outboundTransformer.transform(intermediate).decode();
+
+        assertNull(outboudMessage.getHeader());
+        assertNull(outboudMessage.getProperties());
+    }
+
+    @Test
+    public void testPropertiesButNoHeadersEncodeDecode() throws Exception {
+
+        Message incomingMessage = Proton.message();
+
+        incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+        incomingMessage.setMessageId("ID:SomeQualifier:0:0:1");
+
+        EncodedMessage encoded = encode(incomingMessage);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
+        intermediate.onSend();
+        Message outboudMessage = outboundTransformer.transform(intermediate).decode();
+
+        assertNull(outboudMessage.getHeader());
+        assertNotNull(outboudMessage.getProperties());
+    }
+
+    @Test
+    public void testHeaderButNoPropertiesEncodeDecode() throws Exception {
+
+        Message incomingMessage = Proton.message();
+
+        incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+        incomingMessage.setDurable(true);
+
+        EncodedMessage encoded = encode(incomingMessage);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
+        intermediate.onSend();
+        Message outboudMessage = outboundTransformer.transform(intermediate).decode();
+
+        assertNotNull(outboudMessage.getHeader());
+        assertNull(outboudMessage.getProperties());
+    }
+
+    @Test
+    public void testComplexQpidJMSMessageEncodeDecode() throws Exception {
+
+        Map<String, Object> applicationProperties = new HashMap<String, Object>();
+        Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
+
+        applicationProperties.put("property-1", "string-1");
+        applicationProperties.put("property-2", 512);
+        applicationProperties.put("property-3", true);
+        applicationProperties.put("property-4", "string-2");
+        applicationProperties.put("property-5", 512);
+        applicationProperties.put("property-6", true);
+        applicationProperties.put("property-7", "string-3");
+        applicationProperties.put("property-8", 512);
+        applicationProperties.put("property-9", true);
+
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
+        messageAnnotations.put(Symbol.valueOf("x-opt-jms-reply-to"), 0);
+        messageAnnotations.put(Symbol.valueOf("x-opt-delivery-delay"), 2000);
+
+        Message message = Proton.message();
+
+        // Header Values
+        message.setPriority((short) 9);
+        message.setDurable(true);
+        message.setDeliveryCount(2);
+        message.setTtl(5000);
+
+        // Properties
+        message.setMessageId("ID:SomeQualifier:0:0:1");
+        message.setGroupId("Group-ID-1");
+        message.setGroupSequence(15);
+        message.setAddress("queue://test-queue");
+        message.setReplyTo("queue://reply-queue");
+        message.setCreationTime(System.currentTimeMillis());
+        message.setContentType("text/plain");
+        message.setCorrelationId("ID:SomeQualifier:0:7:9");
+        message.setUserId("username".getBytes(StandardCharsets.UTF_8));
+
+        // Application Properties / Message Annotations / Body
+        message.setApplicationProperties(new ApplicationProperties(applicationProperties));
+        message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
+        message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+        EncodedMessage encoded = encode(message);
+        InboundTransformer inboundTransformer = getInboundTransformer();
+        OutboundTransformer outboundTransformer = getOutboundTransformer();
+
+        ActiveMQMessage intermediate = inboundTransformer.transform(encoded);
+        intermediate.onSend();
+        Message outboudMessage = outboundTransformer.transform(intermediate).decode();
+
+        assertNotNull(outboudMessage.getHeader());
+        assertNotNull(outboudMessage.getProperties());
+        assertNotNull(outboudMessage.getMessageAnnotations());
+        assertNotNull(outboudMessage.getApplicationProperties());
+        assertNull(outboudMessage.getDeliveryAnnotations());
+        assertNull(outboudMessage.getFooter());
+
+        assertEquals(9, outboudMessage.getApplicationProperties().getValue().size());
+        assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size());
+    }
+
+    private EncodedMessage encode(Message message) {
+        ProtonJMessage amqp = (ProtonJMessage) message;
+
+        ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
+        final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+        int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+        if (overflow.position() > 0) {
+            buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
+            c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+        }
+
+        return new EncodedMessage(1, buffer.array(), 0, c);
+    }
+}


[4/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6438

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 59c306f..985f4f5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -18,40 +18,63 @@ package org.apache.activemq.transport.amqp.message;
 
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
-import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.CONTENT_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.DELIVERY_ANNOTATION_PREFIX;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.EMPTY_BINARY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.FIRST_ACQUIRER;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.FOOTER_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.HEADER;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PREFIX_LENGTH;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.MESSAGE_FORMAT;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.NATIVE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.ORIGINAL_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.PROPERTIES;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.REPLYTO_GROUP_ID;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getBinaryFromMessageBody;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getMapFromMessageBody;
 
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.ObjectMessage;
 import javax.jms.Queue;
-import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.TypeConversionSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
@@ -66,12 +89,12 @@ import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
-public class JMSMappingOutboundTransformer extends OutboundTransformer {
+public class JMSMappingOutboundTransformer implements OutboundTransformer {
 
     public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
     public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
@@ -81,225 +104,276 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
     public static final byte TEMP_QUEUE_TYPE = 0x02;
     public static final byte TEMP_TOPIC_TYPE = 0x03;
 
-    // Deprecated legacy values used by old QPid AMQP 1.0 JMS client.
-
-    public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type");
-    public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type");
-
-    public static final String LEGACY_QUEUE_TYPE = "queue";
-    public static final String LEGACY_TOPIC_TYPE = "topic";
-    public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
-    public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
-
-    public JMSMappingOutboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
+    // For now Proton requires that we create a decoder to create an encoder
+    private final DecoderImpl decoder = new DecoderImpl();
+    private final EncoderImpl encoder = new EncoderImpl(decoder);
+    {
+        AMQPDefinedTypes.registerAllTypes(decoder, encoder);
     }
 
     @Override
-    public EncodedMessage transform(Message msg) throws Exception {
-        if (msg == null) {
-            return null;
-        }
-
-        try {
-            if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
-                return null;
-            }
-        } catch (MessageFormatException e) {
+    public EncodedMessage transform(ActiveMQMessage message) throws Exception {
+        if (message == null) {
             return null;
         }
-        ProtonJMessage amqp = convert(msg);
-
-        long messageFormat;
-        try {
-            messageFormat = msg.getLongProperty(this.messageFormatKey);
-        } catch (MessageFormatException e) {
-            return null;
-        }
-
-        ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
-        final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
-        int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
-        if (overflow.position() > 0) {
-            buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
-            c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
-        }
-
-        return new EncodedMessage(messageFormat, buffer.array(), 0, c);
-    }
-
-    /**
-     * Perform the conversion between JMS Message and Proton Message without
-     * re-encoding it to array. This is needed because some frameworks may elect
-     * to do this on their own way.
-     *
-     * @param message
-     *      The message to transform into an AMQP version for dispatch.
-     *
-     * @return an AMQP Message that represents the given JMS Message.
-     *
-     * @throws Exception if an error occurs during the conversion.
-     */
-    public ProtonJMessage convert(Message message) throws JMSException, UnsupportedEncodingException {
-        Header header = new Header();
-        Properties props = new Properties();
 
+        long messageFormat = 0;
+        Header header = null;
+        Properties properties = null;
         Map<Symbol, Object> daMap = null;
         Map<Symbol, Object> maMap = null;
         Map<String,Object> apMap = null;
         Map<Object, Object> footerMap = null;
-        Section body = null;
 
-        body = convertBody(message);
+        Section body = convertBody(message);
 
-        header.setDurable(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
-        header.setPriority(new UnsignedByte((byte) message.getJMSPriority()));
-        if (message.getJMSType() != null) {
-            props.setSubject(message.getJMSType());
+        if (message.isPersistent()) {
+            if (header == null) {
+                header = new Header();
+            }
+            header.setDurable(true);
         }
-        if (message.getJMSMessageID() != null) {
-            props.setMessageId(vendor.getOriginalMessageId(message));
+        byte priority = message.getPriority();
+        if (priority != Message.DEFAULT_PRIORITY) {
+            if (header == null) {
+                header = new Header();
+            }
+            header.setPriority(new UnsignedByte(priority));
+        }
+        String type = message.getType();
+        if (type != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setSubject(type);
+        }
+        MessageId messageId = message.getMessageId();
+        if (messageId != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setMessageId(getOriginalMessageId(message));
         }
-        if (message.getJMSDestination() != null) {
-            props.setTo(vendor.toAddress(message.getJMSDestination()));
+        ActiveMQDestination destination = message.getDestination();
+        if (destination != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setTo(destination.getQualifiedName());
             if (maMap == null) {
                 maMap = new HashMap<Symbol, Object>();
             }
-            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(message.getJMSDestination()));
-
-            // Deprecated: used by legacy QPid AMQP 1.0 JMS client
-            maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSDestination()));
+            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
         }
-        if (message.getJMSReplyTo() != null) {
-            props.setReplyTo(vendor.toAddress(message.getJMSReplyTo()));
+        ActiveMQDestination replyTo = message.getReplyTo();
+        if (replyTo != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setReplyTo(replyTo.getQualifiedName());
             if (maMap == null) {
                 maMap = new HashMap<Symbol, Object>();
             }
-            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(message.getJMSReplyTo()));
-
-            // Deprecated: used by legacy QPid AMQP 1.0 JMS client
-            maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSReplyTo()));
+            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
         }
-        if (message.getJMSCorrelationID() != null) {
-            String correlationId = message.getJMSCorrelationID();
+        String correlationId = message.getCorrelationId();
+        if (correlationId != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
             try {
-                props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+                properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
             } catch (AmqpProtocolException e) {
-                props.setCorrelationId(correlationId);
+                properties.setCorrelationId(correlationId);
             }
         }
-        if (message.getJMSExpiration() != 0) {
-            long ttl = message.getJMSExpiration() - System.currentTimeMillis();
+        long expiration = message.getExpiration();
+        if (expiration != 0) {
+            long ttl = expiration - System.currentTimeMillis();
             if (ttl < 0) {
                 ttl = 1;
             }
+
+            if (header == null) {
+                header = new Header();
+            }
             header.setTtl(new UnsignedInteger((int) ttl));
 
-            props.setAbsoluteExpiryTime(new Date(message.getJMSExpiration()));
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setAbsoluteExpiryTime(new Date(expiration));
         }
-        if (message.getJMSTimestamp() != 0) {
-            props.setCreationTime(new Date(message.getJMSTimestamp()));
+        long timeStamp = message.getTimestamp();
+        if (timeStamp != 0) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setCreationTime(new Date(timeStamp));
         }
 
-        @SuppressWarnings("unchecked")
-        final Enumeration<String> keys = message.getPropertyNames();
-
-        while (keys.hasMoreElements()) {
-            String key = keys.nextElement();
-            if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(AMQP_ORIGINAL_ENCODING_KEY)) {
-                // skip transformer appended properties
-            } else if (key.equals(firstAcquirerKey)) {
-                header.setFirstAcquirer(message.getBooleanProperty(key));
-            } else if (key.startsWith("JMSXDeliveryCount")) {
-                // The AMQP delivery-count field only includes prior failed delivery attempts,
-                // whereas JMSXDeliveryCount includes the first/current delivery attempt.
-                int amqpDeliveryCount = message.getIntProperty(key) - 1;
-                if (amqpDeliveryCount > 0) {
-                    header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
-                }
-            } else if (key.startsWith("JMSXUserID")) {
-                String value = message.getStringProperty(key);
-                props.setUserId(new Binary(value.getBytes("UTF-8")));
-            } else if (key.startsWith("JMSXGroupID")) {
-                String value = message.getStringProperty(key);
-                props.setGroupId(value);
-                if (apMap == null) {
-                    apMap = new HashMap<String, Object>();
-                }
-                apMap.put(key, value);
-            } else if (key.startsWith("JMSXGroupSeq")) {
-                UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
-                props.setGroupSequence(value);
-                if (apMap == null) {
-                    apMap = new HashMap<String, Object>();
-                }
-                apMap.put(key, value);
-            } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
-                if (daMap == null) {
-                    daMap = new HashMap<Symbol, Object>();
-                }
-                String name = key.substring(prefixDeliveryAnnotationsKey.length());
-                daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
-            } else if (key.startsWith(prefixMessageAnnotationsKey)) {
-                if (maMap == null) {
-                    maMap = new HashMap<Symbol, Object>();
-                }
-                String name = key.substring(prefixMessageAnnotationsKey.length());
-                maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
-            } else if (key.equals(contentTypeKey)) {
-                props.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
-            } else if (key.equals(contentEncodingKey)) {
-                props.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
-            } else if (key.equals(replyToGroupIDKey)) {
-                props.setReplyToGroupId(message.getStringProperty(key));
-            } else if (key.startsWith(prefixFooterKey)) {
-                if (footerMap == null) {
-                    footerMap = new HashMap<Object, Object>();
-                }
-                String name = key.substring(prefixFooterKey.length());
-                footerMap.put(name, message.getObjectProperty(key));
-            } else {
-                if (apMap == null) {
-                    apMap = new HashMap<String, Object>();
+        // JMSX Message Properties
+        int deliveryCount = message.getRedeliveryCounter();
+        if (deliveryCount > 0) {
+            if (header == null) {
+                header = new Header();
+            }
+            header.setDeliveryCount(new UnsignedInteger(deliveryCount));
+        }
+        String userId = message.getUserID();
+        if (userId != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setUserId(new Binary(userId.getBytes(StandardCharsets.UTF_8)));
+        }
+        String groupId = message.getGroupID();
+        if (groupId != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setGroupId(groupId);
+        }
+        int groupSequence = message.getGroupSequence();
+        if (groupSequence > 0) {
+            UnsignedInteger value = new UnsignedInteger(groupSequence);
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setGroupSequence(value);
+        }
+
+        final Map<String, Object> entries;
+        try {
+            entries = message.getProperties();
+        } catch (IOException e) {
+            throw JMSExceptionSupport.create(e);
+        }
+
+        for (Map.Entry<String, Object> entry : entries.entrySet()) {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            if (value instanceof UTF8Buffer) {
+                value = value.toString();
+            }
+
+            if (key.startsWith(JMS_AMQP_PREFIX)) {
+                if (key.startsWith(NATIVE, JMS_AMQP_PREFIX_LENGTH)) {
+                    // skip transformer appended properties
+                    continue;
+                } else if (key.startsWith(ORIGINAL_ENCODING, JMS_AMQP_PREFIX_LENGTH)) {
+                    // skip transformer appended properties
+                    continue;
+                } else if (key.startsWith(MESSAGE_FORMAT, JMS_AMQP_PREFIX_LENGTH)) {
+                    messageFormat = (long) TypeConversionSupport.convert(entry.getValue(), Long.class);
+                    continue;
+                } else if (key.startsWith(HEADER, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (header == null) {
+                        header = new Header();
+                    }
+                    continue;
+                } else if (key.startsWith(PROPERTIES, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (properties == null) {
+                        properties = new Properties();
+                    }
+                    continue;
+                } else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (maMap == null) {
+                        maMap = new HashMap<Symbol, Object>();
+                    }
+                    String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
+                    maMap.put(Symbol.valueOf(name), value);
+                    continue;
+                } else if (key.startsWith(FIRST_ACQUIRER, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (header == null) {
+                        header = new Header();
+                    }
+                    header.setFirstAcquirer((boolean) TypeConversionSupport.convert(value, Boolean.class));
+                    continue;
+                } else if (key.startsWith(CONTENT_TYPE, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (properties == null) {
+                        properties = new Properties();
+                    }
+                    properties.setContentType(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class)));
+                    continue;
+                } else if (key.startsWith(CONTENT_ENCODING, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (properties == null) {
+                        properties = new Properties();
+                    }
+                    properties.setContentEncoding(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class)));
+                    continue;
+                } else if (key.startsWith(REPLYTO_GROUP_ID, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (properties == null) {
+                        properties = new Properties();
+                    }
+                    properties.setReplyToGroupId((String) TypeConversionSupport.convert(value, String.class));
+                    continue;
+                } else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (daMap == null) {
+                        daMap = new HashMap<Symbol, Object>();
+                    }
+                    String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
+                    daMap.put(Symbol.valueOf(name), value);
+                    continue;
+                } else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (footerMap == null) {
+                        footerMap = new HashMap<Object, Object>();
+                    }
+                    String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
+                    footerMap.put(name, value);
+                    continue;
                 }
-                apMap.put(key, message.getObjectProperty(key));
             }
+
+            // The property didn't map into any other slot so we store it in the
+            // Application Properties section of the message.
+            if (apMap == null) {
+                apMap = new HashMap<String, Object>();
+            }
+            apMap.put(key, value);
         }
 
-        MessageAnnotations ma = null;
-        if (maMap != null) {
-            ma = new MessageAnnotations(maMap);
+        final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
+        encoder.setByteBuffer(buffer);
+
+        if (header != null) {
+            encoder.writeObject(header);
         }
-        DeliveryAnnotations da = null;
         if (daMap != null) {
-            da = new DeliveryAnnotations(daMap);
+            encoder.writeObject(new DeliveryAnnotations(daMap));
+        }
+        if (maMap != null) {
+            encoder.writeObject(new MessageAnnotations(maMap));
+        }
+        if (properties != null) {
+            encoder.writeObject(properties);
         }
-        ApplicationProperties ap = null;
         if (apMap != null) {
-            ap = new ApplicationProperties(apMap);
+            encoder.writeObject(new ApplicationProperties(apMap));
+        }
+        if (body != null) {
+            encoder.writeObject(body);
         }
-        Footer footer = null;
         if (footerMap != null) {
-            footer = new Footer(footerMap);
+            encoder.writeObject(new Footer(footerMap));
         }
 
-        return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
+        return new EncodedMessage(messageFormat, buffer.getArray(), 0, buffer.getArrayLength());
     }
 
-    private Section convertBody(Message message) throws JMSException {
+    private Section convertBody(ActiveMQMessage message) throws JMSException {
 
         Section body = null;
         short orignalEncoding = AMQP_UNKNOWN;
 
-        if (message.propertyExists(AMQP_ORIGINAL_ENCODING_KEY)) {
-            try {
-                orignalEncoding = message.getShortProperty(AMQP_ORIGINAL_ENCODING_KEY);
-            } catch (Exception ex) {
-            }
+        try {
+            orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING);
+        } catch (Exception ex) {
+            // Ignore and stick with UNKNOWN
         }
 
-        if (message instanceof BytesMessage) {
-            Binary payload = vendor.getBinaryFromMessageBody((BytesMessage) message);
+        if (message instanceof ActiveMQBytesMessage) {
+            Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message);
 
             if (payload == null) {
                 payload = EMPTY_BINARY;
@@ -317,12 +391,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                     body = new Data(payload);
                     break;
             }
-        } else if (message instanceof TextMessage) {
+        } else if (message instanceof ActiveMQTextMessage) {
             switch (orignalEncoding) {
                 case AMQP_NULL:
                     break;
                 case AMQP_DATA:
-                    body = new Data(vendor.getBinaryFromMessageBody((TextMessage) message));
+                    body = new Data(getBinaryFromMessageBody((ActiveMQTextMessage) message));
                     break;
                 case AMQP_VALUE_STRING:
                 case AMQP_UNKNOWN:
@@ -330,11 +404,11 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                     body = new AmqpValue(((TextMessage) message).getText());
                     break;
             }
-        } else if (message instanceof MapMessage) {
-            body = new AmqpValue(vendor.getMapFromMessageBody((MapMessage) message));
-        } else if (message instanceof StreamMessage) {
+        } else if (message instanceof ActiveMQMapMessage) {
+            body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message));
+        } else if (message instanceof ActiveMQStreamMessage) {
             ArrayList<Object> list = new ArrayList<Object>();
-            final StreamMessage m = (StreamMessage) message;
+            final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message;
             try {
                 while (true) {
                     list.add(m.readObject());
@@ -352,8 +426,8 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                     body = new AmqpValue(list);
                     break;
             }
-        } else if (message instanceof ObjectMessage) {
-            Binary payload = vendor.getBinaryFromMessageBody((ObjectMessage) message);
+        } else if (message instanceof ActiveMQObjectMessage) {
+            Binary payload = getBinaryFromMessageBody((ActiveMQObjectMessage) message);
 
             if (payload == null) {
                 payload = EMPTY_BINARY;
@@ -373,8 +447,10 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
             // For a non-AMQP message we tag the outbound content type as containing
             // a serialized Java object so that an AMQP client has a hint as to what
             // we are sending it.
-            if (!message.propertyExists(contentTypeKey)) {
-                vendor.setMessageProperty(message, contentTypeKey, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+            if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) {
+                message.setReadOnlyProperties(false);
+                message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+                message.setReadOnlyProperties(true);
             }
         }
 
@@ -399,23 +475,19 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
         throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
     }
 
-    // Used by legacy QPid AMQP 1.0 JMS client.
-    @Deprecated
-    private static String destinationAttributes(Destination destination) {
-        if (destination instanceof Queue) {
-            if (destination instanceof TemporaryQueue) {
-                return LEGACY_TEMP_QUEUE_TYPE;
-            } else {
-                return LEGACY_QUEUE_TYPE;
-            }
-        } else if (destination instanceof Topic) {
-            if (destination instanceof TemporaryTopic) {
-                return LEGACY_TEMP_TOPIC_TYPE;
-            } else {
-                return LEGACY_TOPIC_TYPE;
+    private static Object getOriginalMessageId(ActiveMQMessage message) {
+        Object result;
+        MessageId messageId = message.getMessageId();
+        if (messageId.getTextView() != null) {
+            try {
+                result = AMQPMessageIdHelper.INSTANCE.toIdObject(messageId.getTextView());
+            } catch (AmqpProtocolException e) {
+                result = messageId.getTextView();
             }
+        } else {
+            result = messageId.toString();
         }
 
-        throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
index 2eefa50..6ca9ced 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
@@ -16,54 +16,10 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import javax.jms.Message;
+import org.apache.activemq.command.ActiveMQMessage;
 
-public abstract class OutboundTransformer {
+public interface OutboundTransformer {
 
-    protected final ActiveMQJMSVendor vendor;
+    public abstract EncodedMessage transform(ActiveMQMessage message) throws Exception;
 
-    protected String prefixVendor;
-
-    protected String prefixDeliveryAnnotations = "DA_";
-    protected String prefixMessageAnnotations= "MA_";
-    protected String prefixFooter = "FT_";
-
-    protected String messageFormatKey;
-    protected String nativeKey;
-    protected String firstAcquirerKey;
-    protected String prefixDeliveryAnnotationsKey;
-    protected String prefixMessageAnnotationsKey;
-    protected String contentTypeKey;
-    protected String contentEncodingKey;
-    protected String replyToGroupIDKey;
-    protected String prefixFooterKey;
-
-    public OutboundTransformer(ActiveMQJMSVendor vendor) {
-        this.vendor = vendor;
-        this.setPrefixVendor("JMS_AMQP_");
-    }
-
-    public abstract EncodedMessage transform(Message jms) throws Exception;
-
-    public String getPrefixVendor() {
-        return prefixVendor;
-    }
-
-    public void setPrefixVendor(String prefixVendor) {
-        this.prefixVendor = prefixVendor;
-
-        messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
-        nativeKey = prefixVendor + "NATIVE";
-        firstAcquirerKey = prefixVendor + "FirstAcquirer";
-        prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
-        prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
-        contentTypeKey = prefixVendor + "ContentType";
-        contentEncodingKey = prefixVendor + "ContentEncoding";
-        replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
-        prefixFooterKey = prefixVendor + prefixFooter;
-    }
-
-    public ActiveMQJMSVendor getVendor() {
-        return vendor;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 503a05e..33c319e 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -37,7 +37,6 @@ import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.ResponseHandler;
 import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
 import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
-import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
 import org.apache.activemq.transport.amqp.message.EncodedMessage;
 import org.apache.activemq.transport.amqp.message.InboundTransformer;
 import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
@@ -138,14 +137,14 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
         if (inboundTransformer == null) {
             String transformer = session.getConnection().getConfiguredTransformer();
             if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) {
-                inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                inboundTransformer = new JMSMappingInboundTransformer();
             } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_NATIVE)) {
-                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                inboundTransformer = new AMQPNativeInboundTransformer();
             } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_RAW)) {
-                inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                inboundTransformer = new AMQPRawInboundTransformer();
             } else {
                 LOG.warn("Unknown transformer type {} using native one instead", transformer);
-                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                inboundTransformer = new AMQPNativeInboundTransformer();
             }
         }
         return inboundTransformer;
@@ -157,7 +156,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
             EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
 
             InboundTransformer transformer = getTransformer();
-            ActiveMQMessage message = (ActiveMQMessage) transformer.transform(em);
+            ActiveMQMessage message = transformer.transform(em);
 
             current = null;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 455e0b0..2531c1a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.protocol;
 
 import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -39,7 +40,6 @@ import org.apache.activemq.command.Response;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.ResponseHandler;
-import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
 import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
 import org.apache.activemq.transport.amqp.message.EncodedMessage;
 import org.apache.activemq.transport.amqp.message.OutboundTransformer;
@@ -75,11 +75,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
 
-    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer();
     private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
     private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
     private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
-    private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
 
     private final ConsumerInfo consumerInfo;
     private AbstractSubscription subscription;
@@ -437,8 +436,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                         temp = (ActiveMQMessage) md.getMessage();
                     }
 
-                    if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
-                        temp.setProperty(MESSAGE_FORMAT_KEY, 0);
+                    if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) {
+                        temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0);
                     }
                 }
 
@@ -477,6 +476,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                             currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
                         }
                         currentDelivery.setContext(md);
+                        currentDelivery.setMessageFormat((int) amqp.getMessageFormat());
                     } else {
                         // TODO: message could not be generated what now?
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
index b513c1a..201cee2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
@@ -87,8 +87,6 @@ public class AmqpTransformerTest {
 
         assertTrue(message instanceof BytesMessage);
         Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE");
-        Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
-        assertEquals(0L, messageFormat.longValue());
         assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed);
         assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
         assertEquals(7, message.getJMSPriority());
@@ -136,8 +134,6 @@ public class AmqpTransformerTest {
         LOG.info("Recieved message: {}", message);
         assertTrue(message instanceof BytesMessage);
         Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE");
-        Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
-        assertEquals(0L, messageFormat.longValue());
         assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed);
         assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
 
@@ -184,8 +180,6 @@ public class AmqpTransformerTest {
 
         assertTrue(message instanceof TextMessage);
         Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE");
-        Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
-        assertEquals(0L, messageFormat.longValue());
         assertFalse("Didn't use the correct transformation, expected NOT to be NATIVE", nativeTransformationUsed);
         assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
index 84d5864..fa61e14 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
@@ -468,4 +469,46 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
         amqp.close();
         openwire.close();
     }
+
+    //----- Test Qpid JMS to Qpid JMS interop with transformers --------------//
+
+    @Test
+    public void testQpidJMSToQpidJMSMessageSendReceive() throws Exception {
+        final int SIZE = 1024;
+        final int NUM_MESSAGES = 100;
+
+        Connection amqpSend = createConnection("client-1");
+        Connection amqpReceive = createConnection("client-2");
+
+        amqpReceive.start();
+
+        Session senderSession = amqpSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session receiverSession = amqpReceive.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination queue = senderSession.createQueue(getDestinationName());
+
+        MessageProducer amqpProducer = senderSession.createProducer(queue);
+        MessageConsumer amqpConsumer = receiverSession.createConsumer(queue);
+
+        byte[] payload = new byte[SIZE];
+
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            BytesMessage outgoing = senderSession.createBytesMessage();
+            outgoing.setLongProperty("SendTime", System.currentTimeMillis());
+            outgoing.writeBytes(payload);
+            amqpProducer.send(outgoing);
+        }
+
+        // Now consume the messages
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            Message received = amqpConsumer.receive(2000);
+            assertNotNull(received);
+            assertTrue("Expected BytesMessage but got " + received, received instanceof BytesMessage);
+            BytesMessage incoming = (BytesMessage) received;
+            assertEquals(SIZE, incoming.getBodyLength());
+        }
+
+        amqpReceive.close();
+        amqpSend.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 2b1b874..e5e1bbd 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -257,7 +257,7 @@ public class AmqpMessage {
      * @return the set message ID in String form or null if not set.
      */
     public String getMessageId() {
-        if (message.getProperties() == null) {
+        if (message.getProperties() == null || message.getProperties().getMessageId() == null) {
             return null;
         }
 
@@ -309,7 +309,7 @@ public class AmqpMessage {
      * @return the set correlation ID in String form or null if not set.
      */
     public String getCorrelationId() {
-        if (message.getProperties() == null) {
+        if (message.getProperties() == null || message.getProperties().getCorrelationId() == null) {
             return null;
         }
 
@@ -387,7 +387,7 @@ public class AmqpMessage {
      * @return true if the message is marked as being durable.
      */
     public boolean isDurable() {
-        if (message.getHeader() == null) {
+        if (message.getHeader() == null || message.getHeader().getDurable() == null) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
index ba0f014..1427b5a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
@@ -51,7 +51,6 @@ import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class JMSMappingInboundTransformerTest {
 
@@ -65,8 +64,7 @@ public class JMSMappingInboundTransformerTest {
      */
     @Test
     public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
         message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE);
@@ -86,8 +84,7 @@ public class JMSMappingInboundTransformerTest {
      */
     @Test
     public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
 
@@ -107,8 +104,7 @@ public class JMSMappingInboundTransformerTest {
     */
     @Test
     public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
         message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
@@ -122,8 +118,7 @@ public class JMSMappingInboundTransformerTest {
 
     @Test
     public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
         message.setContentType("text/plain");
@@ -143,8 +138,7 @@ public class JMSMappingInboundTransformerTest {
      * @throws Exception if an error occurs during the test.
      */
     public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
         message.setContentType("unknown-content-type");
@@ -174,8 +168,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -197,8 +190,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -222,8 +214,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -246,8 +237,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -350,8 +340,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -377,8 +366,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -398,8 +386,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -415,8 +402,7 @@ public class JMSMappingInboundTransformerTest {
     */
     @Test
     public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
         message.setBody(new AmqpValue(new Binary(new byte[0])));
@@ -443,8 +429,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -465,8 +450,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -487,8 +471,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -509,8 +492,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -531,8 +513,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         javax.jms.Message jmsMessage = transformer.transform(em);
 
@@ -548,8 +529,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
@@ -589,9 +569,7 @@ public class JMSMappingInboundTransformerTest {
     }
 
     private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception {
-        ActiveMQTextMessage mockTextMessage = createMockTextMessage();
-        ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage);
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         String toAddress = "toAddress";
         Message amqp = Message.Factory.create();
@@ -608,11 +586,6 @@ public class JMSMappingInboundTransformerTest {
 
         javax.jms.Message jmsMessage = transformer.transform(em);
         assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
-
-        // Verify that createDestination was called with the provided 'to'
-        // address and 'Destination' class
-        // TODO - No need to really test this bit ?
-        // Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
     }
 
     //----- ReplyTo Conversions ----------------------------------------------//
@@ -643,9 +616,7 @@ public class JMSMappingInboundTransformerTest {
     }
 
     private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception {
-        ActiveMQTextMessage mockTextMessage = createMockTextMessage();
-        ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage);
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         String replyToAddress = "replyToAddress";
         Message amqp = Message.Factory.create();
@@ -662,31 +633,10 @@ public class JMSMappingInboundTransformerTest {
 
         javax.jms.Message jmsMessage = transformer.transform(em);
         assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
-
-        // Verify that createDestination was called with the provided 'replyTo'
-        // address and 'Destination' class
-        // TODO - No need to really test this bit ?
-        // Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
     }
 
     //----- Utility Methods --------------------------------------------------//
 
-    private ActiveMQTextMessage createMockTextMessage() {
-        return Mockito.mock(ActiveMQTextMessage.class);
-    }
-
-    private ActiveMQJMSVendor createMockVendor(ActiveMQTextMessage mockTextMessage) {
-        ActiveMQJMSVendor mockVendor = Mockito.mock(ActiveMQJMSVendor.class);
-        Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage);
-        Mockito.when(mockVendor.createTextMessage(Mockito.any(String.class))).thenReturn(mockTextMessage);
-
-        return mockVendor;
-    }
-
-    private ActiveMQJMSVendor createVendor() {
-        return ActiveMQJMSVendor.INSTANCE;
-    }
-
     private EncodedMessage encodeMessage(Message message) {
         byte[] encodeBuffer = new byte[1024 * 8];
         int encodedSize;


[5/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6438

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6438

JMS Transformer performance improvements and bug fixes

Trim unnecessary code and improve overall performance of the JMS
Transformer codecs.  Remove legacy Qpid JMS client related code from the
transformer as these are no longer supported.  Fix outgoing messages that
do not match the structure of the incoming message that created them, such
as messages which had only a body being sent out with Headers and message
Properties.

(cherry picked from commit 63d62a71f59ec485ac79e1ce40e98316d37ca14a)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9cb92a22
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9cb92a22
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9cb92a22

Branch: refs/heads/activemq-5.14.x
Commit: 9cb92a225fc1d93b8a8096cc6f11781e32623819
Parents: 95faf0d
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 21 12:57:16 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 28 15:08:18 2016 -0400

----------------------------------------------------------------------
 .../message/AMQPNativeInboundTransformer.java   |  12 +-
 .../message/AMQPNativeOutboundTransformer.java  | 102 ++--
 .../amqp/message/AMQPRawInboundTransformer.java |  32 +-
 .../amqp/message/ActiveMQJMSVendor.java         | 398 ---------------
 .../amqp/message/AmqpMessageSupport.java        | 181 ++++++-
 .../amqp/message/AmqpWritableBuffer.java        | 164 +++++++
 .../amqp/message/AutoOutboundTransformer.java   |  26 +-
 .../amqp/message/InboundTransformer.java        | 163 +++----
 .../message/JMSMappingInboundTransformer.java   | 111 +++--
 .../message/JMSMappingOutboundTransformer.java  | 482 +++++++++++--------
 .../amqp/message/OutboundTransformer.java       |  50 +-
 .../transport/amqp/protocol/AmqpReceiver.java   |  11 +-
 .../transport/amqp/protocol/AmqpSender.java     |  12 +-
 .../transport/amqp/AmqpTransformerTest.java     |   6 -
 .../transport/amqp/JMSInteroperabilityTest.java |  43 ++
 .../transport/amqp/client/AmqpMessage.java      |   6 +-
 .../JMSMappingInboundTransformerTest.java       |  92 +---
 .../JMSMappingOutboundTransformerTest.java      | 384 ++++++++-------
 .../JMSTransformationSpeedComparisonTest.java   |  55 ++-
 19 files changed, 1161 insertions(+), 1169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
index 65cd657..b5429e6 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
@@ -16,14 +16,10 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import javax.jms.Message;
+import org.apache.activemq.command.ActiveMQMessage;
 
 public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
 
-    public AMQPNativeInboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
-    }
-
     @Override
     public String getTransformerName() {
         return TRANSFORMER_NATIVE;
@@ -31,14 +27,14 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
 
     @Override
     public InboundTransformer getFallbackTransformer() {
-        return new AMQPRawInboundTransformer(getVendor());
+        return new AMQPRawInboundTransformer();
     }
 
     @Override
-    protected Message doTransform(EncodedMessage amqpMessage) throws Exception {
+    protected ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception {
         org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
 
-        Message result = super.doTransform(amqpMessage);
+        ActiveMQMessage result = super.doTransform(amqpMessage);
 
         populateMessage(result, amqp);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
index 620b79b..cbc3461 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
@@ -16,93 +16,73 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import java.nio.ByteBuffer;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getBinaryFromMessageBody;
 
-import javax.jms.BytesMessage;
 import javax.jms.JMSException;
-import javax.jms.Message;
 import javax.jms.MessageFormatException;
 
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.ProtonJMessage;
 
-public class AMQPNativeOutboundTransformer extends OutboundTransformer {
-
-    public AMQPNativeOutboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
-    }
+public class AMQPNativeOutboundTransformer implements OutboundTransformer {
 
     @Override
-    public EncodedMessage transform(Message msg) throws Exception {
-        if (msg == null || !(msg instanceof BytesMessage)) {
-            return null;
-        }
-
-        try {
-            if (!msg.getBooleanProperty(prefixVendor + "NATIVE")) {
-                return null;
-            }
-        } catch (MessageFormatException e) {
+    public EncodedMessage transform(ActiveMQMessage message) throws Exception {
+        if (message == null || !(message instanceof ActiveMQBytesMessage)) {
             return null;
         }
 
-        return transform(this, (BytesMessage) msg);
+        return transform(this, (ActiveMQBytesMessage) message);
     }
 
-    static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
+    static EncodedMessage transform(OutboundTransformer options, ActiveMQBytesMessage message) throws JMSException {
         long messageFormat;
         try {
-            messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
+            messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT);
         } catch (MessageFormatException e) {
             return null;
         }
-        byte data[] = new byte[(int) msg.getBodyLength()];
-        int dataSize = data.length;
-        msg.readBytes(data);
-        msg.reset();
-
-        try {
-            int count = msg.getIntProperty("JMSXDeliveryCount");
-            if (count > 1) {
 
-                // decode...
-                ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
-                int offset = 0;
-                int len = data.length;
-                while (len > 0) {
-                    final int decoded = amqp.decode(data, offset, len);
-                    assert decoded > 0 : "Make progress decoding the message";
-                    offset += decoded;
-                    len -= decoded;
-                }
+        Binary encodedMessage = getBinaryFromMessageBody(message);
+        byte encodedData[] = encodedMessage.getArray();
+        int encodedSize = encodedMessage.getLength();
 
-                // Update the DeliveryCount header...
-                // The AMQP delivery-count field only includes prior failed delivery attempts,
-                // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
-                if (amqp.getHeader() == null) {
-                    amqp.setHeader(new Header());
-                }
+        int count = message.getRedeliveryCounter();
+        if (count >= 1) {
 
-                amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
+            // decode...
+            ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
+            int offset = 0;
+            int len = encodedSize;
+            while (len > 0) {
+                final int decoded = amqp.decode(encodedData, offset, len);
+                assert decoded > 0 : "Make progress decoding the message";
+                offset += decoded;
+                len -= decoded;
+            }
 
-                // Re-encode...
-                ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
-                final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
-                int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
-                if (overflow.position() > 0) {
-                    buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
-                    c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
-                }
-                data = buffer.array();
-                dataSize = c;
+            // Update the DeliveryCount header...
+            // The AMQP delivery-count field only includes prior failed delivery attempts,
+            // whereas JMSXDeliveryCount includes the first/current delivery attempt; the redelivery counter read above is applied as-is.
+            if (amqp.getHeader() == null) {
+                amqp.setHeader(new Header());
             }
-        } catch (JMSException e) {
+
+            amqp.getHeader().setDeliveryCount(new UnsignedInteger(count));
+
+            // Re-encode...
+            final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
+            int written = amqp.encode(buffer);
+
+            encodedData = buffer.getArray();
+            encodedSize = written;
         }
 
-        return new EncodedMessage(messageFormat, data, 0, dataSize);
+        return new EncodedMessage(messageFormat, encodedData, 0, encodedSize);
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
index c534709..b4d3ad6 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
@@ -16,15 +16,16 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_NATIVE;
+
 import javax.jms.Message;
 
-public class AMQPRawInboundTransformer extends InboundTransformer {
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.util.ByteSequence;
 
-    public AMQPRawInboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
-    }
+public class AMQPRawInboundTransformer extends InboundTransformer {
 
     @Override
     public String getTransformerName() {
@@ -37,22 +38,23 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
     }
 
     @Override
-    protected Message doTransform(EncodedMessage amqpMessage) throws Exception {
-        BytesMessage result = vendor.createBytesMessage(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+    protected ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception {
+        ActiveMQBytesMessage result = new ActiveMQBytesMessage();
+        result.setContent(new ByteSequence(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()));
 
         // We cannot decode the message headers to check so err on the side of caution
         // and mark all messages as persistent.
-        result.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
-        result.setJMSPriority(defaultPriority);
+        result.setPersistent(true);
+        result.setPriority((byte) Message.DEFAULT_PRIORITY);
 
         final long now = System.currentTimeMillis();
-        result.setJMSTimestamp(now);
-        if (defaultTtl > 0) {
-            result.setJMSExpiration(now + defaultTtl);
+        result.setTimestamp(now);
+
+        if (amqpMessage.getMessageFormat() != 0) {
+            result.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat());
         }
 
-        result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        result.setBooleanProperty(prefixVendor + "NATIVE", true);
+        result.setBooleanProperty(JMS_AMQP_NATIVE, true);
 
         return result;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
deleted file mode 100644
index 9b5a4ab..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.message;
-
-import java.io.DataInputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.InflaterInputStream;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQStreamMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.transport.amqp.AmqpProtocolException;
-import org.apache.activemq.util.ByteArrayInputStream;
-import org.apache.activemq.util.ByteArrayOutputStream;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.JMSExceptionSupport;
-import org.apache.qpid.proton.amqp.Binary;
-
-public class ActiveMQJMSVendor {
-
-    final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
-
-    private ActiveMQJMSVendor() {
-    }
-
-    /**
-     * @return a new vendor specific Message instance.
-     */
-    public Message createMessage() {
-        return new ActiveMQMessage();
-    }
-
-    /**
-     * @return a new vendor specific BytesMessage instance.
-     */
-    public BytesMessage createBytesMessage() {
-        return new ActiveMQBytesMessage();
-    }
-
-    /**
-     * @return a new vendor specific BytesMessage instance with the given payload.
-     */
-    public BytesMessage createBytesMessage(byte[] content, int offset, int length) {
-        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
-        message.setContent(new ByteSequence(content, offset, length));
-        return message;
-    }
-
-    /**
-     * @return a new vendor specific StreamMessage instance.
-     */
-    public StreamMessage createStreamMessage() {
-        return new ActiveMQStreamMessage();
-    }
-
-    /**
-     * @return a new vendor specific TextMessage instance.
-     */
-    public TextMessage createTextMessage() {
-        return new ActiveMQTextMessage();
-    }
-
-    /**
-     * @return a new vendor specific TextMessage instance with the given string in the body.
-     */
-    public TextMessage createTextMessage(String text) {
-        ActiveMQTextMessage message = new ActiveMQTextMessage();
-        try {
-            message.setText(text);
-        } catch (MessageNotWriteableException ex) {}
-
-        return message;
-    }
-
-    /**
-     * @return a new vendor specific ObjectMessage instance.
-     */
-    public ObjectMessage createObjectMessage() {
-        return new ActiveMQObjectMessage();
-    }
-
-    /**
-     * @return a new vendor specific ObjectMessage instance with the serialized form given.
-     */
-    public ObjectMessage createObjectMessage(byte[] content, int offset, int length) {
-        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
-        message.setContent(new ByteSequence(content, offset, length));
-        return message;
-    }
-
-    /**
-     * @return a new vendor specific MapMessage instance.
-     */
-    public MapMessage createMapMessage() {
-        return new ActiveMQMapMessage();
-    }
-
-    /**
-     * @return a new vendor specific MapMessage instance with the given map as its content.
-     */
-    public MapMessage createMapMessage(Map<String, Object> content) throws JMSException {
-        ActiveMQMapMessage message = new ActiveMQMapMessage();
-        final Set<Map.Entry<String, Object>> set = content.entrySet();
-        for (Map.Entry<String, Object> entry : set) {
-            message.setObject(entry.getKey(), entry.getValue());
-        }
-        return message;
-    }
-
-    /**
-     * Creates a new JMS Destination instance from the given name.
-     *
-     * @param name
-     *      the name to use to construct the new Destination
-     *
-     * @return a new JMS Destination object derived from the given name.
-     */
-    public Destination createDestination(String name) {
-        return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
-    }
-
-    /**
-     * Set the given value as the JMSXUserID on the message instance.
-     *
-     * @param message
-     *      the message to be updated.
-     * @param value
-     *      the value to apply to the message.
-     */
-    public void setJMSXUserID(Message msg, String value) {
-        ((ActiveMQMessage) msg).setUserID(value);
-    }
-
-    /**
-     * Set the given value as the JMSXGroupID on the message instance.
-     *
-     * @param message
-     *      the message to be updated.
-     * @param value
-     *      the value to apply to the message.
-     */
-    public void setJMSXGroupID(Message msg, String value) {
-        ((ActiveMQMessage) msg).setGroupID(value);
-    }
-
-    /**
-     * Set the given value as the JMSXGroupSequence on the message instance.
-     *
-     * @param message
-     *      the message to be updated.
-     * @param value
-     *      the value to apply to the message.
-     */
-    public void setJMSXGroupSequence(Message msg, int value) {
-        ((ActiveMQMessage) msg).setGroupSequence(value);
-    }
-
-    /**
-     * Set the given value as the JMSXDeliveryCount on the message instance.
-     *
-     * @param message
-     *      the message to be updated.
-     * @param value
-     *      the value to apply to the message.
-     */
-    public void setJMSXDeliveryCount(Message msg, long value) {
-        ((ActiveMQMessage) msg).setRedeliveryCounter((int) value);
-    }
-
-    /**
-     * Convert the given JMS Destination into the appropriate AMQP address string
-     * for assignment to the 'to' or 'replyTo' field of an AMQP message.
-     *
-     * @param destination
-     *      the JMS Destination instance to be converted.
-     *
-     * @return the converted string address to assign to the message.
-     */
-    public String toAddress(Destination dest) {
-        return ((ActiveMQDestination) dest).getQualifiedName();
-    }
-
-    /**
-     * Given an Message instance return the original Message ID that was assigned the
-     * Message when it was first processed by the broker.  For an AMQP message this
-     * should be the original value of the message's MessageId field with the correct
-     * type preserved.
-     *
-     * @param message
-     *      the message which is being accessed.
-     *
-     * @return the original MessageId assigned to this Message instance.
-     */
-    public Object getOriginalMessageId(Message message) {
-        Object result;
-        MessageId msgId = ((ActiveMQMessage)message).getMessageId();
-        if (msgId.getTextView() != null) {
-            try {
-                result = AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView());
-            } catch (AmqpProtocolException e) {
-                result = msgId.getTextView().toString();
-            }
-        } else {
-            result = msgId.toString();
-        }
-
-        return result;
-    }
-
-    /**
-     * Return the encoded form of the BytesMessage as an AMQP Binary instance.
-     *
-     * @param message
-     *      the Message whose binary encoded body is needed.
-     *
-     * @return a Binary instance containing the encoded message body.
-     *
-     * @throws JMSException if an error occurs while fetching the binary payload.
-     */
-    public Binary getBinaryFromMessageBody(BytesMessage message) throws JMSException {
-        ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) message;
-        Binary result = null;
-
-        if (bytesMessage.getContent() != null) {
-            ByteSequence contents = bytesMessage.getContent();
-
-            if (bytesMessage.isCompressed()) {
-                int length = (int) bytesMessage.getBodyLength();
-                byte[] uncompressed = new byte[length];
-                bytesMessage.readBytes(uncompressed);
-
-                result = new Binary(uncompressed);
-            } else {
-                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
-            }
-        }
-
-        return result;
-    }
-
-    /**
-     * Return the encoded form of the BytesMessage as an AMQP Binary instance.
-     *
-     * @param message
-     *      the Message whose binary encoded body is needed.
-     *
-     * @return a Binary instance containing the encoded message body.
-     *
-     * @throws JMSException if an error occurs while fetching the binary payload.
-     */
-    public Binary getBinaryFromMessageBody(ObjectMessage message) throws JMSException {
-        ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) message;
-        Binary result = null;
-
-        if (objectMessage.getContent() != null) {
-            ByteSequence contents = objectMessage.getContent();
-
-            if (objectMessage.isCompressed()) {
-                try (ByteArrayOutputStream os = new ByteArrayOutputStream();
-                     ByteArrayInputStream is = new ByteArrayInputStream(contents);
-                     InflaterInputStream iis = new InflaterInputStream(is);) {
-
-                    byte value;
-                    while ((value = (byte) iis.read()) != -1) {
-                        os.write(value);
-                    }
-
-                    ByteSequence expanded = os.toByteSequence();
-                    result = new Binary(expanded.getData(), expanded.getOffset(), expanded.getLength());
-                } catch (Exception cause) {
-                   throw JMSExceptionSupport.create(cause);
-               }
-            } else {
-                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
-            }
-        }
-
-        return result;
-    }
-
-    /**
-     * Return the encoded form of the Message as an AMQP Binary instance.
-     *
-     * @param message
-     *      the Message whose binary encoded body is needed.
-     *
-     * @return a Binary instance containing the encoded message body.
-     *
-     * @throws JMSException if an error occurs while fetching the binary payload.
-     */
-    public Binary getBinaryFromMessageBody(TextMessage message) throws JMSException {
-        ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
-        Binary result = null;
-
-        if (textMessage.getContent() != null) {
-            ByteSequence contents = textMessage.getContent();
-
-            if (textMessage.isCompressed()) {
-                try (ByteArrayInputStream is = new ByteArrayInputStream(contents);
-                     InflaterInputStream iis = new InflaterInputStream(is);
-                     DataInputStream dis = new DataInputStream(iis);) {
-
-                    int size = dis.readInt();
-                    byte[] uncompressed = new byte[size];
-                    dis.readFully(uncompressed);
-
-                    result = new Binary(uncompressed);
-                } catch (Exception cause) {
-                    throw JMSExceptionSupport.create(cause);
-                }
-            } else {
-                // Message includes a size prefix of four bytes for the OpenWire marshaler
-                result = new Binary(contents.getData(), contents.getOffset() + 4, contents.getLength() - 4);
-            }
-        } else if (textMessage.getText() != null) {
-            result = new Binary(textMessage.getText().getBytes(StandardCharsets.UTF_8));
-        }
-
-        return result;
-    }
-
-    /**
-     * Return the underlying Map from the JMS MapMessage instance.
-     *
-     * @param message
-     *      the MapMessage whose underlying Map is requested.
-     *
-     * @return the underlying Map used to store the value in the given MapMessage.
-     *
-     * @throws JMSException if an error occurs in constructing or fetching the Map.
-     */
-    public Map<String, Object> getMapFromMessageBody(MapMessage message) throws JMSException {
-        final HashMap<String, Object> map = new HashMap<String, Object>();
-        final ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) message;
-
-        final Map<String, Object> contentMap = mapMessage.getContentMap();
-        if (contentMap != null) {
-            map.putAll(contentMap);
-        }
-
-        return contentMap;
-    }
-
-    /**
-     * Sets the given Message Property on the given message overriding any read-only
-     * state on the Message long enough for the property to be added.
-     *
-     * @param message
-     *      the message to set the property on.
-     * @param key
-     *      the String key for the new Message property
-     * @param value
-     *      the Object to assign to the new Message property.
-     *
-     * @throws JMSException if an error occurs while setting the property.
-     */
-    public void setMessageProperty(Message message, String key, Object value) throws JMSException {
-        final ActiveMQMessage amqMessage = (ActiveMQMessage) message;
-
-        boolean oldValue = amqMessage.isReadOnlyProperties();
-
-        amqMessage.setReadOnlyProperties(false);
-        amqMessage.setObjectProperty(key, value);
-        amqMessage.setReadOnlyProperties(oldValue);
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
index 3e7a60e..4f468ba 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
@@ -16,13 +16,26 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.zip.InflaterInputStream;
 
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Data;
@@ -34,12 +47,44 @@ import org.apache.qpid.proton.message.Message;
  */
 public final class AmqpMessageSupport {
 
+    // Message Properties used to map AMQP to JMS and back
+
+    public static final String JMS_AMQP_PREFIX = "JMS_AMQP_";
+    public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length();
+
+    public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT";
+    public static final String ORIGINAL_ENCODING = "ORIGINAL_ENCODING";
+    public static final String NATIVE = "NATIVE";
+    public static final String HEADER = "HEADER";
+    public static final String PROPERTIES = "PROPERTIES";
+
+    public static final String FIRST_ACQUIRER = "FirstAcquirer";
+    public static final String CONTENT_TYPE = "ContentType";
+    public static final String CONTENT_ENCODING = "ContentEncoding";
+    public static final String REPLYTO_GROUP_ID = "ReplyToGroupID";
+
+    public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
+    public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
+    public static final String FOOTER_PREFIX = "FT_";
+
+    public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
+    public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES;
+    public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
+    public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT;
+    public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE;
+    public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER;
+    public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE;
+    public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING;
+    public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID;
+    public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX;
+    public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX;
+    public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX;
+
+    // Message body type definitions
     public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
     public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);
     public static final Data NULL_OBJECT_BODY;
 
-    public static final String AMQP_ORIGINAL_ENCODING_KEY = "JMS_AMQP_ORIGINAL_ENCODING";
-
     public static final short AMQP_UNKNOWN = 0;
     public static final short AMQP_NULL = 1;
     public static final short AMQP_DATA = 2;
@@ -147,4 +192,134 @@ public final class AmqpMessageSupport {
             return baos.toByteArray();
         }
     }
+
+    /**
+     * Return the body of the given BytesMessage as an AMQP Binary instance.
+     *
+     * @param message
+     *      the BytesMessage whose body is needed; a compressed body is copied out via readBytes.
+     *
+     * @return a Binary instance holding the message body, or null if the message has no content.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public static Binary getBinaryFromMessageBody(ActiveMQBytesMessage message) throws JMSException {
+        Binary result = null;
+
+        if (message.getContent() != null) {
+            ByteSequence contents = message.getContent();
+
+            if (message.isCompressed()) {
+                int length = (int) message.getBodyLength();
+                byte[] uncompressed = new byte[length];
+                message.readBytes(uncompressed);
+
+                result = new Binary(uncompressed);
+            } else {
+                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the body of the given ObjectMessage as an AMQP Binary instance.
+     *
+     * @param message
+     *      the ObjectMessage whose body is needed; a compressed body is inflated first.
+     *
+     * @return a Binary instance holding the message body, or null if the message has no content.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public static Binary getBinaryFromMessageBody(ActiveMQObjectMessage message) throws JMSException {
+        Binary result = null;
+
+        if (message.getContent() != null) {
+            ByteSequence contents = message.getContent();
+
+            if (message.isCompressed()) {
+                try (ByteArrayOutputStream os = new ByteArrayOutputStream();
+                     ByteArrayInputStream is = new ByteArrayInputStream(contents);
+                     InflaterInputStream iis = new InflaterInputStream(is);) {
+
+                    int value; // keep as int: a (byte) cast makes data byte 0xFF look like EOF (-1)
+                    while ((value = iis.read()) != -1) {
+                        os.write(value);
+                    }
+
+                    ByteSequence expanded = os.toByteSequence();
+                    result = new Binary(expanded.getData(), expanded.getOffset(), expanded.getLength());
+                } catch (Exception cause) {
+                   throw JMSExceptionSupport.create(cause);
+               }
+            } else {
+                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the body of the given TextMessage as an AMQP Binary instance.
+     *
+     * @param message
+     *      the TextMessage whose body is needed, taken from its content or else from its text.
+     *
+     * @return a Binary instance holding the message body, or null if the message has neither.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public static Binary getBinaryFromMessageBody(ActiveMQTextMessage message) throws JMSException {
+        Binary result = null;
+
+        if (message.getContent() != null) {
+            ByteSequence contents = message.getContent();
+
+            if (message.isCompressed()) {
+                try (ByteArrayInputStream is = new ByteArrayInputStream(contents);
+                     InflaterInputStream iis = new InflaterInputStream(is);
+                     DataInputStream dis = new DataInputStream(iis);) {
+
+                    int size = dis.readInt();
+                    byte[] uncompressed = new byte[size];
+                    dis.readFully(uncompressed);
+
+                    result = new Binary(uncompressed);
+                } catch (Exception cause) {
+                    throw JMSExceptionSupport.create(cause);
+                }
+            } else {
+                // Message includes a size prefix of four bytes for the OpenWire marshaler
+                result = new Binary(contents.getData(), contents.getOffset() + 4, contents.getLength() - 4);
+            }
+        } else if (message.getText() != null) {
+            result = new Binary(message.getText().getBytes(StandardCharsets.UTF_8));
+        }
+
+        return result;
+    }
+
+    /**
+     * Return a copy of the Map held by the given MapMessage instance.
+     *
+     * @param message
+     *      the MapMessage whose Map content is requested.
+     *
+     * @return a new Map holding the entries of the given MapMessage, empty if it has none.
+     *
+     * @throws JMSException if an error occurs in constructing or fetching the Map.
+     */
+    public static Map<String, Object> getMapFromMessageBody(ActiveMQMapMessage message) throws JMSException {
+        final HashMap<String, Object> map = new HashMap<String, Object>();
+
+        final Map<String, Object> contentMap = message.getContentMap();
+        if (contentMap != null) {
+            map.putAll(contentMap);
+        }
+
+        return map;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java
new file mode 100644
index 0000000..399eb3f
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+/**
+ * A WritableBuffer backed by a byte array that is replaced by a larger one as writes need it.
+ */
+public class AmqpWritableBuffer implements WritableBuffer {
+
+    public final static int DEFAULT_CAPACITY = 4 * 1024;
+
+    byte buffer[];
+    int position;
+
+   /**
+    * Creates a new WritableBuffer with default capacity.
+    */
+   public AmqpWritableBuffer() {
+       this(DEFAULT_CAPACITY);
+   }
+
+    /**
+     * Create a new WritableBuffer with the given capacity.
+     */
+    public AmqpWritableBuffer(int capacity) {
+        this.buffer = new byte[capacity];
+    }
+
+    public byte[] getArray() {
+        return buffer;
+    }
+
+    public int getArrayLength() {
+        return position;
+    }
+
+    @Override
+    public void put(byte b) {
+        int newPosition = position + 1;
+        ensureCapacity(newPosition);
+        buffer[position] = b;
+        position = newPosition;
+    }
+
+    @Override
+    public void putShort(short value) {
+        ensureCapacity(position + 2);
+        buffer[position++] = (byte)(value >>> 8);
+        buffer[position++] = (byte)(value >>> 0);
+    }
+
+    @Override
+    public void putInt(int value) {
+        ensureCapacity(position + 4);
+        buffer[position++] = (byte)(value >>> 24);
+        buffer[position++] = (byte)(value >>> 16);
+        buffer[position++] = (byte)(value >>> 8);
+        buffer[position++] = (byte)(value >>> 0);
+    }
+
+    @Override
+    public void putLong(long value) {
+        ensureCapacity(position + 8);
+        buffer[position++] = (byte)(value >>> 56);
+        buffer[position++] = (byte)(value >>> 48);
+        buffer[position++] = (byte)(value >>> 40);
+        buffer[position++] = (byte)(value >>> 32);
+        buffer[position++] = (byte)(value >>> 24);
+        buffer[position++] = (byte)(value >>> 16);
+        buffer[position++] = (byte)(value >>> 8);
+        buffer[position++] = (byte)(value >>> 0);
+    }
+
+    @Override
+    public void putFloat(float value) {
+        putInt(Float.floatToRawIntBits(value));
+    }
+
+    @Override
+    public void putDouble(double value) {
+        putLong(Double.doubleToRawLongBits(value));
+    }
+
+    @Override
+    public void put(byte[] src, int offset, int length) {
+        if (length == 0) {
+            return;
+        }
+
+        int newPosition = position + length;
+        ensureCapacity(newPosition);
+        System.arraycopy(src, offset, buffer, position, length);
+        position = newPosition;
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return position < Integer.MAX_VALUE;
+    }
+
+    @Override
+    public int remaining() {
+        return Integer.MAX_VALUE - position;
+    }
+
+    @Override
+    public int position() {
+        return position;
+    }
+
+    @Override
+    public void position(int position) {
+        ensureCapacity(position);
+        this.position = position;
+    }
+
+    @Override
+    public void put(ByteBuffer payload) {
+        int newPosition = position + payload.remaining();
+        ensureCapacity(newPosition);
+        while (payload.hasRemaining()) {
+            buffer[position++] = payload.get();
+        }
+
+        position = newPosition;
+    }
+
+    @Override
+    public int limit() {
+        return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Ensures the buffer has at least the minimumCapacity specified.
+     *
+     * @param minimumCapacity
+     *      the minimum capacity needed to meet the next write operation.
+     */
+    private void ensureCapacity(int minimumCapacity) {
+        if (minimumCapacity > buffer.length) {
+            byte newBuffer[] = new byte[Math.max(buffer.length << 1, minimumCapacity)];
+            System.arraycopy(buffer, 0, newBuffer, 0, position);
+            buffer = newBuffer;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
index f0f71a8..edfdecf 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
@@ -16,33 +16,31 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import javax.jms.BytesMessage;
-import javax.jms.Message;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_NATIVE;
 
-public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
+import javax.jms.BytesMessage;
 
-    private final JMSMappingOutboundTransformer transformer;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
 
-    public AutoOutboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
+public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
 
-        transformer = new JMSMappingOutboundTransformer(vendor);
-    }
+    private final JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
     @Override
-    public EncodedMessage transform(Message msg) throws Exception {
-        if (msg == null) {
+    public EncodedMessage transform(ActiveMQMessage message) throws Exception {
+        if (message == null) {
             return null;
         }
 
-        if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
-            if (msg instanceof BytesMessage) {
-                return AMQPNativeOutboundTransformer.transform(this, (BytesMessage) msg);
+        if (message.getBooleanProperty(JMS_AMQP_NATIVE)) {
+            if (message instanceof BytesMessage) {
+                return AMQPNativeOutboundTransformer.transform(this, (ActiveMQBytesMessage) message);
             } else {
                 return null;
             }
         } else {
-            return transformer.transform(msg);
+            return transformer.transform(message);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index e6b7a0f..323a9c1 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -16,14 +16,25 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PROPERTIES;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
+
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Set;
 
-import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Decimal128;
@@ -42,32 +53,17 @@ import org.apache.qpid.proton.amqp.messaging.Properties;
 
 public abstract class InboundTransformer {
 
-    protected final ActiveMQJMSVendor vendor;
-
     public static final String TRANSFORMER_NATIVE = "native";
     public static final String TRANSFORMER_RAW = "raw";
     public static final String TRANSFORMER_JMS = "jms";
 
-    protected String prefixVendor = "JMS_AMQP_";
-    protected String prefixDeliveryAnnotations = "DA_";
-    protected String prefixMessageAnnotations = "MA_";
-    protected String prefixFooter = "FT_";
-
-    protected int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
-    protected int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
-    protected long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
-
-    public InboundTransformer(ActiveMQJMSVendor vendor) {
-        this.vendor = vendor;
-    }
-
     public abstract String getTransformerName();
 
     public abstract InboundTransformer getFallbackTransformer();
 
-    public final Message transform(EncodedMessage amqpMessage) throws Exception {
+    public final ActiveMQMessage transform(EncodedMessage amqpMessage) throws Exception {
         InboundTransformer transformer = this;
-        Message message = null;
+        ActiveMQMessage message = null;
 
         while (transformer != null) {
             try {
@@ -85,79 +81,40 @@ public abstract class InboundTransformer {
         return message;
     }
 
-    protected abstract Message doTransform(EncodedMessage amqpMessage) throws Exception;
-
-    public int getDefaultDeliveryMode() {
-        return defaultDeliveryMode;
-    }
-
-    public void setDefaultDeliveryMode(int defaultDeliveryMode) {
-        this.defaultDeliveryMode = defaultDeliveryMode;
-    }
-
-    public int getDefaultPriority() {
-        return defaultPriority;
-    }
-
-    public void setDefaultPriority(int defaultPriority) {
-        this.defaultPriority = defaultPriority;
-    }
-
-    public long getDefaultTtl() {
-        return defaultTtl;
-    }
-
-    public void setDefaultTtl(long defaultTtl) {
-        this.defaultTtl = defaultTtl;
-    }
-
-    public String getPrefixVendor() {
-        return prefixVendor;
-    }
-
-    public void setPrefixVendor(String prefixVendor) {
-        this.prefixVendor = prefixVendor;
-    }
-
-    public ActiveMQJMSVendor getVendor() {
-        return vendor;
-    }
+    protected abstract ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception;
 
     @SuppressWarnings("unchecked")
-    protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+    protected void populateMessage(ActiveMQMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
         Header header = amqp.getHeader();
-        if (header == null) {
-            header = new Header();
-        }
+        if (header != null) {
+            jms.setBooleanProperty(JMS_AMQP_HEADER, true);
 
-        if (header.getDurable() != null) {
-            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-        } else {
-            jms.setJMSDeliveryMode(defaultDeliveryMode);
-        }
+            if (header.getDurable() != null) {
+                jms.setPersistent(header.getDurable().booleanValue());
+            }
 
-        if (header.getPriority() != null) {
-            jms.setJMSPriority(header.getPriority().intValue());
-        } else {
-            jms.setJMSPriority(defaultPriority);
-        }
+            if (header.getPriority() != null) {
+                jms.setJMSPriority(header.getPriority().intValue());
+            } else {
+                jms.setPriority((byte) Message.DEFAULT_PRIORITY);
+            }
 
-        if (header.getFirstAcquirer() != null) {
-            jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
-        }
+            if (header.getFirstAcquirer() != null) {
+                jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer());
+            }
 
-        if (header.getDeliveryCount() != null) {
-            vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
+            if (header.getDeliveryCount() != null) {
+                jms.setRedeliveryCounter(header.getDeliveryCount().intValue());
+            }
+        } else {
+            jms.setPriority((byte) Message.DEFAULT_PRIORITY);
         }
 
         final MessageAnnotations ma = amqp.getMessageAnnotations();
         if (ma != null) {
             for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
                 String key = entry.getKey().toString();
-                if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
-                    // Legacy annotation, JMSType value will be replaced by Subject further down if also present.
-                    jms.setJMSType(entry.getValue().toString());
-                } else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+                if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
                     long deliveryTime = ((Number) entry.getValue()).longValue();
                     long delay = deliveryTime - System.currentTimeMillis();
                     if (delay > 0) {
@@ -185,82 +142,72 @@ public abstract class InboundTransformer {
                     }
                 }
 
-                setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+                setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
             }
         }
 
         final ApplicationProperties ap = amqp.getApplicationProperties();
         if (ap != null) {
             for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                if ("JMSXGroupID".equals(key)) {
-                    vendor.setJMSXGroupID(jms, entry.getValue().toString());
-                } else if ("JMSXGroupSequence".equals(key)) {
-                    vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue());
-                } else if ("JMSXUserID".equals(key)) {
-                    vendor.setJMSXUserID(jms, entry.getValue().toString());
-                } else {
-                    setProperty(jms, key, entry.getValue());
-                }
+                setProperty(jms,  entry.getKey().toString(), entry.getValue());
             }
         }
 
         final Properties properties = amqp.getProperties();
         if (properties != null) {
+            jms.setBooleanProperty(JMS_AMQP_PROPERTIES, true);
             if (properties.getMessageId() != null) {
                 jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
             }
             Binary userId = properties.getUserId();
             if (userId != null) {
-                vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
+                jms.setUserID(new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
             }
             if (properties.getTo() != null) {
-                jms.setJMSDestination(vendor.createDestination(properties.getTo()));
+                jms.setDestination((ActiveMQDestination.createDestination(properties.getTo(), ActiveMQDestination.QUEUE_TYPE)));
             }
             if (properties.getSubject() != null) {
-                jms.setJMSType(properties.getSubject());
+                jms.setType(properties.getSubject());
             }
             if (properties.getReplyTo() != null) {
-                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
+                jms.setReplyTo((ActiveMQDestination.createDestination(properties.getReplyTo(), ActiveMQDestination.QUEUE_TYPE)));
             }
             if (properties.getCorrelationId() != null) {
-                jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
+                jms.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
             }
             if (properties.getContentType() != null) {
-                jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+                jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
             }
             if (properties.getContentEncoding() != null) {
-                jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+                jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString());
             }
             if (properties.getCreationTime() != null) {
-                jms.setJMSTimestamp(properties.getCreationTime().getTime());
+                jms.setTimestamp(properties.getCreationTime().getTime());
             }
             if (properties.getGroupId() != null) {
-                vendor.setJMSXGroupID(jms, properties.getGroupId());
+                jms.setGroupID(properties.getGroupId());
             }
             if (properties.getGroupSequence() != null) {
-                vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
+                jms.setGroupSequence(properties.getGroupSequence().intValue());
             }
             if (properties.getReplyToGroupId() != null) {
-                jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+                jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId());
             }
             if (properties.getAbsoluteExpiryTime() != null) {
-                jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
+                jms.setExpiration(properties.getAbsoluteExpiryTime().getTime());
             }
         }
 
         // If the jms expiration has not yet been set...
-        if (jms.getJMSExpiration() == 0) {
+        if (header != null && jms.getJMSExpiration() == 0) {
             // Then lets try to set it based on the message ttl.
-            long ttl = defaultTtl;
+            long ttl = Message.DEFAULT_TIME_TO_LIVE;
             if (header.getTtl() != null) {
                 ttl = header.getTtl().longValue();
             }
 
-            if (ttl == 0) {
-                jms.setJMSExpiration(0);
-            } else {
-                jms.setJMSExpiration(System.currentTimeMillis() + ttl);
+            if (ttl != javax.jms.Message.DEFAULT_TIME_TO_LIVE) {
+                jms.setExpiration(System.currentTimeMillis() + ttl);
             }
         }
 
@@ -268,7 +215,7 @@ public abstract class InboundTransformer {
         if (fp != null) {
             for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
                 String key = entry.getKey().toString();
-                setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
+                setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9cb92a22/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
index 707e5da..79e4c2c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
@@ -18,13 +18,14 @@ package org.apache.activemq.transport.amqp.message;
 
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
-import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_MAP;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_NULL;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getCharsetForTextualContent;
@@ -37,10 +38,19 @@ import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import javax.jms.StreamMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
 
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@@ -50,10 +60,6 @@ import org.apache.qpid.proton.message.Message;
 
 public class JMSMappingInboundTransformer extends InboundTransformer {
 
-    public JMSMappingInboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
-    }
-
     @Override
     public String getTransformerName() {
         return TRANSFORMER_JMS;
@@ -61,55 +67,52 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
 
     @Override
     public InboundTransformer getFallbackTransformer() {
-        return new AMQPNativeInboundTransformer(getVendor());
+        return new AMQPNativeInboundTransformer();
     }
 
     @Override
-    protected javax.jms.Message doTransform(EncodedMessage amqpMessage) throws Exception {
+    protected ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception {
         Message amqp = amqpMessage.decode();
 
-        javax.jms.Message result = createMessage(amqp, amqpMessage);
-
-        result.setJMSDeliveryMode(defaultDeliveryMode);
-        result.setJMSPriority(defaultPriority);
-        result.setJMSExpiration(defaultTtl);
+        ActiveMQMessage result = createMessage(amqp, amqpMessage);
 
         populateMessage(result, amqp);
 
-        result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        result.setBooleanProperty(prefixVendor + "NATIVE", false);
+        if (amqpMessage.getMessageFormat() != 0) {
+            result.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat());
+        }
 
         return result;
     }
 
     @SuppressWarnings({ "unchecked" })
-    private javax.jms.Message createMessage(Message message, EncodedMessage original) throws Exception {
+    private ActiveMQMessage createMessage(Message message, EncodedMessage original) throws Exception {
 
         Section body = message.getBody();
-        javax.jms.Message result;
+        ActiveMQMessage result;
 
         if (body == null) {
             if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
-                result = vendor.createObjectMessage();
+                result = new ActiveMQObjectMessage();
             } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) {
-                result = vendor.createBytesMessage();
+                result = new ActiveMQBytesMessage();
             } else {
                 Charset charset = getCharsetForTextualContent(message.getContentType());
                 if (charset != null) {
-                    result = vendor.createTextMessage();
+                    result = new ActiveMQTextMessage();
                 } else {
-                    result = vendor.createMessage();
+                    result = new ActiveMQMessage();
                 }
             }
 
-            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_NULL);
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
         } else if (body instanceof Data) {
             Binary payload = ((Data) body).getValue();
 
             if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
-                result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                result = createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
             } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) {
-                result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
             } else {
                 Charset charset = getCharsetForTextualContent(message.getContentType());
                 if (StandardCharsets.UTF_8.equals(charset)) {
@@ -117,51 +120,51 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
 
                     try {
                         CharBuffer chars = charset.newDecoder().decode(buf);
-                        result = vendor.createTextMessage(String.valueOf(chars));
+                        result = createTextMessage(String.valueOf(chars));
                     } catch (CharacterCodingException e) {
-                        result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                        result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
                     }
                 } else {
-                    result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                    result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
                 }
             }
 
-            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
         } else if (body instanceof AmqpSequence) {
             AmqpSequence sequence = (AmqpSequence) body;
-            StreamMessage m = vendor.createStreamMessage();
+            ActiveMQStreamMessage m = new ActiveMQStreamMessage();
             for (Object item : sequence.getValue()) {
                 m.writeObject(item);
             }
 
             result = m;
-            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
         } else if (body instanceof AmqpValue) {
             Object value = ((AmqpValue) body).getValue();
             if (value == null || value instanceof String) {
-                result = vendor.createTextMessage((String) value);
+                result = createTextMessage((String) value);
 
-                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
+                result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
             } else if (value instanceof Binary) {
                 Binary payload = (Binary) value;
 
                 if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
-                    result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                    result = createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
                 } else {
-                    result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                    result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
                 }
 
-                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+                result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
             } else if (value instanceof List) {
-                StreamMessage m = vendor.createStreamMessage();
+                ActiveMQStreamMessage m = new ActiveMQStreamMessage();
                 for (Object item : (List<Object>) value) {
                     m.writeObject(item);
                 }
                 result = m;
-                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_LIST);
+                result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
             } else if (value instanceof Map) {
-                result = vendor.createMapMessage((Map<String, Object>) value);
-                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_MAP);
+                result = createMapMessage((Map<String, Object>) value);
+                result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
             } else {
                 // Trigger fall-back to native encoder which generates BytesMessage with the
                 // original message stored in the message body.
@@ -173,4 +176,34 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
 
         return result;
     }
+
+    private static ActiveMQBytesMessage createBytesMessage(byte[] content, int offset, int length) {
+        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+        message.setContent(new ByteSequence(content, offset, length));
+        return message;
+    }
+
+    public static ActiveMQTextMessage createTextMessage(String text) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        try {
+            message.setText(text);
+        } catch (MessageNotWriteableException ex) {}
+
+        return message;
+    }
+
+    public static ActiveMQObjectMessage createObjectMessage(byte[] content, int offset, int length) {
+        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+        message.setContent(new ByteSequence(content, offset, length));
+        return message;
+    }
+
+    public static ActiveMQMapMessage createMapMessage(Map<String, Object> content) throws JMSException {
+        ActiveMQMapMessage message = new ActiveMQMapMessage();
+        final Set<Map.Entry<String, Object>> set = content.entrySet();
+        for (Map.Entry<String, Object> entry : set) {
+            message.setObject(entry.getKey(), entry.getValue());
+        }
+        return message;
+    }
 }


[8/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6444

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6444

Ensure that unsettled TX messages remain acquired and are not redelivered
to the receiver. Add several tests that demonstrate that a received
message can be released, rejected, accepted or modified after a TX
rollback if it was not settled.
(cherry picked from commit 0dd806f43f3bee9372ee9b9481089d417c265dfe)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/aa32a0f7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/aa32a0f7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/aa32a0f7

Branch: refs/heads/activemq-5.14.x
Commit: aa32a0f7925c4981aca9a4369b5e95f3336cde94
Parents: aebb365
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 28 14:56:36 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 28 15:08:49 2016 -0400

----------------------------------------------------------------------
 .../amqp/protocol/AmqpAbstractReceiver.java     |   5 +-
 .../transport/amqp/protocol/AmqpLink.java       |  11 +-
 .../transport/amqp/protocol/AmqpSender.java     |  64 +++----
 .../transport/amqp/protocol/AmqpSession.java    |  15 +-
 .../protocol/AmqpTransactionCoordinator.java    |  10 +-
 .../amqp/interop/AmqpTransactionTest.java       | 178 +++++++++++++++++--
 .../JMSMappingOutboundTransformerTest.java      |  10 +-
 7 files changed, 232 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
index 7ed2f92..9ed465a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp.protocol;
 
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
@@ -78,11 +79,11 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
     }
 
     @Override
-    public void commit() throws Exception {
+    public void commit(LocalTransactionId txnId) throws Exception {
     }
 
     @Override
-    public void rollback() throws Exception {
+    public void rollback(LocalTransactionId txnId) throws Exception {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
index d245769..0c75f48 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.protocol;
 
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
 
@@ -60,17 +61,23 @@ public interface AmqpLink extends AmqpResource {
      * Handle work necessary on commit of transacted resources associated with
      * this Link instance.
      *
+     * @param txnId
+     *      The Transaction ID being committed.
+     *
      * @throws Exception if an error occurs while performing the commit.
      */
-    void commit() throws Exception;
+    void commit(LocalTransactionId txnId) throws Exception;
 
     /**
      * Handle work necessary on rollback of transacted resources associated with
      * this Link instance.
      *
+     * @param txnId
+     *      The Transaction ID being rolled back.
+     *
      * @throws Exception if an error occurs while performing the rollback.
      */
-    void rollback() throws Exception;
+    void rollback(LocalTransactionId txnId) throws Exception;
 
     /**
      * @return the ActiveMQDestination that this link is servicing.

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 2531c1a..149b2e8 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -78,7 +78,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer();
     private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
     private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
-    private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
+    private final LinkedList<Delivery> dispatchedInTx = new LinkedList<Delivery>();
 
     private final ConsumerInfo consumerInfo;
     private AbstractSubscription subscription;
@@ -208,26 +208,26 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         } else if (endpointCredit >= 0) {
 
             if (endpointCredit == 0 && currentCreditRequest != 0) {
-
                 prefetchExtension.set(0);
                 currentCreditRequest = 0;
                 logicalDeliveryCount = 0;
                 LOG.trace("Flow: credit 0 for sub:" + subscription);
-
             } else {
-
                 int deltaToAdd = endpointCredit;
                 int logicalCredit = currentCreditRequest - logicalDeliveryCount;
                 if (logicalCredit > 0) {
                     deltaToAdd -= logicalCredit;
                 } else {
-                    // reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative
+                    // reset delivery counter - dispatch from broker concurrent with credit=0
+                    // flow can go negative
                     logicalDeliveryCount = 0;
                 }
+
                 if (deltaToAdd > 0) {
                     currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd);
                     subscription.wakeupDestinationsForDispatch();
-                    // force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch)
+                    // force dispatch of matched/pending for topics (pending messages accumulate
+                    // in the sub and are dispatched on update of prefetch)
                     subscription.setPrefetchSize(0);
                     LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription);
                 }
@@ -246,14 +246,20 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             if (txState.getOutcome() != null) {
                 Outcome outcome = txState.getOutcome();
                 if (outcome instanceof Accepted) {
+                    TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
+
+                    // Store the message sent in this TX we might need to re-send on rollback
+                    // and we need to ACK it on commit.
+                    session.enlist(txId);
+                    dispatchedInTx.addFirst(delivery);
+
                     if (!delivery.remotelySettled()) {
                         TransactionalState txAccepted = new TransactionalState();
                         txAccepted.setOutcome(Accepted.getInstance());
-                        txAccepted.setTxnId(((TransactionalState) state).getTxnId());
+                        txAccepted.setTxnId(txState.getTxnId());
 
                         delivery.disposition(txAccepted);
                     }
-                    settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
                 }
             }
         } else {
@@ -294,12 +300,14 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     }
 
     @Override
-    public void commit() throws Exception {
+    public void commit(LocalTransactionId txnId) throws Exception {
         if (!dispatchedInTx.isEmpty()) {
-            for (MessageDispatch md : dispatchedInTx) {
-                MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
-                pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
-                pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
+            for (final Delivery delivery : dispatchedInTx) {
+                MessageDispatch dispatch = (MessageDispatch) delivery.getContext();
+
+                MessageAck pendingTxAck = new MessageAck(dispatch, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
+                pendingTxAck.setFirstMessageId(dispatch.getMessage().getMessageId());
+                pendingTxAck.setTransactionId(txnId);
 
                 LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
 
@@ -310,6 +318,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                             Throwable exception = ((ExceptionResponse) response).getException();
                             exception.printStackTrace();
                             getEndpoint().close();
+                        } else {
+                            delivery.settle();
                         }
                         session.pumpProtonToSocket();
                     }
@@ -321,15 +331,22 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     }
 
     @Override
-    public void rollback() throws Exception {
+    public void rollback(LocalTransactionId txnId) throws Exception {
         synchronized (outbound) {
 
             LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
 
-            for (MessageDispatch dispatch : dispatchedInTx) {
-                dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
+            for (Delivery delivery : dispatchedInTx) {
+                // Only settled deliveries should be re-dispatched, unsettled deliveries
+                // remain acquired on the remote end and can be accepted again in a new
+                // TX or released or rejected etc.
+                MessageDispatch dispatch = (MessageDispatch) delivery.getContext();
                 dispatch.getMessage().setTransactionId(null);
-                outbound.addFirst(dispatch);
+
+                if (delivery.remotelySettled()) {
+                    dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
+                    outbound.addFirst(dispatch);
+                }
             }
 
             dispatchedInTx.clear();
@@ -507,19 +524,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             ack.setMessageCount(1);
             ack.setAckType((byte) ackType);
             ack.setDestination(md.getDestination());
-
-            DeliveryState remoteState = delivery.getRemoteState();
-            if (remoteState != null && remoteState instanceof TransactionalState) {
-                TransactionalState txState = (TransactionalState) remoteState;
-                TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
-                ack.setTransactionId(txId);
-
-                // Store the message sent in this TX we might need to re-send on rollback
-                session.enlist(txId);
-                md.getMessage().setTransactionId(txId);
-                dispatchedInTx.addFirst(md);
-            }
-
             LOG.trace("Sending Ack to ActiveMQ: {}", ack);
 
             sendToActiveMQ(ack, new ResponseHandler() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index 4cb5f37..1c91962 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -35,6 +35,7 @@ import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveInfo;
@@ -123,11 +124,14 @@ public class AmqpSession implements AmqpResource {
     /**
      * Commits all pending work for all resources managed under this session.
      *
+     * @param txId
+     *      The specific TransactionId that is being committed.
+     *
      * @throws Exception if an error occurs while attempting to commit work.
      */
-    public void commit() throws Exception {
+    public void commit(LocalTransactionId txId) throws Exception {
         for (AmqpSender consumer : consumers.values()) {
-            consumer.commit();
+            consumer.commit(txId);
         }
 
         enlisted = false;
@@ -136,11 +140,14 @@ public class AmqpSession implements AmqpResource {
     /**
      * Rolls back any pending work being down under this session.
      *
+     * @param txId
+     *      The specific TransactionId that is being rolled back.
+     *
      * @throws Exception if an error occurs while attempting to roll back work.
      */
-    public void rollback() throws Exception {
+    public void rollback(LocalTransactionId txId) throws Exception {
         for (AmqpSender consumer : consumers.values()) {
-            consumer.rollback();
+            consumer.rollback(txId);
         }
 
         enlisted = false;

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
index 40bcda5..95cd5e3 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
@@ -98,7 +98,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
             TransactionInfo txInfo = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN);
             session.getConnection().registerTransaction(txId, this);
             sendToActiveMQ(txInfo, null);
-            LOG.trace("started transaction {}", txId.getValue());
+            LOG.trace("started transaction {}", txId);
 
             Declared declared = new Declared();
             declared.setTxnId(new Binary(toBytes(txId.getValue())));
@@ -110,18 +110,18 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
             final byte operation;
 
             if (discharge.getFail()) {
-                LOG.trace("rollback transaction {}", txId.getValue());
+                LOG.trace("rollback transaction {}", txId);
                 operation = TransactionInfo.ROLLBACK;
             } else {
-                LOG.trace("commit transaction {}", txId.getValue());
+                LOG.trace("commit transaction {}", txId);
                 operation = TransactionInfo.COMMIT_ONE_PHASE;
             }
 
             for (AmqpSession txSession : txSessions) {
                 if (operation == TransactionInfo.ROLLBACK) {
-                    txSession.rollback();
+                    txSession.rollback(txId);
                 } else {
-                    txSession.commit();
+                    txSession.commit(txId);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index 0815f8a..f61cbc3 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -32,7 +32,11 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.junit.Ignore;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
 import org.junit.Test;
 
 /**
@@ -89,7 +93,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         assertEquals(1, queue.getQueueSize());
 
-        sender.close();
         connection.close();
     }
 
@@ -114,7 +117,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         assertEquals(0, queue.getQueueSize());
 
-        sender.close();
         connection.close();
     }
 
@@ -146,7 +148,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         assertEquals(0, queue.getQueueSize());
 
-        sender.close();
         connection.close();
     }
 
@@ -194,7 +195,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
         connection.close();
     }
 
-
     @Test(timeout = 60000)
     public void testReceiveMessageWithRollback() throws Exception {
         AmqpClient client = createAmqpClient();
@@ -223,7 +223,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         assertEquals(1, queue.getQueueSize());
 
-        sender.close();
         connection.close();
     }
 
@@ -421,6 +420,163 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
         assertEquals(0, queue.getQueueSize());
     }
 
+    @Test(timeout = 60000)
+    public void testAcceptedButNotSettledInTXRemainsAquiredCanBeAccepted() throws Exception {
+        doTestAcceptedButNotSettledInTXRemainsAquired(Accepted.getInstance());
+    }
+
+    @Test(timeout = 60000)
+    public void testAcceptedButNotSettledInTXRemainsAquiredCanBeReleased() throws Exception {
+        doTestAcceptedButNotSettledInTXRemainsAquired(Released.getInstance());
+    }
+
+    @Test(timeout = 60000)
+    public void testAcceptedButNotSettledInTXRemainsAquiredCanBeRejected() throws Exception {
+        doTestAcceptedButNotSettledInTXRemainsAquired(new Rejected());
+    }
+
+    @Test(timeout = 60000)
+    public void testAcceptedButNotSettledInTXRemainsAquiredCanBeModifiedAsFailed() throws Exception {
+        Modified outcome = new Modified();
+        outcome.setDeliveryFailed(true);
+        doTestAcceptedButNotSettledInTXRemainsAquired(outcome);
+    }
+
+    @Test(timeout = 60000)
+    public void testAcceptedButNotSettledInTXRemainsAquiredCanBeModifiedAsUndeliverable() throws Exception {
+        Modified outcome = new Modified();
+        outcome.setDeliveryFailed(true);
+        outcome.setUndeliverableHere(true);
+        doTestAcceptedButNotSettledInTXRemainsAquired(outcome);
+    }
+
+    private void doTestAcceptedButNotSettledInTXRemainsAquired(Outcome outcome) throws Exception {
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+        sender.send(message);
+
+        session.begin();
+
+        receiver.flow(10);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept(false);
+
+        session.rollback();
+
+        // Message should remain acquired and not be redelivered.
+        assertEquals(1, queue.getQueueSize());
+        assertNull(receiver.receive(2, TimeUnit.SECONDS));
+
+        if (outcome instanceof Released || outcome instanceof Rejected) {
+            // Receiver should be able to release the still acquired message and the
+            // broker should redispatch it to the client again.
+            received.release();
+            received = receiver.receive(3, TimeUnit.SECONDS);
+            assertNotNull(received);
+            received.accept();
+            received = receiver.receive(2, TimeUnit.SECONDS);
+            assertNull(received);
+            assertEquals(0, queue.getQueueSize());
+        } else if (outcome instanceof Accepted) {
+            // Receiver should be able to accept the still acquired message and the
+            // broker should then mark it as consumed.
+            received.accept();
+            received = receiver.receive(2, TimeUnit.SECONDS);
+            assertNull(received);
+            assertEquals(0, queue.getQueueSize());
+        } else if (outcome instanceof Modified) {
+            // Depending on the undeliverable here state the message will either be
+            // redelivered or DLQ'd
+            Modified modified = (Modified) outcome;
+            received.modified(Boolean.TRUE.equals(modified.getDeliveryFailed()), Boolean.TRUE.equals(modified.getUndeliverableHere()));
+            if (Boolean.TRUE.equals(modified.getUndeliverableHere())) {
+                received = receiver.receive(2, TimeUnit.SECONDS);
+                assertNull(received);
+                assertEquals(0, queue.getQueueSize());
+            } else {
+                received = receiver.receive(3, TimeUnit.SECONDS);
+                assertNotNull(received);
+                received.accept();
+                received = receiver.receive(2, TimeUnit.SECONDS);
+                assertNull(received);
+                assertEquals(0, queue.getQueueSize());
+            }
+        }
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testTransactionallyAcquiredMessageCanBeTransactionallyConsumed() throws Exception {
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+        sender.send(message);
+
+        session.begin();
+
+        receiver.flow(10);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept(false);
+
+        session.rollback();
+
+        // Message should remain acquired and not be redelivered.
+        assertEquals(1, queue.getQueueSize());
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        // Consume under TX again, still without settling this time
+        session.begin();
+        received.accept(false);
+        session.rollback();
+
+        // Should still be acquired
+        assertEquals(1, queue.getQueueSize());
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        // Consume under TX and settle but rollback, message should be redelivered.
+        session.begin();
+        received.accept();
+        session.rollback();
+
+        assertEquals(1, queue.getQueueSize());
+        received = receiver.receive(1, TimeUnit.SECONDS);
+        assertNotNull(received);
+
+        // Consume under TX and commit it this time.
+        session.begin();
+        received.accept(false);
+        session.commit();
+
+        // Check that it is now consumed and no more message available
+        assertTrue(received.getWrappedDelivery().remotelySettled());
+        assertEquals(0, queue.getQueueSize());
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        connection.close();
+    }
+
     //----- Tests Ported from AmqpNetLite client -----------------------------//
 
     @Test(timeout = 60000)
@@ -621,9 +777,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
         connection.close();
     }
 
-    // TODO - Direct ports of the AmqpNetLite client tests that don't currently with this broker.
-
-    @Ignore("Fails due to no support for TX enrollment without settlement.")
     @Test(timeout = 60000)
     public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
         final int NUM_MESSAGES = 10;
@@ -701,7 +854,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
         connection.close();
     }
 
-    @Ignore("Fails due to no support for TX enrollment without settlement.")
     @Test(timeout = 60000)
     public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
         final int NUM_MESSAGES = 10;
@@ -756,12 +908,12 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         message2.release();
 
-        // Should be two message available for dispatch given that we sent and committed one, and
+        // Should be ten message available for dispatch given that we sent and committed one, and
         // releases another we had previously received.
-        receiver.flow(2);
+        receiver.flow(10);
         for (int i = 1; i <= NUM_MESSAGES; ++i) {
             AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-            assertNotNull(message);
+            assertNotNull("Expected a message for: " + i, message);
             assertEquals(i, message.getApplicationProperty("msgId"));
             message.accept();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index d0d31cc..ee69650 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -480,7 +480,7 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception {
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
         outbound.onSend();
         outbound.storeContent();
 
@@ -502,7 +502,7 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
         outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
         outbound.onSend();
         outbound.storeContent();
@@ -525,7 +525,7 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
         outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         outbound.onSend();
         outbound.storeContent();
@@ -571,7 +571,7 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
         outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
         outbound.onSend();
         outbound.storeContent();
@@ -594,7 +594,7 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
         outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         outbound.onSend();
         outbound.storeContent();