You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/02/17 21:43:13 UTC

[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5592

https://issues.apache.org/jira/browse/AMQ-5592

Initial drop of the JMS transformer code to be reworked.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6e693196
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6e693196
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6e693196

Branch: refs/heads/master
Commit: 6e693196068d10d177a57e01c823da0ecaaed9a6
Parents: 388c16d
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Feb 17 15:42:45 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Feb 17 15:42:45 2015 -0500

----------------------------------------------------------------------
 activemq-amqp/pom.xml                           |   7 +-
 .../transport/amqp/ActiveMQJMSVendor.java       |   4 +-
 .../transport/amqp/AmqpProtocolConverter.java   |  14 +-
 .../transport/amqp/AmqpTransportFilter.java     |   2 +-
 .../message/AMQPNativeInboundTransformer.java   |  36 ++
 .../message/AMQPNativeOutboundTransformer.java  | 103 ++++++
 .../amqp/message/AMQPRawInboundTransformer.java |  47 +++
 .../amqp/message/AutoOutboundTransformer.java   |  52 +++
 .../transport/amqp/message/EncodedMessage.java  |  67 ++++
 .../amqp/message/InboundTransformer.java        | 360 +++++++++++++++++++
 .../message/JMSMappingInboundTransformer.java   | 109 ++++++
 .../message/JMSMappingOutboundTransformer.java  | 314 ++++++++++++++++
 .../transport/amqp/message/JMSVendor.java       |  65 ++++
 .../amqp/message/OutboundTransformer.java       |  87 +++++
 .../JMSMappingInboundTransformerTest.java       | 259 +++++++++++++
 .../JMSMappingOutboundTransformerTest.java      | 309 ++++++++++++++++
 activemq-tooling/activemq-maven-plugin/pom.xml  |   2 -
 pom.xml                                         |  29 +-
 18 files changed, 1840 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml
index 35a3177..358422d 100644
--- a/activemq-amqp/pom.xml
+++ b/activemq-amqp/pom.xml
@@ -42,7 +42,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.qpid</groupId>
-      <artifactId>proton-jms</artifactId>
+      <artifactId>proton-j</artifactId>
       <version>${qpid-proton-version}</version>
     </dependency>
     <dependency>
@@ -109,6 +109,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
index b576c6b..c00a390 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
@@ -39,10 +39,8 @@ import org.apache.activemq.command.ActiveMQTempQueue;
 import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.qpid.proton.jms.JMSVendor;
+import org.apache.activemq.transport.amqp.message.JMSVendor;
 
-/**
- */
 public class ActiveMQJMSVendor extends JMSVendor {
 
     final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 425b264..16bfa3d 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -63,6 +63,13 @@ import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.PersistenceAdapterSupport;
+import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
+import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
+import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
+import org.apache.activemq.transport.amqp.message.EncodedMessage;
+import org.apache.activemq.transport.amqp.message.InboundTransformer;
+import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
+import org.apache.activemq.transport.amqp.message.OutboundTransformer;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -102,13 +109,6 @@ import org.apache.qpid.proton.engine.impl.CollectorImpl;
 import org.apache.qpid.proton.engine.impl.ProtocolTracer;
 import org.apache.qpid.proton.engine.impl.TransportImpl;
 import org.apache.qpid.proton.framing.TransportFrame;
-import org.apache.qpid.proton.jms.AMQPNativeInboundTransformer;
-import org.apache.qpid.proton.jms.AMQPRawInboundTransformer;
-import org.apache.qpid.proton.jms.AutoOutboundTransformer;
-import org.apache.qpid.proton.jms.EncodedMessage;
-import org.apache.qpid.proton.jms.InboundTransformer;
-import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
-import org.apache.qpid.proton.jms.OutboundTransformer;
 import org.apache.qpid.proton.message.Message;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.ByteArrayOutputStream;

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index 3e361c2..fb7542b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -25,10 +25,10 @@ import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.amqp.message.InboundTransformer;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
-import org.apache.qpid.proton.jms.InboundTransformer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
new file mode 100644
index 0000000..9789e7b
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import javax.jms.Message;
+
+public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
+
+    public AMQPNativeInboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    @Override
+    public Message transform(EncodedMessage amqpMessage) throws Exception {
+        org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+
+        Message rc = super.transform(amqpMessage);
+
+        populateMessage(rc, amqp);
+        return rc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
new file mode 100644
index 0000000..ebe6fcc
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.nio.ByteBuffer;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.message.ProtonJMessage;
+
+public class AMQPNativeOutboundTransformer extends OutboundTransformer {
+
+    public AMQPNativeOutboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    @Override
+    public EncodedMessage transform(Message msg) throws Exception {
+        if( msg == null )
+            return null;
+        if( !(msg instanceof BytesMessage) )
+            return null;
+        try {
+            if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
+                return null;
+            }
+        } catch (MessageFormatException e) {
+            return null;
+        }
+        return transform(this, (BytesMessage) msg);
+    }
+
+    static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
+        long messageFormat;
+        try {
+            messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
+        } catch (MessageFormatException e) {
+            return null;
+        }
+        byte data[] = new byte[(int) msg.getBodyLength()];
+        int dataSize = data.length;
+        msg.readBytes(data);
+        msg.reset();
+
+        try {
+            int count = msg.getIntProperty("JMSXDeliveryCount");
+            if( count > 1 ) {
+
+                // decode...
+                ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
+                int offset = 0;
+                int len = data.length;
+                while( len > 0 ) {
+                    final int decoded = amqp.decode(data, offset, len);
+                    assert decoded > 0: "Make progress decoding the message";
+                    offset += decoded;
+                    len -= decoded;
+                }
+
+                // Update the DeliveryCount header...
+                // The AMQP delivery-count field only includes prior failed delivery attempts,
+                // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
+                amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
+
+                // Re-encode...
+                ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
+                final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+                int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+                if( overflow.position() > 0 ) {
+                    buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
+                    c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+                }
+                data = buffer.array();
+                dataSize = c;
+            }
+        } catch (JMSException e) {
+        }
+
+        return new EncodedMessage(messageFormat, data, 0, dataSize);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
new file mode 100644
index 0000000..20703d7
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+
+public class AMQPRawInboundTransformer extends InboundTransformer {
+
+    public AMQPRawInboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    @Override
+    public Message transform(EncodedMessage amqpMessage) throws Exception {
+        BytesMessage rc = vendor.createBytesMessage();
+        rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+
+        rc.setJMSDeliveryMode(defaultDeliveryMode);
+        rc.setJMSPriority(defaultPriority);
+
+        final long now = System.currentTimeMillis();
+        rc.setJMSTimestamp(now);
+        if( defaultTtl > 0 ) {
+            rc.setJMSExpiration(now + defaultTtl);
+        }
+
+        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+        rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+
+        return rc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
new file mode 100644
index 0000000..0f0d7b2
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+
+public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
+
+    private final JMSMappingOutboundTransformer transformer;
+
+    public AutoOutboundTransformer(JMSVendor vendor) {
+        super(vendor);
+        transformer = new JMSMappingOutboundTransformer(vendor);
+    }
+
+    @Override
+    public EncodedMessage transform(Message msg) throws Exception {
+        if( msg == null )
+            return null;
+        if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
+            if( msg instanceof BytesMessage ) {
+                return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
+            } else {
+                return null;
+            }
+        } else {
+            return transformer.transform(msg);
+        }
+    }
+
+    @Override
+    public void setUseByteDestinationTypeAnnotations(boolean useByteDestinationTypeAnnotations)
+    {
+        super.setUseByteDestinationTypeAnnotations(useByteDestinationTypeAnnotations);
+        transformer.setUseByteDestinationTypeAnnotations(useByteDestinationTypeAnnotations);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
new file mode 100644
index 0000000..733c0ec0
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.message.Message;
+
+public class EncodedMessage {
+
+    private final Binary data;
+    final long messageFormat;
+
+    public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
+        this.data = new Binary(data, offset, length);
+        this.messageFormat = messageFormat;
+    }
+
+    public long getMessageFormat() {
+        return messageFormat;
+    }
+
+    public Message decode() throws Exception {
+        Message amqp = Message.Factory.create();
+
+        int offset = getArrayOffset();
+        int len = getLength();
+        while (len > 0) {
+            final int decoded = amqp.decode(getArray(), offset, len);
+            assert decoded > 0 : "Make progress decoding the message";
+            offset += decoded;
+            len -= decoded;
+        }
+
+        return amqp;
+    }
+
+    public int getLength() {
+        return data.getLength();
+    }
+
+    public int getArrayOffset() {
+        return data.getArrayOffset();
+    }
+
+    public byte[] getArray() {
+        return data.getArray();
+    }
+
+    @Override
+    public String toString() {
+        return data.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
new file mode 100644
index 0000000..9e5758c
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Decimal128;
+import org.apache.qpid.proton.amqp.Decimal32;
+import org.apache.qpid.proton.amqp.Decimal64;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+
+public abstract class InboundTransformer {
+
+    JMSVendor vendor;
+
+    public static final String TRANSFORMER_NATIVE = "native";
+    public static final String TRANSFORMER_RAW = "raw";
+    public static final String TRANSFORMER_JMS = "jms";
+
+    String prefixVendor = "JMS_AMQP_";
+    String prefixDeliveryAnnotations = "DA_";
+    String prefixMessageAnnotations = "MA_";
+    String prefixFooter = "FT_";
+
+    int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
+    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
+    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+
+    private boolean useByteDestinationTypeAnnotations = false;
+
+    public InboundTransformer(JMSVendor vendor) {
+        this.vendor = vendor;
+    }
+
+    abstract public Message transform(EncodedMessage amqpMessage) throws Exception;
+
+    public boolean isUseByteDestinationTypeAnnotations() {
+        return useByteDestinationTypeAnnotations;
+    }
+
+    public void setUseByteDestinationTypeAnnotations(boolean useByteDestinationTypeAnnotations) {
+        this.useByteDestinationTypeAnnotations = useByteDestinationTypeAnnotations;
+    }
+
+    public int getDefaultDeliveryMode() {
+        return defaultDeliveryMode;
+    }
+
+    public void setDefaultDeliveryMode(int defaultDeliveryMode) {
+        this.defaultDeliveryMode = defaultDeliveryMode;
+    }
+
+    public int getDefaultPriority() {
+        return defaultPriority;
+    }
+
+    public void setDefaultPriority(int defaultPriority) {
+        this.defaultPriority = defaultPriority;
+    }
+
+    public long getDefaultTtl() {
+        return defaultTtl;
+    }
+
+    public void setDefaultTtl(long defaultTtl) {
+        this.defaultTtl = defaultTtl;
+    }
+
+    public String getPrefixVendor() {
+        return prefixVendor;
+    }
+
+    public void setPrefixVendor(String prefixVendor) {
+        this.prefixVendor = prefixVendor;
+    }
+
+    public JMSVendor getVendor() {
+        return vendor;
+    }
+
+    public void setVendor(JMSVendor vendor) {
+        this.vendor = vendor;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+        Header header = amqp.getHeader();
+        if (header == null) {
+            header = new Header();
+        }
+
+        if (header.getDurable() != null) {
+            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+        } else {
+            jms.setJMSDeliveryMode(defaultDeliveryMode);
+        }
+        if (header.getPriority() != null) {
+            jms.setJMSPriority(header.getPriority().intValue());
+        } else {
+            jms.setJMSPriority(defaultPriority);
+        }
+        if (header.getFirstAcquirer() != null) {
+            jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
+        }
+        if (header.getDeliveryCount() != null) {
+            vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
+        }
+
+        final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
+        if (da != null) {
+            for (Map.Entry<?, ?> entry : da.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
+            }
+        }
+
+        Class<? extends Destination> toAttributes = null;
+        Class<? extends Destination> replyToAttributes = null;
+
+        if (isUseByteDestinationTypeAnnotations()) {
+            toAttributes = Queue.class;
+            replyToAttributes = Queue.class;
+        } else {
+            toAttributes = Destination.class;
+            replyToAttributes = Destination.class;
+        }
+
+        final MessageAnnotations ma = amqp.getMessageAnnotations();
+        if (ma != null) {
+            for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                if ("x-opt-jms-type".equals(key.toString()) && entry.getValue() != null) {
+                    jms.setJMSType(entry.getValue().toString());
+                } else if ("x-opt-to-type".equals(key.toString())) {
+                    toAttributes = toClassFromAttributes(entry.getValue());
+                } else if ("x-opt-reply-type".equals(key.toString())) {
+                    replyToAttributes = toClassFromAttributes(entry.getValue());
+                } else {
+                    setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+                }
+            }
+        }
+
+        final ApplicationProperties ap = amqp.getApplicationProperties();
+        if (ap != null) {
+            for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                if ("JMSXGroupID".equals(key)) {
+                    vendor.setJMSXGroupID(jms, entry.getValue().toString());
+                } else if ("JMSXGroupSequence".equals(key)) {
+                    vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue());
+                } else if ("JMSXUserID".equals(key)) {
+                    vendor.setJMSXUserID(jms, entry.getValue().toString());
+                } else {
+                    setProperty(jms, key, entry.getValue());
+                }
+            }
+        }
+
+        final Properties properties = amqp.getProperties();
+        if (properties != null) {
+            if (properties.getMessageId() != null) {
+                jms.setJMSMessageID(properties.getMessageId().toString());
+            }
+            Binary userId = properties.getUserId();
+            if (userId != null) {
+                vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
+            }
+            if (properties.getTo() != null) {
+                jms.setJMSDestination(vendor.createDestination(properties.getTo(), toAttributes));
+            }
+            if (properties.getSubject() != null) {
+                jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
+            }
+            if (properties.getReplyTo() != null) {
+                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo(), replyToAttributes));
+            }
+            if (properties.getCorrelationId() != null) {
+                jms.setJMSCorrelationID(properties.getCorrelationId().toString());
+            }
+            if (properties.getContentType() != null) {
+                jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+            }
+            if (properties.getContentEncoding() != null) {
+                jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+            }
+            if (properties.getCreationTime() != null) {
+                jms.setJMSTimestamp(properties.getCreationTime().getTime());
+            }
+            if (properties.getGroupId() != null) {
+                vendor.setJMSXGroupID(jms, properties.getGroupId());
+            }
+            if (properties.getGroupSequence() != null) {
+                vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
+            }
+            if (properties.getReplyToGroupId() != null) {
+                jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+            }
+            if (properties.getAbsoluteExpiryTime() != null) {
+                jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
+            }
+        }
+
+        // If the jms expiration has not yet been set...
+        if (jms.getJMSExpiration() == 0) {
+            // Then lets try to set it based on the message ttl.
+            long ttl = defaultTtl;
+            if (header.getTtl() != null) {
+                ttl = header.getTtl().longValue();
+            }
+            if (ttl == 0) {
+                jms.setJMSExpiration(0);
+            } else {
+                jms.setJMSExpiration(System.currentTimeMillis() + ttl);
+            }
+        }
+
+        final Footer fp = amqp.getFooter();
+        if (fp != null) {
+            for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
+            }
+        }
+    }
+
+    private static final Set<String> QUEUE_ATTRIBUTES = createSet("queue");
+    private static final Set<String> TOPIC_ATTRIBUTES = createSet("topic");
+    private static final Set<String> TEMP_QUEUE_ATTRIBUTES = createSet("queue", "temporary");
+    private static final Set<String> TEMP_TOPIC_ATTRIBUTES = createSet("topic", "temporary");
+
+    private static Set<String> createSet(String... args) {
+        HashSet<String> s = new HashSet<String>();
+        for (String arg : args) {
+            s.add(arg);
+        }
+        return Collections.unmodifiableSet(s);
+    }
+
+    Class<? extends Destination> toClassFromAttributes(Object value) {
+        if (isUseByteDestinationTypeAnnotations()) {
+            if (value instanceof Byte) {
+                switch ((Byte) value) {
+                    case JMSVendor.QUEUE_TYPE:
+                        return Queue.class;
+                    case JMSVendor.TOPIC_TYPE:
+                        return Topic.class;
+                    case JMSVendor.TEMP_QUEUE_TYPE:
+                        return TemporaryQueue.class;
+                    case JMSVendor.TEMP_TOPIC_TYPE:
+                        return TemporaryTopic.class;
+                    default:
+                        return Queue.class;
+                }
+            }
+
+            return Queue.class;
+        } else {
+            if (value == null) {
+                return null;
+            }
+            String valueString = value.toString();
+            HashSet<String> attributes = new HashSet<String>();
+            for (String x : valueString.split("\\s*,\\s*")) {
+                attributes.add(x);
+            }
+
+            if (QUEUE_ATTRIBUTES.equals(attributes)) {
+                return Queue.class;
+            }
+            if (TOPIC_ATTRIBUTES.equals(attributes)) {
+                return Topic.class;
+            }
+            if (TEMP_QUEUE_ATTRIBUTES.equals(attributes)) {
+                return TemporaryQueue.class;
+            }
+            if (TEMP_TOPIC_ATTRIBUTES.equals(attributes)) {
+                return TemporaryTopic.class;
+            }
+            return Destination.class;
+        }
+    }
+
+    private void setProperty(Message msg, String key, Object value) throws JMSException {
+        if (value instanceof UnsignedLong) {
+            long v = ((UnsignedLong) value).longValue();
+            msg.setLongProperty(key, v);
+        } else if (value instanceof UnsignedInteger) {
+            long v = ((UnsignedInteger) value).longValue();
+            if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
+                msg.setIntProperty(key, (int) v);
+            } else {
+                msg.setLongProperty(key, v);
+            }
+        } else if (value instanceof UnsignedShort) {
+            int v = ((UnsignedShort) value).intValue();
+            if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
+                msg.setShortProperty(key, (short) v);
+            } else {
+                msg.setIntProperty(key, v);
+            }
+        } else if (value instanceof UnsignedByte) {
+            short v = ((UnsignedByte) value).shortValue();
+            if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
+                msg.setByteProperty(key, (byte) v);
+            } else {
+                msg.setShortProperty(key, v);
+            }
+        } else if (value instanceof Symbol) {
+            msg.setStringProperty(key, value.toString());
+        } else if (value instanceof Decimal128) {
+            msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
+        } else if (value instanceof Decimal64) {
+            msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
+        } else if (value instanceof Decimal32) {
+            msg.setFloatProperty(key, ((Decimal32) value).floatValue());
+        } else if (value instanceof Binary) {
+            msg.setStringProperty(key, value.toString());
+        } else {
+            msg.setObjectProperty(key, value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
new file mode 100644
index 0000000..63e216c
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.BytesMessage;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+
+public class JMSMappingInboundTransformer extends InboundTransformer {
+
+    public JMSMappingInboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    @Override
+    public Message transform(EncodedMessage amqpMessage) throws Exception {
+        org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+
+        Message rc;
+        final Section body = amqp.getBody();
+        if (body == null) {
+            rc = vendor.createMessage();
+        } else if (body instanceof Data) {
+            Binary d = ((Data) body).getValue();
+            BytesMessage m = vendor.createBytesMessage();
+            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
+            rc = m;
+        } else if (body instanceof AmqpSequence) {
+            AmqpSequence sequence = (AmqpSequence) body;
+            StreamMessage m = vendor.createStreamMessage();
+            for (Object item : sequence.getValue()) {
+                m.writeObject(item);
+            }
+            rc = m;
+        } else if (body instanceof AmqpValue) {
+            Object value = ((AmqpValue) body).getValue();
+            if (value == null) {
+                rc = vendor.createObjectMessage();
+            }
+            if (value instanceof String) {
+                TextMessage m = vendor.createTextMessage();
+                m.setText((String) value);
+                rc = m;
+            } else if (value instanceof Binary) {
+                Binary d = (Binary) value;
+                BytesMessage m = vendor.createBytesMessage();
+                m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
+                rc = m;
+            } else if (value instanceof List) {
+                StreamMessage m = vendor.createStreamMessage();
+                for (Object item : (List<Object>) value) {
+                    m.writeObject(item);
+                }
+                rc = m;
+            } else if (value instanceof Map) {
+                MapMessage m = vendor.createMapMessage();
+                final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
+                for (Map.Entry<String, Object> entry : set) {
+                    m.setObject(entry.getKey(), entry.getValue());
+                }
+                rc = m;
+            } else {
+                ObjectMessage m = vendor.createObjectMessage();
+                m.setObject((Serializable) value);
+                rc = m;
+            }
+        } else {
+            throw new RuntimeException("Unexpected body type: " + body.getClass());
+        }
+        rc.setJMSDeliveryMode(defaultDeliveryMode);
+        rc.setJMSPriority(defaultPriority);
+        rc.setJMSExpiration(defaultTtl);
+
+        populateMessage(rc, amqp);
+
+        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+        rc.setBooleanProperty(prefixVendor + "NATIVE", false);
+        return rc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
new file mode 100644
index 0000000..768bb24
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.message.ProtonJMessage;
+
+public class JMSMappingOutboundTransformer extends OutboundTransformer {
+
+    public JMSMappingOutboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    @Override
+    public EncodedMessage transform(Message msg) throws Exception {
+        if (msg == null) {
+            return null;
+        }
+
+        try {
+            if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
+                return null;
+            }
+        } catch (MessageFormatException e) {
+            return null;
+        }
+        ProtonJMessage amqp = convert(msg);
+
+        long messageFormat;
+        try {
+            messageFormat = msg.getLongProperty(this.messageFormatKey);
+        } catch (MessageFormatException e) {
+            return null;
+        }
+
+        ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
+        final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+        int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+        if (overflow.position() > 0) {
+            buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
+            c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+        }
+
+        return new EncodedMessage(messageFormat, buffer.array(), 0, c);
+    }
+
+    /**
+     * Perform the conversion between JMS Message and Proton Message without
+     * re-encoding it to array. This is needed because some frameworks may elect
+     * to do this on their own way (Netty for instance using Nettybuffers)
+     *
+     * @param msg
+     * @return
+     * @throws Exception
+     */
+    public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
+        Header header = new Header();
+        Properties props = new Properties();
+        HashMap<Symbol, Object> daMap = null;
+        HashMap<Symbol, Object> maMap = null;
+        HashMap apMap = null;
+        Section body = null;
+        HashMap footerMap = null;
+        if (msg instanceof BytesMessage) {
+            BytesMessage m = (BytesMessage) msg;
+            byte data[] = new byte[(int) m.getBodyLength()];
+            m.readBytes(data);
+            m.reset(); // Need to reset after readBytes or future readBytes
+                       // calls (ex: redeliveries) will fail and return -1
+            body = new Data(new Binary(data));
+        }
+        if (msg instanceof TextMessage) {
+            body = new AmqpValue(((TextMessage) msg).getText());
+        }
+        if (msg instanceof MapMessage) {
+            final HashMap<String, Object> map = new HashMap<String, Object>();
+            final MapMessage m = (MapMessage) msg;
+            final Enumeration<String> names = m.getMapNames();
+            while (names.hasMoreElements()) {
+                String key = names.nextElement();
+                map.put(key, m.getObject(key));
+            }
+            body = new AmqpValue(map);
+        }
+        if (msg instanceof StreamMessage) {
+            ArrayList<Object> list = new ArrayList<Object>();
+            final StreamMessage m = (StreamMessage) msg;
+            try {
+                while (true) {
+                    list.add(m.readObject());
+                }
+            } catch (MessageEOFException e) {
+            }
+            body = new AmqpSequence(list);
+        }
+        if (msg instanceof ObjectMessage) {
+            body = new AmqpValue(((ObjectMessage) msg).getObject());
+        }
+
+        header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
+        header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
+        if (msg.getJMSType() != null) {
+            if (maMap == null) {
+                maMap = new HashMap<Symbol, Object>();
+            }
+            maMap.put(Symbol.valueOf("x-opt-jms-type"), msg.getJMSType());
+        }
+        if (msg.getJMSMessageID() != null) {
+            props.setMessageId(msg.getJMSMessageID());
+        }
+        if (msg.getJMSDestination() != null) {
+            props.setTo(vendor.toAddress(msg.getJMSDestination()));
+            if (maMap == null) {
+                maMap = new HashMap<Symbol, Object>();
+            }
+            maMap.put(Symbol.valueOf("x-opt-to-type"), destinationAttributes(msg.getJMSDestination()));
+        }
+        if (msg.getJMSReplyTo() != null) {
+            props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
+            if (maMap == null) {
+                maMap = new HashMap<Symbol, Object>();
+            }
+            maMap.put(Symbol.valueOf("x-opt-reply-type"), destinationAttributes(msg.getJMSReplyTo()));
+        }
+        if (msg.getJMSCorrelationID() != null) {
+            props.setCorrelationId(msg.getJMSCorrelationID());
+        }
+        if (msg.getJMSExpiration() != 0) {
+            long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
+            if (ttl < 0) {
+                ttl = 1;
+            }
+            header.setTtl(new UnsignedInteger((int) ttl));
+
+            props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
+        }
+        if (msg.getJMSTimestamp() != 0) {
+            props.setCreationTime(new Date(msg.getJMSTimestamp()));
+        }
+
+        final Enumeration<String> keys = msg.getPropertyNames();
+        while (keys.hasMoreElements()) {
+            String key = keys.nextElement();
+            if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
+                // skip..
+            } else if (key.equals(firstAcquirerKey)) {
+                header.setFirstAcquirer(msg.getBooleanProperty(key));
+            } else if (key.startsWith("JMSXDeliveryCount")) {
+                // The AMQP delivery-count field only includes prior failed delivery attempts,
+                // whereas JMSXDeliveryCount includes the first/current delivery attempt.
+                int amqpDeliveryCount = msg.getIntProperty(key) - 1;
+                if (amqpDeliveryCount > 0) {
+                    header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
+                }
+            } else if (key.startsWith("JMSXUserID")) {
+                String value = msg.getStringProperty(key);
+                props.setUserId(new Binary(value.getBytes("UTF-8")));
+            } else if (key.startsWith("JMSXGroupID")) {
+                String value = msg.getStringProperty(key);
+                props.setGroupId(value);
+                if (apMap == null) {
+                    apMap = new HashMap();
+                }
+                apMap.put(key, value);
+            } else if (key.startsWith("JMSXGroupSeq")) {
+                UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
+                props.setGroupSequence(value);
+                if (apMap == null) {
+                    apMap = new HashMap();
+                }
+                apMap.put(key, value);
+            } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
+                if (daMap == null) {
+                    daMap = new HashMap<Symbol, Object>();
+                }
+                String name = key.substring(prefixDeliveryAnnotationsKey.length());
+                daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
+            } else if (key.startsWith(prefixMessageAnnotationsKey)) {
+                if (maMap == null) {
+                    maMap = new HashMap<Symbol, Object>();
+                }
+                String name = key.substring(prefixMessageAnnotationsKey.length());
+                maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
+            } else if (key.equals(subjectKey)) {
+                props.setSubject(msg.getStringProperty(key));
+            } else if (key.equals(contentTypeKey)) {
+                props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
+            } else if (key.equals(contentEncodingKey)) {
+                props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
+            } else if (key.equals(replyToGroupIDKey)) {
+                props.setReplyToGroupId(msg.getStringProperty(key));
+            } else if (key.startsWith(prefixFooterKey)) {
+                if (footerMap == null) {
+                    footerMap = new HashMap();
+                }
+                String name = key.substring(prefixFooterKey.length());
+                footerMap.put(name, msg.getObjectProperty(key));
+            } else {
+                if (apMap == null) {
+                    apMap = new HashMap();
+                }
+                apMap.put(key, msg.getObjectProperty(key));
+            }
+        }
+
+        MessageAnnotations ma = null;
+        if (maMap != null) {
+            ma = new MessageAnnotations(maMap);
+        }
+        DeliveryAnnotations da = null;
+        if (daMap != null) {
+            da = new DeliveryAnnotations(daMap);
+        }
+        ApplicationProperties ap = null;
+        if (apMap != null) {
+            ap = new ApplicationProperties(apMap);
+        }
+        Footer footer = null;
+        if (footerMap != null) {
+            footer = new Footer(footerMap);
+        }
+
+        return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
+    }
+
+    private Object destinationAttributes(Destination destination) {
+        if (isUseByteDestinationTypeAnnotations()) {
+            if (destination instanceof Queue) {
+                if (destination instanceof TemporaryQueue) {
+                    return JMSVendor.TEMP_QUEUE_TYPE;
+                } else {
+                    return JMSVendor.QUEUE_TYPE;
+                }
+            }
+            if (destination instanceof Topic) {
+                if (destination instanceof TemporaryTopic) {
+                    return JMSVendor.TEMP_TOPIC_TYPE;
+                } else {
+                    return JMSVendor.TOPIC_TYPE;
+                }
+            }
+            return JMSVendor.QUEUE_TYPE;
+        } else {
+            if (destination instanceof Queue) {
+                if (destination instanceof TemporaryQueue) {
+                    return "temporary,queue";
+                } else {
+                    return "queue";
+                }
+            }
+            if (destination instanceof Topic) {
+                if (destination instanceof TemporaryTopic) {
+                    return "temporary,topic";
+                } else {
+                    return "topic";
+                }
+            }
+            return "";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
new file mode 100644
index 0000000..33d77c4
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+abstract public class JMSVendor {
+
+    public static final byte QUEUE_TYPE = 0x00;
+    public static final byte TOPIC_TYPE = 0x01;
+    public static final byte TEMP_QUEUE_TYPE = 0x02;
+    public static final byte TEMP_TOPIC_TYPE = 0x03;
+
+    public abstract BytesMessage createBytesMessage();
+
+    public abstract StreamMessage createStreamMessage();
+
+    public abstract Message createMessage();
+
+    public abstract TextMessage createTextMessage();
+
+    public abstract ObjectMessage createObjectMessage();
+
+    public abstract MapMessage createMapMessage();
+
+    public abstract void setJMSXUserID(Message message, String value);
+
+    @Deprecated
+    public Destination createDestination(String name) {
+        return null;
+    }
+
+    public <T extends Destination> T createDestination(String name, Class<T> kind) {
+        return kind.cast(createDestination(name));
+    }
+
+    public abstract void setJMSXGroupID(Message message, String groupId);
+
+    public abstract void setJMSXGroupSequence(Message message, int value);
+
+    public abstract void setJMSXDeliveryCount(Message message, long value);
+
+    public abstract String toAddress(Destination destination);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
new file mode 100644
index 0000000..6c7d8ad
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import javax.jms.Message;
+
+public abstract class OutboundTransformer {
+
+    JMSVendor vendor;
+    String prefixVendor;
+
+    String prefixDeliveryAnnotations = "DA_";
+    String prefixMessageAnnotations= "MA_";
+    String prefixFooter = "FT_";
+
+    String messageFormatKey;
+    String nativeKey;
+    String firstAcquirerKey;
+    String prefixDeliveryAnnotationsKey;
+    String prefixMessageAnnotationsKey;
+    String subjectKey;
+    String contentTypeKey;
+    String contentEncodingKey;
+    String replyToGroupIDKey;
+    String prefixFooterKey;
+
+    private boolean useByteDestinationTypeAnnotations;
+
+   public OutboundTransformer(JMSVendor vendor) {
+        this.vendor = vendor;
+        this.setPrefixVendor("JMS_AMQP_");
+    }
+
+    public abstract EncodedMessage transform(Message jms) throws Exception;
+
+    public boolean isUseByteDestinationTypeAnnotations()
+    {
+        return useByteDestinationTypeAnnotations;
+    }
+
+    public void setUseByteDestinationTypeAnnotations(boolean useByteDestinationTypeAnnotations)
+    {
+        this.useByteDestinationTypeAnnotations = useByteDestinationTypeAnnotations;
+    }
+
+    public String getPrefixVendor() {
+        return prefixVendor;
+    }
+
+    public void setPrefixVendor(String prefixVendor) {
+        this.prefixVendor = prefixVendor;
+
+        messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
+        nativeKey = prefixVendor + "NATIVE";
+        firstAcquirerKey = prefixVendor + "FirstAcquirer";
+        prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
+        prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
+        subjectKey =  prefixVendor +"Subject";
+        contentTypeKey = prefixVendor +"ContentType";
+        contentEncodingKey = prefixVendor +"ContentEncoding";
+        replyToGroupIDKey = prefixVendor +"ReplyToGroupID";
+        prefixFooterKey = prefixVendor + prefixFooter;
+
+    }
+
+    public JMSVendor getVendor() {
+        return vendor;
+    }
+
+    public void setVendor(JMSVendor vendor) {
+        this.vendor = vendor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
new file mode 100644
index 0000000..576c06e
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class JMSMappingInboundTransformerTest {
+
+    @Test
+    public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception {
+        TextMessage mockTextMessage = createMockTextMessage();
+        JMSVendor mockVendor = createMockVendor(mockTextMessage);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
+
+        String contentString = "myTextMessageContent";
+        Message amqp = Message.Factory.create();
+        amqp.setBody(new AmqpValue(contentString));
+
+        EncodedMessage em = encodeMessage(amqp);
+
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
+        Mockito.verify(mockTextMessage).setText(contentString);
+        assertSame("Expected provided mock message, got a different one", mockTextMessage, jmsMessage);
+    }
+
+    // ======= JMSDestination Handling =========
+
+    // --- String type annotation ---
+    @Test
+    public void testTransformWithNoToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithToTypeDestinationTypeAnnotationTestImpl(null, Destination.class, false);
+    }
+
+    @Test
+    public void testTransformWithQueueStringToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue", Queue.class, false);
+    }
+
+    @Test
+    public void testTransformWithTemporaryQueueStringToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue,temporary", TemporaryQueue.class, false);
+    }
+
+    @Test
+    public void testTransformWithTopicStringToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic", Topic.class, false);
+    }
+
+    @Test
+    public void testTransformWithTemporaryTopicStringToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic,temporary", TemporaryTopic.class, false);
+    }
+
+    // --- byte type annotation ---
+
+    @Test
+    public void testTransformWithNoToTypeDestinationTypeAnnotationUsingByteAnnotation() throws Exception {
+        doTransformWithToTypeDestinationTypeAnnotationTestImpl(null, Queue.class, true);
+    }
+
+    @Test
+    public void testTransformWithQueueByteToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.QUEUE_TYPE, Queue.class, true);
+    }
+
+    @Test
+    public void testTransformWithTemporaryQueueByteToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_QUEUE_TYPE, TemporaryQueue.class, true);
+    }
+
+    @Test
+    public void testTransformWithTopicByteToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TOPIC_TYPE, Topic.class, true);
+    }
+
+    @Test
+    public void testTransformWithTemporaryTopicByteToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_TOPIC_TYPE, TemporaryTopic.class, true);
+    }
+
+    private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass,
+        boolean byteType) throws Exception {
+        TextMessage mockTextMessage = createMockTextMessage();
+        JMSVendor mockVendor = createMockVendor(mockTextMessage);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
+        if (byteType) {
+            transformer.setUseByteDestinationTypeAnnotations(true);
+        }
+
+        String toAddress = "toAddress";
+        Message amqp = Message.Factory.create();
+        amqp.setBody(new AmqpValue("myTextMessageContent"));
+        amqp.setAddress(toAddress);
+        if (toTypeAnnotationValue != null) {
+            Map<Symbol, Object> map = new HashMap<Symbol, Object>();
+            map.put(Symbol.valueOf("x-opt-to-type"), toTypeAnnotationValue);
+            MessageAnnotations ma = new MessageAnnotations(map);
+            amqp.setMessageAnnotations(ma);
+        }
+
+        EncodedMessage em = encodeMessage(amqp);
+
+        javax.jms.Message jmsMessage = transformer.transform(em);
+        assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
+
+        // Verify that createDestination was called with the provided 'to'
+        // address and 'Destination' class
+        Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
+    }
+
+    // ======= JMSReplyTo Handling =========
+
+    // --- String type annotation ---
+    @Test
+    public void testTransformWithNoReplyToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(null, Destination.class, false);
+    }
+
+    @Test
+    public void testTransformWithQueueStringReplyToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue", Queue.class, false);
+    }
+
+    @Test
+    public void testTransformWithTemporaryQueueStringReplyToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue,temporary", TemporaryQueue.class, false);
+    }
+
+    @Test
+    public void testTransformWithTopicStringReplyToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic", Topic.class, false);
+    }
+
+    @Test
+    public void testTransformWithTemporaryTopicStringReplyToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic,temporary", TemporaryTopic.class, false);
+    }
+
+    // --- byte type annotation ---
+    @Test
+    public void testTransformWithNoReplyToTypeDestinationTypeAnnotationUsingByteAnnotation() throws Exception {
+        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(null, Queue.class, true);
+    }
+
+    @Test
+    public void testTransformWithQueueByteReplyToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.QUEUE_TYPE, Queue.class, true);
+    }
+
+    @Test
+    public void testTransformWithTemporaryQueueByteReplyToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_QUEUE_TYPE, TemporaryQueue.class, true);
+    }
+
+    @Test
+    public void testTransformWithTopicByteReplyToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TOPIC_TYPE, Topic.class, true);
+    }
+
+    @Test
+    public void testTransformWithTemporaryTopicByteReplyToTypeDestinationTypeAnnotation() throws Exception {
+        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_TOPIC_TYPE, TemporaryTopic.class, true);
+    }
+
+    private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass,
+        boolean byteType) throws Exception {
+        TextMessage mockTextMessage = createMockTextMessage();
+        JMSVendor mockVendor = createMockVendor(mockTextMessage);
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
+        if (byteType) {
+            transformer.setUseByteDestinationTypeAnnotations(true);
+        }
+
+        String replyToAddress = "replyToAddress";
+        Message amqp = Message.Factory.create();
+        amqp.setBody(new AmqpValue("myTextMessageContent"));
+        amqp.setReplyTo(replyToAddress);
+        if (replyToTypeAnnotationValue != null) {
+            Map<Symbol, Object> map = new HashMap<Symbol, Object>();
+            map.put(Symbol.valueOf("x-opt-reply-type"), replyToTypeAnnotationValue);
+            MessageAnnotations ma = new MessageAnnotations(map);
+            amqp.setMessageAnnotations(ma);
+        }
+
+        EncodedMessage em = encodeMessage(amqp);
+
+        javax.jms.Message jmsMessage = transformer.transform(em);
+        assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
+
+        // Verify that createDestination was called with the provided 'replyTo'
+        // address and 'Destination' class
+        Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
+    }
+
+    // ======= Utility Methods =========
+
+    private TextMessage createMockTextMessage() {
+        TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
+
+        return mockTextMessage;
+    }
+
+    private JMSVendor createMockVendor(TextMessage mockTextMessage) {
+        JMSVendor mockVendor = Mockito.mock(JMSVendor.class);
+        Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage);
+
+        return mockVendor;
+    }
+
+    private EncodedMessage encodeMessage(Message message) {
+        byte[] encodeBuffer = new byte[1024 * 8];
+        int encodedSize;
+        while (true) {
+            try {
+                encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
+                break;
+            } catch (java.nio.BufferOverflowException e) {
+                encodeBuffer = new byte[encodeBuffer.length * 2];
+            }
+        }
+
+        long messageFormat = 0;
+        return new EncodedMessage(messageFormat, encodeBuffer, 0, encodedSize);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
new file mode 100644
index 0000000..0c4a9c2
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class JMSMappingOutboundTransformerTest {
+
+    @Test
+    public void testConvertMessageWithTextMessageCreatesAmqpValueStringBody() throws Exception {
+        String contentString = "myTextMessageContent";
+        TextMessage mockTextMessage = createMockTextMessage();
+        Mockito.when(mockTextMessage.getText()).thenReturn(contentString);
+        JMSVendor mockVendor = createMockVendor();
+
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
+
+        Message amqp = transformer.convert(mockTextMessage);
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertEquals(contentString, ((AmqpValue) amqp.getBody()).getValue());
+    }
+
+    @Test
+    public void testDefaultsTolStringDestinationTypeAnnotationValues() {
+        JMSVendor mockVendor = createMockVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
+
+        assertFalse("Expected the older string style annotation values to be used by default", transformer.isUseByteDestinationTypeAnnotations());
+    }
+
+    @Test
+    public void testSetGetIsUseByteDestinationTypeAnnotations() {
+        JMSVendor mockVendor = createMockVendor();
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
+
+        assertFalse(transformer.isUseByteDestinationTypeAnnotations());
+        transformer.setUseByteDestinationTypeAnnotations(true);
+        assertTrue(transformer.isUseByteDestinationTypeAnnotations());
+    }
+
+    // ======= JMSDestination Handling =========
+
+    // --- String type annotation ---
+    @Test
+    public void testConvertMessageWithJMSDestinationNull() throws Exception {
+        doTestConvertMessageWithJMSDestination(null, null, false);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSDestinationQueue() throws Exception {
+        Queue mockDest = Mockito.mock(Queue.class);
+
+        doTestConvertMessageWithJMSDestination(mockDest, "queue", false);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSDestinationTemporaryQueue() throws Exception {
+        TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
+
+        doTestConvertMessageWithJMSDestination(mockDest, "temporary,queue", false);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSDestinationTopic() throws Exception {
+        Topic mockDest = Mockito.mock(Topic.class);
+
+        doTestConvertMessageWithJMSDestination(mockDest, "topic", false);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSDestinationTemporaryTopic() throws Exception {
+        TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
+
+        doTestConvertMessageWithJMSDestination(mockDest, "temporary,topic", false);
+    }
+
+    // --- byte type annotation ---
+
+    @Test
+    public void testConvertMessageWithJMSDestinationNullUsingByteAnnotation() throws Exception {
+        doTestConvertMessageWithJMSDestination(null, null, true);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSDestinationQueueUsingByteAnnotation() throws Exception {
+        Queue mockDest = Mockito.mock(Queue.class);
+
+        doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.QUEUE_TYPE, true);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSDestinationTemporaryQueueUsingByteAnnotation() throws Exception {
+        TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
+
+        doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.TEMP_QUEUE_TYPE, true);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSDestinationTopicUsingByteAnnotation() throws Exception {
+        Topic mockDest = Mockito.mock(Topic.class);
+
+        doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.TOPIC_TYPE, true);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSDestinationTemporaryTopicUsingByteAnnotation() throws Exception {
+        TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
+
+        doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.TEMP_TOPIC_TYPE, true);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSDestinationUnkownUsingByteAnnotation() throws Exception {
+        Destination mockDest = Mockito.mock(Destination.class);
+
+        doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.QUEUE_TYPE, true);
+    }
+
+    private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue, boolean byteType) throws Exception {
+        TextMessage mockTextMessage = createMockTextMessage();
+        Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
+        Mockito.when(mockTextMessage.getJMSDestination()).thenReturn(jmsDestination);
+        JMSVendor mockVendor = createMockVendor();
+        String toAddress = "someToAddress";
+        if (jmsDestination != null) {
+            Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(toAddress);
+        }
+
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
+        if (byteType) {
+            transformer.setUseByteDestinationTypeAnnotations(true);
+        }
+
+        Message amqp = transformer.convert(mockTextMessage);
+
+        MessageAnnotations ma = amqp.getMessageAnnotations();
+        Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
+        if (maMap != null) {
+            Object actualValue = maMap.get(Symbol.valueOf("x-opt-to-type"));
+            assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
+        } else if (expectedAnnotationValue != null) {
+            fail("Expected annotation value, but there were no annotations");
+        }
+
+        if (jmsDestination != null) {
+            assertEquals("Unexpected 'to' address", toAddress, amqp.getAddress());
+        }
+    }
+
+    // ======= JMSReplyTo Handling =========
+
+    // --- String type annotation ---
+    @Test
+    public void testConvertMessageWithJMSReplyToNull() throws Exception {
+        doTestConvertMessageWithJMSReplyTo(null, null, false);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSReplyToQueue() throws Exception {
+        Queue mockDest = Mockito.mock(Queue.class);
+
+        doTestConvertMessageWithJMSReplyTo(mockDest, "queue", false);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSReplyToTemporaryQueue() throws Exception {
+        TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
+
+        doTestConvertMessageWithJMSReplyTo(mockDest, "temporary,queue", false);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSReplyToTopic() throws Exception {
+        Topic mockDest = Mockito.mock(Topic.class);
+
+        doTestConvertMessageWithJMSReplyTo(mockDest, "topic", false);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSReplyToTemporaryTopic() throws Exception {
+        TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
+
+        doTestConvertMessageWithJMSReplyTo(mockDest, "temporary,topic", false);
+    }
+
+    // --- byte type annotation ---
+    @Test
+    public void testConvertMessageWithJMSReplyToNullUsingByteAnnotation() throws Exception {
+        doTestConvertMessageWithJMSReplyTo(null, null, true);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSReplyToQueueUsingByteAnnotation() throws Exception {
+        Queue mockDest = Mockito.mock(Queue.class);
+
+        doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.QUEUE_TYPE, true);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSReplyToTemporaryQueueUsingByteAnnotation() throws Exception {
+        TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
+
+        doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.TEMP_QUEUE_TYPE, true);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSReplyToTopicUsingByteAnnotation() throws Exception {
+        Topic mockDest = Mockito.mock(Topic.class);
+
+        doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.TOPIC_TYPE, true);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSReplyToTemporaryTopicUsingByteAnnotation() throws Exception {
+        TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
+
+        doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.TEMP_TOPIC_TYPE, true);
+    }
+
+    @Test
+    public void testConvertMessageWithJMSReplyToUnkownUsingByteAnnotation() throws Exception {
+        Destination mockDest = Mockito.mock(Destination.class);
+
+        doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.QUEUE_TYPE, true);
+    }
+
+    private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue, boolean byteType) throws Exception {
+        TextMessage mockTextMessage = createMockTextMessage();
+        Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
+        Mockito.when(mockTextMessage.getJMSReplyTo()).thenReturn(jmsReplyTo);
+        JMSVendor mockVendor = createMockVendor();
+        String replyToAddress = "someReplyToAddress";
+        if (jmsReplyTo != null) {
+            Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(replyToAddress);
+        }
+
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
+        if (byteType) {
+            transformer.setUseByteDestinationTypeAnnotations(true);
+        }
+
+        Message amqp = transformer.convert(mockTextMessage);
+
+        MessageAnnotations ma = amqp.getMessageAnnotations();
+        Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
+        if (maMap != null) {
+            Object actualValue = maMap.get(Symbol.valueOf("x-opt-reply-type"));
+            assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
+        } else if (expectedAnnotationValue != null) {
+            fail("Expected annotation value, but there were no annotations");
+        }
+
+        if (jmsReplyTo != null) {
+            assertEquals("Unexpected 'reply-to' address", replyToAddress, amqp.getReplyTo());
+        }
+    }
+
+    // ======= Utility Methods =========
+
+    private TextMessage createMockTextMessage() throws Exception {
+        TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
+        Mockito.when(mockTextMessage.getPropertyNames()).thenReturn(Collections.enumeration(Collections.emptySet()));
+
+        return mockTextMessage;
+    }
+
+    private JMSVendor createMockVendor() {
+        JMSVendor mockVendor = Mockito.mock(JMSVendor.class);
+
+        return mockVendor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e693196/activemq-tooling/activemq-maven-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-maven-plugin/pom.xml b/activemq-tooling/activemq-maven-plugin/pom.xml
index 95a70e6..1c2c4c8 100644
--- a/activemq-tooling/activemq-maven-plugin/pom.xml
+++ b/activemq-tooling/activemq-maven-plugin/pom.xml
@@ -67,11 +67,9 @@
       <groupId>org.apache.geronimo.specs</groupId>
       <artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
-      <version>${mockito-version}</version>
       <scope>test</scope>
     </dependency>
   </dependencies>