You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/26 22:45:16 UTC

[2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6438

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 59c306f..985f4f5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -18,40 +18,63 @@ package org.apache.activemq.transport.amqp.message;
 
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
-import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.CONTENT_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.DELIVERY_ANNOTATION_PREFIX;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.EMPTY_BINARY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.FIRST_ACQUIRER;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.FOOTER_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.HEADER;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PREFIX_LENGTH;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.MESSAGE_FORMAT;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.NATIVE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.ORIGINAL_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.PROPERTIES;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.REPLYTO_GROUP_ID;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getBinaryFromMessageBody;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getMapFromMessageBody;
 
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.ObjectMessage;
 import javax.jms.Queue;
-import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.TypeConversionSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
@@ -66,12 +89,12 @@ import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
-public class JMSMappingOutboundTransformer extends OutboundTransformer {
+public class JMSMappingOutboundTransformer implements OutboundTransformer {
 
     public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
     public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
@@ -81,225 +104,276 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
     public static final byte TEMP_QUEUE_TYPE = 0x02;
     public static final byte TEMP_TOPIC_TYPE = 0x03;
 
-    // Deprecated legacy values used by old QPid AMQP 1.0 JMS client.
-
-    public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type");
-    public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type");
-
-    public static final String LEGACY_QUEUE_TYPE = "queue";
-    public static final String LEGACY_TOPIC_TYPE = "topic";
-    public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
-    public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
-
-    public JMSMappingOutboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
+    // For now Proton requires that we create a decoder to create an encoder
+    private final DecoderImpl decoder = new DecoderImpl();
+    private final EncoderImpl encoder = new EncoderImpl(decoder);
+    {
+        AMQPDefinedTypes.registerAllTypes(decoder, encoder);
     }
 
     @Override
-    public EncodedMessage transform(Message msg) throws Exception {
-        if (msg == null) {
-            return null;
-        }
-
-        try {
-            if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
-                return null;
-            }
-        } catch (MessageFormatException e) {
+    public EncodedMessage transform(ActiveMQMessage message) throws Exception {
+        if (message == null) {
             return null;
         }
-        ProtonJMessage amqp = convert(msg);
-
-        long messageFormat;
-        try {
-            messageFormat = msg.getLongProperty(this.messageFormatKey);
-        } catch (MessageFormatException e) {
-            return null;
-        }
-
-        ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
-        final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
-        int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
-        if (overflow.position() > 0) {
-            buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
-            c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
-        }
-
-        return new EncodedMessage(messageFormat, buffer.array(), 0, c);
-    }
-
-    /**
-     * Perform the conversion between JMS Message and Proton Message without
-     * re-encoding it to array. This is needed because some frameworks may elect
-     * to do this on their own way.
-     *
-     * @param message
-     *      The message to transform into an AMQP version for dispatch.
-     *
-     * @return an AMQP Message that represents the given JMS Message.
-     *
-     * @throws Exception if an error occurs during the conversion.
-     */
-    public ProtonJMessage convert(Message message) throws JMSException, UnsupportedEncodingException {
-        Header header = new Header();
-        Properties props = new Properties();
 
+        long messageFormat = 0;
+        Header header = null;
+        Properties properties = null;
         Map<Symbol, Object> daMap = null;
         Map<Symbol, Object> maMap = null;
         Map<String,Object> apMap = null;
         Map<Object, Object> footerMap = null;
-        Section body = null;
 
-        body = convertBody(message);
+        Section body = convertBody(message);
 
-        header.setDurable(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
-        header.setPriority(new UnsignedByte((byte) message.getJMSPriority()));
-        if (message.getJMSType() != null) {
-            props.setSubject(message.getJMSType());
+        if (message.isPersistent()) {
+            if (header == null) {
+                header = new Header();
+            }
+            header.setDurable(true);
         }
-        if (message.getJMSMessageID() != null) {
-            props.setMessageId(vendor.getOriginalMessageId(message));
+        byte priority = message.getPriority();
+        if (priority != Message.DEFAULT_PRIORITY) {
+            if (header == null) {
+                header = new Header();
+            }
+            header.setPriority(new UnsignedByte(priority));
+        }
+        String type = message.getType();
+        if (type != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setSubject(type);
+        }
+        MessageId messageId = message.getMessageId();
+        if (messageId != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setMessageId(getOriginalMessageId(message));
         }
-        if (message.getJMSDestination() != null) {
-            props.setTo(vendor.toAddress(message.getJMSDestination()));
+        ActiveMQDestination destination = message.getDestination();
+        if (destination != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setTo(destination.getQualifiedName());
             if (maMap == null) {
                 maMap = new HashMap<Symbol, Object>();
             }
-            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(message.getJMSDestination()));
-
-            // Deprecated: used by legacy QPid AMQP 1.0 JMS client
-            maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSDestination()));
+            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
         }
-        if (message.getJMSReplyTo() != null) {
-            props.setReplyTo(vendor.toAddress(message.getJMSReplyTo()));
+        ActiveMQDestination replyTo = message.getReplyTo();
+        if (replyTo != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setReplyTo(replyTo.getQualifiedName());
             if (maMap == null) {
                 maMap = new HashMap<Symbol, Object>();
             }
-            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(message.getJMSReplyTo()));
-
-            // Deprecated: used by legacy QPid AMQP 1.0 JMS client
-            maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSReplyTo()));
+            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
         }
-        if (message.getJMSCorrelationID() != null) {
-            String correlationId = message.getJMSCorrelationID();
+        String correlationId = message.getCorrelationId();
+        if (correlationId != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
             try {
-                props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+                properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
             } catch (AmqpProtocolException e) {
-                props.setCorrelationId(correlationId);
+                properties.setCorrelationId(correlationId);
             }
         }
-        if (message.getJMSExpiration() != 0) {
-            long ttl = message.getJMSExpiration() - System.currentTimeMillis();
+        long expiration = message.getExpiration();
+        if (expiration != 0) {
+            long ttl = expiration - System.currentTimeMillis();
             if (ttl < 0) {
                 ttl = 1;
             }
+
+            if (header == null) {
+                header = new Header();
+            }
             header.setTtl(new UnsignedInteger((int) ttl));
 
-            props.setAbsoluteExpiryTime(new Date(message.getJMSExpiration()));
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setAbsoluteExpiryTime(new Date(expiration));
         }
-        if (message.getJMSTimestamp() != 0) {
-            props.setCreationTime(new Date(message.getJMSTimestamp()));
+        long timeStamp = message.getTimestamp();
+        if (timeStamp != 0) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setCreationTime(new Date(timeStamp));
         }
 
-        @SuppressWarnings("unchecked")
-        final Enumeration<String> keys = message.getPropertyNames();
-
-        while (keys.hasMoreElements()) {
-            String key = keys.nextElement();
-            if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(AMQP_ORIGINAL_ENCODING_KEY)) {
-                // skip transformer appended properties
-            } else if (key.equals(firstAcquirerKey)) {
-                header.setFirstAcquirer(message.getBooleanProperty(key));
-            } else if (key.startsWith("JMSXDeliveryCount")) {
-                // The AMQP delivery-count field only includes prior failed delivery attempts,
-                // whereas JMSXDeliveryCount includes the first/current delivery attempt.
-                int amqpDeliveryCount = message.getIntProperty(key) - 1;
-                if (amqpDeliveryCount > 0) {
-                    header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
-                }
-            } else if (key.startsWith("JMSXUserID")) {
-                String value = message.getStringProperty(key);
-                props.setUserId(new Binary(value.getBytes("UTF-8")));
-            } else if (key.startsWith("JMSXGroupID")) {
-                String value = message.getStringProperty(key);
-                props.setGroupId(value);
-                if (apMap == null) {
-                    apMap = new HashMap<String, Object>();
-                }
-                apMap.put(key, value);
-            } else if (key.startsWith("JMSXGroupSeq")) {
-                UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
-                props.setGroupSequence(value);
-                if (apMap == null) {
-                    apMap = new HashMap<String, Object>();
-                }
-                apMap.put(key, value);
-            } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
-                if (daMap == null) {
-                    daMap = new HashMap<Symbol, Object>();
-                }
-                String name = key.substring(prefixDeliveryAnnotationsKey.length());
-                daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
-            } else if (key.startsWith(prefixMessageAnnotationsKey)) {
-                if (maMap == null) {
-                    maMap = new HashMap<Symbol, Object>();
-                }
-                String name = key.substring(prefixMessageAnnotationsKey.length());
-                maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
-            } else if (key.equals(contentTypeKey)) {
-                props.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
-            } else if (key.equals(contentEncodingKey)) {
-                props.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
-            } else if (key.equals(replyToGroupIDKey)) {
-                props.setReplyToGroupId(message.getStringProperty(key));
-            } else if (key.startsWith(prefixFooterKey)) {
-                if (footerMap == null) {
-                    footerMap = new HashMap<Object, Object>();
-                }
-                String name = key.substring(prefixFooterKey.length());
-                footerMap.put(name, message.getObjectProperty(key));
-            } else {
-                if (apMap == null) {
-                    apMap = new HashMap<String, Object>();
+        // JMSX Message Properties
+        int deliveryCount = message.getRedeliveryCounter();
+        if (deliveryCount > 0) {
+            if (header == null) {
+                header = new Header();
+            }
+            header.setDeliveryCount(new UnsignedInteger(deliveryCount));
+        }
+        String userId = message.getUserID();
+        if (userId != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setUserId(new Binary(userId.getBytes(StandardCharsets.UTF_8)));
+        }
+        String groupId = message.getGroupID();
+        if (groupId != null) {
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setGroupId(groupId);
+        }
+        int groupSequence = message.getGroupSequence();
+        if (groupSequence > 0) {
+            UnsignedInteger value = new UnsignedInteger(groupSequence);
+            if (properties == null) {
+                properties = new Properties();
+            }
+            properties.setGroupSequence(value);
+        }
+
+        final Map<String, Object> entries;
+        try {
+            entries = message.getProperties();
+        } catch (IOException e) {
+            throw JMSExceptionSupport.create(e);
+        }
+
+        for (Map.Entry<String, Object> entry : entries.entrySet()) {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            if (value instanceof UTF8Buffer) {
+                value = value.toString();
+            }
+
+            if (key.startsWith(JMS_AMQP_PREFIX)) {
+                if (key.startsWith(NATIVE, JMS_AMQP_PREFIX_LENGTH)) {
+                    // skip transformer appended properties
+                    continue;
+                } else if (key.startsWith(ORIGINAL_ENCODING, JMS_AMQP_PREFIX_LENGTH)) {
+                    // skip transformer appended properties
+                    continue;
+                } else if (key.startsWith(MESSAGE_FORMAT, JMS_AMQP_PREFIX_LENGTH)) {
+                    messageFormat = (long) TypeConversionSupport.convert(entry.getValue(), Long.class);
+                    continue;
+                } else if (key.startsWith(HEADER, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (header == null) {
+                        header = new Header();
+                    }
+                    continue;
+                } else if (key.startsWith(PROPERTIES, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (properties == null) {
+                        properties = new Properties();
+                    }
+                    continue;
+                } else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (maMap == null) {
+                        maMap = new HashMap<Symbol, Object>();
+                    }
+                    String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
+                    maMap.put(Symbol.valueOf(name), value);
+                    continue;
+                } else if (key.startsWith(FIRST_ACQUIRER, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (header == null) {
+                        header = new Header();
+                    }
+                    header.setFirstAcquirer((boolean) TypeConversionSupport.convert(value, Boolean.class));
+                    continue;
+                } else if (key.startsWith(CONTENT_TYPE, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (properties == null) {
+                        properties = new Properties();
+                    }
+                    properties.setContentType(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class)));
+                    continue;
+                } else if (key.startsWith(CONTENT_ENCODING, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (properties == null) {
+                        properties = new Properties();
+                    }
+                    properties.setContentEncoding(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class)));
+                    continue;
+                } else if (key.startsWith(REPLYTO_GROUP_ID, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (properties == null) {
+                        properties = new Properties();
+                    }
+                    properties.setReplyToGroupId((String) TypeConversionSupport.convert(value, String.class));
+                    continue;
+                } else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (daMap == null) {
+                        daMap = new HashMap<Symbol, Object>();
+                    }
+                    String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
+                    daMap.put(Symbol.valueOf(name), value);
+                    continue;
+                } else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
+                    if (footerMap == null) {
+                        footerMap = new HashMap<Object, Object>();
+                    }
+                    String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
+                    footerMap.put(name, value);
+                    continue;
                 }
-                apMap.put(key, message.getObjectProperty(key));
             }
+
+            // The property didn't map into any other slot so we store it in the
+            // Application Properties section of the message.
+            if (apMap == null) {
+                apMap = new HashMap<String, Object>();
+            }
+            apMap.put(key, value);
         }
 
-        MessageAnnotations ma = null;
-        if (maMap != null) {
-            ma = new MessageAnnotations(maMap);
+        final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
+        encoder.setByteBuffer(buffer);
+
+        if (header != null) {
+            encoder.writeObject(header);
         }
-        DeliveryAnnotations da = null;
         if (daMap != null) {
-            da = new DeliveryAnnotations(daMap);
+            encoder.writeObject(new DeliveryAnnotations(daMap));
+        }
+        if (maMap != null) {
+            encoder.writeObject(new MessageAnnotations(maMap));
+        }
+        if (properties != null) {
+            encoder.writeObject(properties);
         }
-        ApplicationProperties ap = null;
         if (apMap != null) {
-            ap = new ApplicationProperties(apMap);
+            encoder.writeObject(new ApplicationProperties(apMap));
+        }
+        if (body != null) {
+            encoder.writeObject(body);
         }
-        Footer footer = null;
         if (footerMap != null) {
-            footer = new Footer(footerMap);
+            encoder.writeObject(new Footer(footerMap));
         }
 
-        return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
+        return new EncodedMessage(messageFormat, buffer.getArray(), 0, buffer.getArrayLength());
     }
 
-    private Section convertBody(Message message) throws JMSException {
+    private Section convertBody(ActiveMQMessage message) throws JMSException {
 
         Section body = null;
         short orignalEncoding = AMQP_UNKNOWN;
 
-        if (message.propertyExists(AMQP_ORIGINAL_ENCODING_KEY)) {
-            try {
-                orignalEncoding = message.getShortProperty(AMQP_ORIGINAL_ENCODING_KEY);
-            } catch (Exception ex) {
-            }
+        try {
+            orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING);
+        } catch (Exception ex) {
+            // Ignore and stick with UNKNOWN
         }
 
-        if (message instanceof BytesMessage) {
-            Binary payload = vendor.getBinaryFromMessageBody((BytesMessage) message);
+        if (message instanceof ActiveMQBytesMessage) {
+            Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message);
 
             if (payload == null) {
                 payload = EMPTY_BINARY;
@@ -317,12 +391,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                     body = new Data(payload);
                     break;
             }
-        } else if (message instanceof TextMessage) {
+        } else if (message instanceof ActiveMQTextMessage) {
             switch (orignalEncoding) {
                 case AMQP_NULL:
                     break;
                 case AMQP_DATA:
-                    body = new Data(vendor.getBinaryFromMessageBody((TextMessage) message));
+                    body = new Data(getBinaryFromMessageBody((ActiveMQTextMessage) message));
                     break;
                 case AMQP_VALUE_STRING:
                 case AMQP_UNKNOWN:
@@ -330,11 +404,11 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                     body = new AmqpValue(((TextMessage) message).getText());
                     break;
             }
-        } else if (message instanceof MapMessage) {
-            body = new AmqpValue(vendor.getMapFromMessageBody((MapMessage) message));
-        } else if (message instanceof StreamMessage) {
+        } else if (message instanceof ActiveMQMapMessage) {
+            body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message));
+        } else if (message instanceof ActiveMQStreamMessage) {
             ArrayList<Object> list = new ArrayList<Object>();
-            final StreamMessage m = (StreamMessage) message;
+            final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message;
             try {
                 while (true) {
                     list.add(m.readObject());
@@ -352,8 +426,8 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                     body = new AmqpValue(list);
                     break;
             }
-        } else if (message instanceof ObjectMessage) {
-            Binary payload = vendor.getBinaryFromMessageBody((ObjectMessage) message);
+        } else if (message instanceof ActiveMQObjectMessage) {
+            Binary payload = getBinaryFromMessageBody((ActiveMQObjectMessage) message);
 
             if (payload == null) {
                 payload = EMPTY_BINARY;
@@ -373,8 +447,10 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
             // For a non-AMQP message we tag the outbound content type as containing
             // a serialized Java object so that an AMQP client has a hint as to what
             // we are sending it.
-            if (!message.propertyExists(contentTypeKey)) {
-                vendor.setMessageProperty(message, contentTypeKey, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+            if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) {
+                message.setReadOnlyProperties(false);
+                message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+                message.setReadOnlyProperties(true);
             }
         }
 
@@ -399,23 +475,19 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
         throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
     }
 
-    // Used by legacy QPid AMQP 1.0 JMS client.
-    @Deprecated
-    private static String destinationAttributes(Destination destination) {
-        if (destination instanceof Queue) {
-            if (destination instanceof TemporaryQueue) {
-                return LEGACY_TEMP_QUEUE_TYPE;
-            } else {
-                return LEGACY_QUEUE_TYPE;
-            }
-        } else if (destination instanceof Topic) {
-            if (destination instanceof TemporaryTopic) {
-                return LEGACY_TEMP_TOPIC_TYPE;
-            } else {
-                return LEGACY_TOPIC_TYPE;
+    private static Object getOriginalMessageId(ActiveMQMessage message) {
+        Object result;
+        MessageId messageId = message.getMessageId();
+        if (messageId.getTextView() != null) {
+            try {
+                result = AMQPMessageIdHelper.INSTANCE.toIdObject(messageId.getTextView());
+            } catch (AmqpProtocolException e) {
+                result = messageId.getTextView();
             }
+        } else {
+            result = messageId.toString();
         }
 
-        throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
index 2eefa50..6ca9ced 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
@@ -16,54 +16,10 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import javax.jms.Message;
+import org.apache.activemq.command.ActiveMQMessage;
 
-public abstract class OutboundTransformer {
+public interface OutboundTransformer {
 
-    protected final ActiveMQJMSVendor vendor;
+    public abstract EncodedMessage transform(ActiveMQMessage message) throws Exception;
 
-    protected String prefixVendor;
-
-    protected String prefixDeliveryAnnotations = "DA_";
-    protected String prefixMessageAnnotations= "MA_";
-    protected String prefixFooter = "FT_";
-
-    protected String messageFormatKey;
-    protected String nativeKey;
-    protected String firstAcquirerKey;
-    protected String prefixDeliveryAnnotationsKey;
-    protected String prefixMessageAnnotationsKey;
-    protected String contentTypeKey;
-    protected String contentEncodingKey;
-    protected String replyToGroupIDKey;
-    protected String prefixFooterKey;
-
-    public OutboundTransformer(ActiveMQJMSVendor vendor) {
-        this.vendor = vendor;
-        this.setPrefixVendor("JMS_AMQP_");
-    }
-
-    public abstract EncodedMessage transform(Message jms) throws Exception;
-
-    public String getPrefixVendor() {
-        return prefixVendor;
-    }
-
-    public void setPrefixVendor(String prefixVendor) {
-        this.prefixVendor = prefixVendor;
-
-        messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
-        nativeKey = prefixVendor + "NATIVE";
-        firstAcquirerKey = prefixVendor + "FirstAcquirer";
-        prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
-        prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
-        contentTypeKey = prefixVendor + "ContentType";
-        contentEncodingKey = prefixVendor + "ContentEncoding";
-        replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
-        prefixFooterKey = prefixVendor + prefixFooter;
-    }
-
-    public ActiveMQJMSVendor getVendor() {
-        return vendor;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 503a05e..33c319e 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -37,7 +37,6 @@ import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.ResponseHandler;
 import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
 import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
-import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
 import org.apache.activemq.transport.amqp.message.EncodedMessage;
 import org.apache.activemq.transport.amqp.message.InboundTransformer;
 import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
@@ -138,14 +137,14 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
         if (inboundTransformer == null) {
             String transformer = session.getConnection().getConfiguredTransformer();
             if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) {
-                inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                inboundTransformer = new JMSMappingInboundTransformer();
             } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_NATIVE)) {
-                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                inboundTransformer = new AMQPNativeInboundTransformer();
             } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_RAW)) {
-                inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                inboundTransformer = new AMQPRawInboundTransformer();
             } else {
                 LOG.warn("Unknown transformer type {} using native one instead", transformer);
-                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+                inboundTransformer = new AMQPNativeInboundTransformer();
             }
         }
         return inboundTransformer;
@@ -157,7 +156,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
             EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
 
             InboundTransformer transformer = getTransformer();
-            ActiveMQMessage message = (ActiveMQMessage) transformer.transform(em);
+            ActiveMQMessage message = transformer.transform(em);
 
             current = null;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 455e0b0..2531c1a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.protocol;
 
 import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -39,7 +40,6 @@ import org.apache.activemq.command.Response;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.ResponseHandler;
-import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
 import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
 import org.apache.activemq.transport.amqp.message.EncodedMessage;
 import org.apache.activemq.transport.amqp.message.OutboundTransformer;
@@ -75,11 +75,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
 
-    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer();
     private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
     private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
     private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
-    private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
 
     private final ConsumerInfo consumerInfo;
     private AbstractSubscription subscription;
@@ -437,8 +436,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                         temp = (ActiveMQMessage) md.getMessage();
                     }
 
-                    if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
-                        temp.setProperty(MESSAGE_FORMAT_KEY, 0);
+                    if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) {
+                        temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0);
                     }
                 }
 
@@ -477,6 +476,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                             currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
                         }
                         currentDelivery.setContext(md);
+                        currentDelivery.setMessageFormat((int) amqp.getMessageFormat());
                     } else {
                         // TODO: message could not be generated what now?
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
index b513c1a..201cee2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
@@ -87,8 +87,6 @@ public class AmqpTransformerTest {
 
         assertTrue(message instanceof BytesMessage);
         Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE");
-        Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
-        assertEquals(0L, messageFormat.longValue());
         assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed);
         assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
         assertEquals(7, message.getJMSPriority());
@@ -136,8 +134,6 @@ public class AmqpTransformerTest {
         LOG.info("Recieved message: {}", message);
         assertTrue(message instanceof BytesMessage);
         Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE");
-        Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
-        assertEquals(0L, messageFormat.longValue());
         assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed);
         assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
 
@@ -184,8 +180,6 @@ public class AmqpTransformerTest {
 
         assertTrue(message instanceof TextMessage);
         Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE");
-        Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
-        assertEquals(0L, messageFormat.longValue());
         assertFalse("Didn't use the correct transformation, expected NOT to be NATIVE", nativeTransformationUsed);
         assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
index 84d5864..fa61e14 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
@@ -468,4 +469,46 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
         amqp.close();
         openwire.close();
     }
+
+    //----- Test Qpid JMS to Qpid JMS interop with transformers --------------//
+
+    @Test
+    public void testQpidJMSToQpidJMSMessageSendReceive() throws Exception {
+        final int SIZE = 1024;
+        final int NUM_MESSAGES = 100;
+
+        Connection amqpSend = createConnection("client-1");
+        Connection amqpReceive = createConnection("client-2");
+
+        amqpReceive.start();
+
+        Session senderSession = amqpSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session receiverSession = amqpReceive.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination queue = senderSession.createQueue(getDestinationName());
+
+        MessageProducer amqpProducer = senderSession.createProducer(queue);
+        MessageConsumer amqpConsumer = receiverSession.createConsumer(queue);
+
+        byte[] payload = new byte[SIZE];
+
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            BytesMessage outgoing = senderSession.createBytesMessage();
+            outgoing.setLongProperty("SendTime", System.currentTimeMillis());
+            outgoing.writeBytes(payload);
+            amqpProducer.send(outgoing);
+        }
+
+        // Now consumer the message
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            Message received = amqpConsumer.receive(2000);
+            assertNotNull(received);
+            assertTrue("Expected BytesMessage but got " + received, received instanceof BytesMessage);
+            BytesMessage incoming = (BytesMessage) received;
+            assertEquals(SIZE, incoming.getBodyLength());
+        }
+
+        amqpReceive.close();
+        amqpSend.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 2b1b874..e5e1bbd 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -257,7 +257,7 @@ public class AmqpMessage {
      * @return the set message ID in String form or null if not set.
      */
     public String getMessageId() {
-        if (message.getProperties() == null) {
+        if (message.getProperties() == null || message.getProperties().getMessageId() == null) {
             return null;
         }
 
@@ -309,7 +309,7 @@ public class AmqpMessage {
      * @return the set correlation ID in String form or null if not set.
      */
     public String getCorrelationId() {
-        if (message.getProperties() == null) {
+        if (message.getProperties() == null || message.getProperties().getCorrelationId() == null) {
             return null;
         }
 
@@ -387,7 +387,7 @@ public class AmqpMessage {
      * @return true if the message is marked as being durable.
      */
     public boolean isDurable() {
-        if (message.getHeader() == null) {
+        if (message.getHeader() == null || message.getHeader().getDurable() == null) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
index ba0f014..1427b5a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
@@ -51,7 +51,6 @@ import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class JMSMappingInboundTransformerTest {
 
@@ -65,8 +64,7 @@ public class JMSMappingInboundTransformerTest {
      */
     @Test
     public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
         message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE);
@@ -86,8 +84,7 @@ public class JMSMappingInboundTransformerTest {
      */
     @Test
     public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
 
@@ -107,8 +104,7 @@ public class JMSMappingInboundTransformerTest {
     */
     @Test
     public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
         message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
@@ -122,8 +118,7 @@ public class JMSMappingInboundTransformerTest {
 
     @Test
     public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
         message.setContentType("text/plain");
@@ -143,8 +138,7 @@ public class JMSMappingInboundTransformerTest {
      * @throws Exception if an error occurs during the test.
      */
     public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
         message.setContentType("unknown-content-type");
@@ -174,8 +168,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -197,8 +190,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -222,8 +214,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -246,8 +237,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -350,8 +340,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -377,8 +366,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -398,8 +386,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -415,8 +402,7 @@ public class JMSMappingInboundTransformerTest {
     */
     @Test
     public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         Message message = Message.Factory.create();
         message.setBody(new AmqpValue(new Binary(new byte[0])));
@@ -443,8 +429,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -465,8 +450,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -487,8 +471,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -509,8 +492,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertNotNull("Message should not be null", jmsMessage);
@@ -531,8 +513,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         javax.jms.Message jmsMessage = transformer.transform(em);
 
@@ -548,8 +529,7 @@ public class JMSMappingInboundTransformerTest {
 
         EncodedMessage em = encodeMessage(message);
 
-        ActiveMQJMSVendor vendor = createVendor();
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
         javax.jms.Message jmsMessage = transformer.transform(em);
 
         assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
@@ -589,9 +569,7 @@ public class JMSMappingInboundTransformerTest {
     }
 
     private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception {
-        ActiveMQTextMessage mockTextMessage = createMockTextMessage();
-        ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage);
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         String toAddress = "toAddress";
         Message amqp = Message.Factory.create();
@@ -608,11 +586,6 @@ public class JMSMappingInboundTransformerTest {
 
         javax.jms.Message jmsMessage = transformer.transform(em);
         assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
-
-        // Verify that createDestination was called with the provided 'to'
-        // address and 'Destination' class
-        // TODO - No need to really test this bit ?
-        // Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
     }
 
     //----- ReplyTo Conversions ----------------------------------------------//
@@ -643,9 +616,7 @@ public class JMSMappingInboundTransformerTest {
     }
 
     private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception {
-        ActiveMQTextMessage mockTextMessage = createMockTextMessage();
-        ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage);
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
 
         String replyToAddress = "replyToAddress";
         Message amqp = Message.Factory.create();
@@ -662,31 +633,10 @@ public class JMSMappingInboundTransformerTest {
 
         javax.jms.Message jmsMessage = transformer.transform(em);
         assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
-
-        // Verify that createDestination was called with the provided 'replyTo'
-        // address and 'Destination' class
-        // TODO - No need to really test this bit ?
-        // Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
     }
 
     //----- Utility Methods --------------------------------------------------//
 
-    private ActiveMQTextMessage createMockTextMessage() {
-        return Mockito.mock(ActiveMQTextMessage.class);
-    }
-
-    private ActiveMQJMSVendor createMockVendor(ActiveMQTextMessage mockTextMessage) {
-        ActiveMQJMSVendor mockVendor = Mockito.mock(ActiveMQJMSVendor.class);
-        Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage);
-        Mockito.when(mockVendor.createTextMessage(Mockito.any(String.class))).thenReturn(mockTextMessage);
-
-        return mockVendor;
-    }
-
-    private ActiveMQJMSVendor createVendor() {
-        return ActiveMQJMSVendor.INSTANCE;
-    }
-
     private EncodedMessage encodeMessage(Message message) {
         byte[] encodeBuffer = new byte[1024 * 8];
         int encodedSize;