You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/05/03 16:42:27 UTC

[1/2] activemq-artemis git commit: ARTEMIS-503 - replace proton-jms with proton-jms from ActiveMQ

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 7fb603f78 -> 2a415a80e


ARTEMIS-503 - replace proton-jms with proton-jms from ActiveMQ

Ive copied over the source itself

https://issues.apache.org/jira/browse/ARTEMIS-503


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c161ab46
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c161ab46
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c161ab46

Branch: refs/heads/master
Commit: c161ab46a607b2bddc9c7d637c950ae7b2ad1cfc
Parents: 7fb603f
Author: Andy Taylor <an...@gmail.com>
Authored: Tue May 3 13:51:15 2016 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue May 3 10:40:56 2016 -0400

----------------------------------------------------------------------
 artemis-distribution/pom.xml                    |   4 -
 artemis-distribution/src/main/assembly/dep.xml  |   1 -
 artemis-protocols/artemis-amqp-protocol/pom.xml |   4 -
 .../AMQPNativeOutboundTransformer.java          |  56 ----
 .../proton/converter/ActiveMQJMSVendor.java     |   2 +-
 .../converter/JMSMappingInboundTransformer.java |  49 ---
 .../JMSMappingOutboundTransformer.java          |  53 ---
 .../converter/ProtonMessageConverter.java       |   7 +-
 .../proton/converter/jms/ServerJMSMessage.java  |   7 +
 .../converter/jms/ServerJMSObjectMessage.java   |   2 +-
 .../converter/message/AMQPMessageIdHelper.java  | 257 +++++++++++++++
 .../message/AMQPNativeInboundTransformer.java   |  46 +++
 .../message/AMQPNativeOutboundTransformer.java  |  60 ++++
 .../message/AMQPRawInboundTransformer.java      |  60 ++++
 .../converter/message/EncodedMessage.java       |  67 ++++
 .../converter/message/InboundTransformer.java   | 317 ++++++++++++++++++
 .../message/JMSMappingInboundTransformer.java   | 126 +++++++
 .../message/JMSMappingOutboundTransformer.java  | 329 +++++++++++++++++++
 .../proton/converter/message/JMSVendor.java     |  53 +++
 .../converter/message/OutboundTransformer.java  |  69 ++++
 .../plug/ProtonSessionIntegrationCallback.java  |   2 +-
 .../core/protocol/proton/TestConversions.java   |   2 +-
 pom.xml                                         |  12 -
 23 files changed, 1400 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-distribution/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml
index 6f462b4..54cf4a0 100644
--- a/artemis-distribution/pom.xml
+++ b/artemis-distribution/pom.xml
@@ -186,10 +186,6 @@
          <groupId>io.netty</groupId>
          <artifactId>netty-codec-mqtt</artifactId>
       </dependency>
-      <dependency>
-         <groupId>org.apache.activemq</groupId>
-         <artifactId>activemq-amqp</artifactId>
-      </dependency>
    </dependencies>
 
    <build>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-distribution/src/main/assembly/dep.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml
index d723bae..4a0f2da 100644
--- a/artemis-distribution/src/main/assembly/dep.xml
+++ b/artemis-distribution/src/main/assembly/dep.xml
@@ -80,7 +80,6 @@
             <include>org.jboss.logging:jboss-logging</include>
             <include>io.netty:netty-all</include>
             <include>org.apache.qpid:proton-j</include>
-            <include>org.apache.activemq:activemq-amqp</include>
             <include>org.apache.activemq:activemq-client</include>
             <include>org.slf4j:slf4j-api</include>
             <include>io.airlift:airline</include>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/pom.xml b/artemis-protocols/artemis-amqp-protocol/pom.xml
index 78e9c3b..3f130ee 100644
--- a/artemis-protocols/artemis-amqp-protocol/pom.xml
+++ b/artemis-protocols/artemis-amqp-protocol/pom.xml
@@ -42,10 +42,6 @@
          <version>${project.version}</version>
       </dependency>
       <dependency>
-         <groupId>org.apache.activemq</groupId>
-         <artifactId>activemq-amqp</artifactId>
-      </dependency>
-      <dependency>
          <groupId>org.jboss.logging</groupId>
          <artifactId>jboss-logging-processor</artifactId>
          <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java
deleted file mode 100644
index c187ad0..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.proton.converter;
-
-import org.apache.activemq.transport.amqp.message.OutboundTransformer;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.message.ProtonJMessage;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-
-public class AMQPNativeOutboundTransformer {
-   static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
-      byte[] data = new byte[(int) msg.getBodyLength()];
-      msg.readBytes(data);
-      msg.reset();
-      int count = msg.getIntProperty("JMSXDeliveryCount");
-
-      // decode...
-      ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
-      int offset = 0;
-      int len = data.length;
-      while (len > 0) {
-         final int decoded = amqp.decode(data, offset, len);
-         assert decoded > 0 : "Make progress decoding the message";
-         offset += decoded;
-         len -= decoded;
-      }
-
-      // Update the DeliveryCount header...
-      // The AMQP delivery-count field only includes prior failed delivery attempts,
-      // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
-      if (amqp.getHeader() == null) {
-         amqp.setHeader(new Header());
-      }
-
-      amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
-
-      return amqp;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java
index 3af26dc..59e0edb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java
@@ -26,6 +26,7 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination;
 import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSObjectMessage;
+import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSVendor;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
 import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
@@ -36,7 +37,6 @@ import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMST
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.transport.amqp.message.JMSVendor;
 
 public class ActiveMQJMSVendor implements JMSVendor {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java
deleted file mode 100644
index 03f9104..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.proton.converter;
-
-import org.apache.activemq.transport.amqp.message.JMSVendor;
-import org.apache.qpid.proton.amqp.UnsignedLong;
-import org.apache.qpid.proton.amqp.messaging.Properties;
-
-import javax.jms.Message;
-
-class JMSMappingInboundTransformer extends org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer {
-
-   JMSMappingInboundTransformer(JMSVendor vendor) {
-      super(vendor);
-   }
-
-   @Override
-   protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
-      super.populateMessage(jms, amqp);
-      final Properties properties = amqp.getProperties();
-      if (properties != null) {
-         if (properties.getMessageId() != null) {
-            if (properties.getMessageId() instanceof Long) {
-               jms.setLongProperty(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID", (Long) properties.getMessageId());
-            }
-            else if (properties.getMessageId() instanceof UnsignedLong) {
-               jms.setLongProperty(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID", ((UnsignedLong) properties.getMessageId()).longValue());
-            }
-            else {
-               jms.setStringProperty(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID", properties.getMessageId().toString());
-            }
-         }
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java
deleted file mode 100644
index b643162..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.proton.converter;
-
-import org.apache.activemq.transport.amqp.message.JMSVendor;
-import org.apache.qpid.proton.amqp.UnsignedLong;
-import org.apache.qpid.proton.message.ProtonJMessage;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import java.io.UnsupportedEncodingException;
-import java.util.Map;
-
-class JMSMappingOutboundTransformer extends org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer {
-   JMSMappingOutboundTransformer(JMSVendor vendor) {
-      super(vendor);
-   }
-
-   @Override
-   public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
-      ProtonJMessage protonJMessage = super.convert(msg);
-
-      Map properties = protonJMessage.getApplicationProperties().getValue();
-
-      if (properties.containsKey(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID")) {
-         Long id = (Long) properties.remove(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID");
-         protonJMessage.setMessageId(id);
-      }
-      else if (properties.containsKey(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID")) {
-         Long id = (Long) properties.remove(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID");
-         protonJMessage.setMessageId(new UnsignedLong(id));
-      }
-      else if (properties.containsKey(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID")) {
-         String id = (String) properties.remove(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID");
-         protonJMessage.setMessageId(id);
-      }
-      return protonJMessage;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java
index 47011c1..6b4e99b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java
@@ -17,9 +17,12 @@
 package org.apache.activemq.artemis.core.protocol.proton.converter;
 
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
-import org.apache.activemq.transport.amqp.message.EncodedMessage;
-import org.apache.activemq.transport.amqp.message.InboundTransformer;
+import org.apache.activemq.artemis.core.protocol.proton.converter.message.AMQPNativeOutboundTransformer;
+import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
+import org.apache.activemq.artemis.core.protocol.proton.converter.message.InboundTransformer;
+import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSMappingInboundTransformer;
 import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSMappingOutboundTransformer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.utils.IDGenerator;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
index 8f6ef9b..0d82236 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.reader.MessageUtil;
 public class ServerJMSMessage implements Message {
 
    protected final MessageInternal message;
+   private final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID";
 
    protected int deliveryCount;
 
@@ -65,11 +66,17 @@ public class ServerJMSMessage implements Message {
 
    @Override
    public final String getJMSMessageID() throws JMSException {
+      if (message.containsProperty(NATIVE_MESSAGE_ID)) {
+         return getStringProperty(NATIVE_MESSAGE_ID);
+      }
       return null;
    }
 
    @Override
    public final void setJMSMessageID(String id) throws JMSException {
+      if (id != null) {
+         message.putStringProperty(NATIVE_MESSAGE_ID, id);
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java
index 938f459..d5dbbe8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java
@@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.protocol.proton.converter.jms;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.util.ByteArrayOutputStream;
 
 import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPMessageIdHelper.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPMessageIdHelper.java
new file mode 100644
index 0000000..479c1f7
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPMessageIdHelper.java
@@ -0,0 +1,257 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter.message;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+/**
+ * Helper class for identifying and converting message-id and correlation-id values between
+ * the AMQP types and the Strings values used by JMS.
+ * <p>
+ * <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary,
+ * message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these
+ * for interoperability with other AMQP clients, the following encoding can be used after removing or
+ * before adding the "ID:" prefix used for a JMSMessageID value:<br>
+ * <p>
+ * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
+ * {@literal "AMQP_UUID:<string representation of uuid>"}<br>
+ * {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
+ * {@literal "AMQP_STRING:<string>"}<br>
+ * <p>
+ * <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin
+ * with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise.
+ * <p>
+ * <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or
+ * ulong but can't be converted into the indicated format, an exception will be thrown.
+ */
+public class AMQPMessageIdHelper {
+
+   public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper();
+
+   public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
+   public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
+   public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
+   public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
+
+   private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
+   private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
+   private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
+   private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
+   private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
+
+   /**
+    * Takes the provided AMQP messageId style object, and convert it to a base string.
+    * Encodes type information as a prefix where necessary to convey or escape the type
+    * of the provided object.
+    *
+    * @param messageId the raw messageId object to process
+    * @return the base string to be used in creating the actual id.
+    */
+   public String toBaseMessageIdString(Object messageId) {
+      if (messageId == null) {
+         return null;
+      }
+      else if (messageId instanceof String) {
+         String stringId = (String) messageId;
+
+         // If the given string has a type encoding prefix,
+         // we need to escape it as an encoded string (even if
+         // the existing encoding prefix was also for string)
+         if (hasTypeEncodingPrefix(stringId)) {
+            return AMQP_STRING_PREFIX + stringId;
+         }
+         else {
+            return stringId;
+         }
+      }
+      else if (messageId instanceof UUID) {
+         return AMQP_UUID_PREFIX + messageId.toString();
+      }
+      else if (messageId instanceof UnsignedLong) {
+         return AMQP_ULONG_PREFIX + messageId.toString();
+      }
+      else if (messageId instanceof Binary) {
+         ByteBuffer dup = ((Binary) messageId).asByteBuffer();
+
+         byte[] bytes = new byte[dup.remaining()];
+         dup.get(bytes);
+
+         String hex = convertBinaryToHexString(bytes);
+
+         return AMQP_BINARY_PREFIX + hex;
+      }
+      else {
+         throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
+      }
+   }
+
+   /**
+    * Takes the provided base id string and return the appropriate amqp messageId style object.
+    * Converts the type based on any relevant encoding information found as a prefix.
+    *
+    * @param baseId the object to be converted to an AMQP MessageId value.
+    * @return the AMQP messageId style object
+    * @throws ActiveMQAMQPIllegalStateException if the provided baseId String indicates an encoded type but can't be converted to that type.
+    */
+   public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException {
+      if (baseId == null) {
+         return null;
+      }
+
+      try {
+         if (hasAmqpUuidPrefix(baseId)) {
+            String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH);
+            return UUID.fromString(uuidString);
+         }
+         else if (hasAmqpUlongPrefix(baseId)) {
+            String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH);
+            return UnsignedLong.valueOf(longString);
+         }
+         else if (hasAmqpStringPrefix(baseId)) {
+            return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
+         }
+         else if (hasAmqpBinaryPrefix(baseId)) {
+            String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
+            byte[] bytes = convertHexStringToBinary(hexString);
+            return new Binary(bytes);
+         }
+         else {
+            // We have a string without any type prefix, transmit it as-is.
+            return baseId;
+         }
+      }
+      catch (IllegalArgumentException e) {
+         throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value");
+      }
+   }
+
+   /**
+    * Convert the provided hex-string into a binary representation where each byte represents
+    * two characters of the hex string.
+    * <p>
+    * The hex characters may be upper or lower case.
+    *
+    * @param hexString string to convert to a binary value.
+    * @return a byte array containing the binary representation
+    * @throws IllegalArgumentException if the provided String is a non-even length or contains
+    *                                  non-hex characters
+    */
+   public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
+      int length = hexString.length();
+
+      // As each byte needs two characters in the hex encoding, the string must be an even length.
+      if (length % 2 != 0) {
+         throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
+      }
+
+      byte[] binary = new byte[length / 2];
+
+      for (int i = 0; i < length; i += 2) {
+         char highBitsChar = hexString.charAt(i);
+         char lowBitsChar = hexString.charAt(i + 1);
+
+         int highBits = hexCharToInt(highBitsChar, hexString) << 4;
+         int lowBits = hexCharToInt(lowBitsChar, hexString);
+
+         binary[i / 2] = (byte) (highBits + lowBits);
+      }
+
+      return binary;
+   }
+
+   /**
+    * Convert the provided binary into a hex-string representation where each character
+    * represents 4 bits of the provided binary, i.e each byte requires two characters.
+    * <p>
+    * The returned hex characters are upper-case.
+    *
+    * @param bytes the binary value to convert to a hex String instance.
+    * @return a String containing a hex representation of the bytes
+    */
+   public String convertBinaryToHexString(byte[] bytes) {
+      // Each byte is represented as 2 chars
+      StringBuilder builder = new StringBuilder(bytes.length * 2);
+
+      for (byte b : bytes) {
+         // The byte will be expanded to int before shifting, replicating the
+         // sign bit, so mask everything beyond the first 4 bits afterwards
+         int highBitsInt = (b >> 4) & 0xF;
+         // We only want the first 4 bits
+         int lowBitsInt = b & 0xF;
+
+         builder.append(HEX_CHARS[highBitsInt]);
+         builder.append(HEX_CHARS[lowBitsInt]);
+      }
+
+      return builder.toString();
+   }
+
+   //----- Internal implementation ------------------------------------------//
+
+   private boolean hasTypeEncodingPrefix(String stringId) {
+      return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) ||
+            hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
+   }
+
+   private boolean hasAmqpStringPrefix(String stringId) {
+      return stringId.startsWith(AMQP_STRING_PREFIX);
+   }
+
+   private boolean hasAmqpUlongPrefix(String stringId) {
+      return stringId.startsWith(AMQP_ULONG_PREFIX);
+   }
+
+   private boolean hasAmqpUuidPrefix(String stringId) {
+      return stringId.startsWith(AMQP_UUID_PREFIX);
+   }
+
+   private boolean hasAmqpBinaryPrefix(String stringId) {
+      return stringId.startsWith(AMQP_BINARY_PREFIX);
+   }
+
+   private String strip(String id, int numChars) {
+      return id.substring(numChars);
+   }
+
+   private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
+      if (ch >= '0' && ch <= '9') {
+         // subtract '0' to get difference in position as an int
+         return ch - '0';
+      }
+      else if (ch >= 'A' && ch <= 'F') {
+         // subtract 'A' to get difference in position as an int
+         // and then add 10 for the offset of 'A'
+         return ch - 'A' + 10;
+      }
+      else if (ch >= 'a' && ch <= 'f') {
+         // subtract 'a' to get difference in position as an int
+         // and then add 10 for the offset of 'a'
+         return ch - 'a' + 10;
+      }
+
+      throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeInboundTransformer.java
new file mode 100644
index 0000000..e194e6b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeInboundTransformer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter.message;
+
+import javax.jms.Message;
+
+public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
+
+   public AMQPNativeInboundTransformer(JMSVendor vendor) {
+      super(vendor);
+   }
+
+   @Override
+   public String getTransformerName() {
+      return TRANSFORMER_NATIVE;
+   }
+
+   @Override
+   public InboundTransformer getFallbackTransformer() {
+      return new AMQPRawInboundTransformer(getVendor());
+   }
+
+   @Override
+   public Message transform(EncodedMessage amqpMessage) throws Exception {
+      org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+
+      Message rc = super.transform(amqpMessage);
+
+      populateMessage(rc, amqp);
+      return rc;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeOutboundTransformer.java
new file mode 100644
index 0000000..fa7f206
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeOutboundTransformer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter.message;
+
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.message.ProtonJMessage;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+public class AMQPNativeOutboundTransformer extends OutboundTransformer {
+
+   public AMQPNativeOutboundTransformer(JMSVendor vendor) {
+      super(vendor);
+   }
+
+   public static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
+      byte[] data = new byte[(int) msg.getBodyLength()];
+      msg.readBytes(data);
+      msg.reset();
+      int count = msg.getIntProperty("JMSXDeliveryCount");
+
+      // decode...
+      ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
+      int offset = 0;
+      int len = data.length;
+      while (len > 0) {
+         final int decoded = amqp.decode(data, offset, len);
+         assert decoded > 0 : "Make progress decoding the message";
+         offset += decoded;
+         len -= decoded;
+      }
+
+      // Update the DeliveryCount header...
+      // The AMQP delivery-count field only includes prior failed delivery attempts,
+      // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
+      if (amqp.getHeader() == null) {
+         amqp.setHeader(new Header());
+      }
+
+      amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
+
+      return amqp;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPRawInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPRawInboundTransformer.java
new file mode 100644
index 0000000..fd9540d
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPRawInboundTransformer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter.message;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+
+public class AMQPRawInboundTransformer extends InboundTransformer {
+
+   public AMQPRawInboundTransformer(JMSVendor vendor) {
+      super(vendor);
+   }
+
+   @Override
+   public String getTransformerName() {
+      return TRANSFORMER_RAW;
+   }
+
+   @Override
+   public InboundTransformer getFallbackTransformer() {
+      return null;  // No fallback from full raw transform
+   }
+
+   @Override
+   public Message transform(EncodedMessage amqpMessage) throws Exception {
+      BytesMessage rc = vendor.createBytesMessage();
+      rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+
+      // We cannot decode the message headers to check so err on the side of caution
+      // and mark all messages as persistent.
+      rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+      rc.setJMSPriority(defaultPriority);
+
+      final long now = System.currentTimeMillis();
+      rc.setJMSTimestamp(now);
+      if (defaultTtl > 0) {
+         rc.setJMSExpiration(now + defaultTtl);
+      }
+
+      rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+      rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+
+      return rc;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/EncodedMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/EncodedMessage.java
new file mode 100644
index 0000000..2857c17
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/EncodedMessage.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter.message;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.message.Message;
+
+public class EncodedMessage {
+
+   private final Binary data;
+   final long messageFormat;
+
+   public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
+      this.data = new Binary(data, offset, length);
+      this.messageFormat = messageFormat;
+   }
+
+   public long getMessageFormat() {
+      return messageFormat;
+   }
+
+   public Message decode() throws Exception {
+      Message amqp = Message.Factory.create();
+
+      int offset = getArrayOffset();
+      int len = getLength();
+      while (len > 0) {
+         final int decoded = amqp.decode(getArray(), offset, len);
+         assert decoded > 0 : "Make progress decoding the message";
+         offset += decoded;
+         len -= decoded;
+      }
+
+      return amqp;
+   }
+
+   public int getLength() {
+      return data.getLength();
+   }
+
+   public int getArrayOffset() {
+      return data.getArrayOffset();
+   }
+
+   public byte[] getArray() {
+      return data.getArray();
+   }
+
+   @Override
+   public String toString() {
+      return data.toString();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/InboundTransformer.java
new file mode 100644
index 0000000..3a58038
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/InboundTransformer.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter.message;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Decimal128;
+import org.apache.qpid.proton.amqp.Decimal32;
+import org.apache.qpid.proton.amqp.Decimal64;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+
+public abstract class InboundTransformer {
+
+   JMSVendor vendor;
+
+   public static final String TRANSFORMER_NATIVE = "native";
+   public static final String TRANSFORMER_RAW = "raw";
+   public static final String TRANSFORMER_JMS = "jms";
+
+   String prefixVendor = "JMS_AMQP_";
+   String prefixDeliveryAnnotations = "DA_";
+   String prefixMessageAnnotations = "MA_";
+   String prefixFooter = "FT_";
+
+   int defaultDeliveryMode = DeliveryMode.NON_PERSISTENT;
+   int defaultPriority = Message.DEFAULT_PRIORITY;
+   long defaultTtl = Message.DEFAULT_TIME_TO_LIVE;
+
+   public InboundTransformer(JMSVendor vendor) {
+      this.vendor = vendor;
+   }
+
+   public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
+
+   public abstract String getTransformerName();
+
+   public abstract InboundTransformer getFallbackTransformer();
+
+   public int getDefaultDeliveryMode() {
+      return defaultDeliveryMode;
+   }
+
+   public void setDefaultDeliveryMode(int defaultDeliveryMode) {
+      this.defaultDeliveryMode = defaultDeliveryMode;
+   }
+
+   public int getDefaultPriority() {
+      return defaultPriority;
+   }
+
+   public void setDefaultPriority(int defaultPriority) {
+      this.defaultPriority = defaultPriority;
+   }
+
+   public long getDefaultTtl() {
+      return defaultTtl;
+   }
+
+   public void setDefaultTtl(long defaultTtl) {
+      this.defaultTtl = defaultTtl;
+   }
+
+   public String getPrefixVendor() {
+      return prefixVendor;
+   }
+
+   public void setPrefixVendor(String prefixVendor) {
+      this.prefixVendor = prefixVendor;
+   }
+
+   public JMSVendor getVendor() {
+      return vendor;
+   }
+
+   public void setVendor(JMSVendor vendor) {
+      this.vendor = vendor;
+   }
+
+   @SuppressWarnings("unchecked")
+   protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+      Header header = amqp.getHeader();
+      if (header == null) {
+         header = new Header();
+      }
+
+      if (header.getDurable() != null) {
+         jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+      }
+      else {
+         jms.setJMSDeliveryMode(defaultDeliveryMode);
+      }
+
+      if (header.getPriority() != null) {
+         jms.setJMSPriority(header.getPriority().intValue());
+      }
+      else {
+         jms.setJMSPriority(defaultPriority);
+      }
+
+      if (header.getFirstAcquirer() != null) {
+         jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
+      }
+
+      if (header.getDeliveryCount() != null) {
+         vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
+      }
+
+      final MessageAnnotations ma = amqp.getMessageAnnotations();
+      if (ma != null) {
+         for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
+               // Legacy annotation, JMSType value will be replaced by Subject further down if also present.
+               jms.setJMSType(entry.getValue().toString());
+            }
+            else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+               long deliveryTime = ((Number) entry.getValue()).longValue();
+               jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime);
+            }
+            else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
+               long delay = ((Number) entry.getValue()).longValue();
+               if (delay > 0) {
+                  jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
+               }
+            }
+            //todo
+               /*else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) {
+                    int repeat = ((Number) entry.getValue()).intValue();
+                    if (repeat > 0) {
+                        jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
+                    }
+                } else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) {
+                    long period = ((Number) entry.getValue()).longValue();
+                    if (period > 0) {
+                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
+                    }
+                } else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) {
+                    String cronEntry = (String) entry.getValue();
+                    if (cronEntry != null) {
+                        jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry);
+                    }
+                }*/
+
+            setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+         }
+      }
+
+      final ApplicationProperties ap = amqp.getApplicationProperties();
+      if (ap != null) {
+         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            if ("JMSXGroupID".equals(key)) {
+               vendor.setJMSXGroupID(jms, entry.getValue().toString());
+            }
+            else if ("JMSXGroupSequence".equals(key)) {
+               vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue());
+            }
+            else if ("JMSXUserID".equals(key)) {
+               vendor.setJMSXUserID(jms, entry.getValue().toString());
+            }
+            else {
+               setProperty(jms, key, entry.getValue());
+            }
+         }
+      }
+
+      final Properties properties = amqp.getProperties();
+      if (properties != null) {
+         if (properties.getMessageId() != null) {
+            jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
+         }
+         Binary userId = properties.getUserId();
+         if (userId != null) {
+            vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
+         }
+         if (properties.getTo() != null) {
+            jms.setJMSDestination(vendor.createDestination(properties.getTo()));
+         }
+         if (properties.getSubject() != null) {
+            jms.setJMSType(properties.getSubject());
+         }
+         if (properties.getReplyTo() != null) {
+            jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
+         }
+         if (properties.getCorrelationId() != null) {
+            jms.setJMSCorrelationID(properties.getCorrelationId().toString());
+         }
+         if (properties.getContentType() != null) {
+            jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+         }
+         if (properties.getContentEncoding() != null) {
+            jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+         }
+         if (properties.getCreationTime() != null) {
+            jms.setJMSTimestamp(properties.getCreationTime().getTime());
+         }
+         if (properties.getGroupId() != null) {
+            vendor.setJMSXGroupID(jms, properties.getGroupId());
+         }
+         if (properties.getGroupSequence() != null) {
+            vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
+         }
+         if (properties.getReplyToGroupId() != null) {
+            jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+         }
+         if (properties.getAbsoluteExpiryTime() != null) {
+            jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
+         }
+      }
+
+      // If the jms expiration has not yet been set...
+      if (jms.getJMSExpiration() == 0) {
+         // Then lets try to set it based on the message ttl.
+         long ttl = defaultTtl;
+         if (header.getTtl() != null) {
+            ttl = header.getTtl().longValue();
+         }
+
+         if (ttl == 0) {
+            jms.setJMSExpiration(0);
+         }
+         else {
+            jms.setJMSExpiration(System.currentTimeMillis() + ttl);
+         }
+      }
+
+      final Footer fp = amqp.getFooter();
+      if (fp != null) {
+         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
+         }
+      }
+   }
+
+   private void setProperty(Message msg, String key, Object value) throws JMSException {
+      if (value instanceof UnsignedLong) {
+         long v = ((UnsignedLong) value).longValue();
+         msg.setLongProperty(key, v);
+      }
+      else if (value instanceof UnsignedInteger) {
+         long v = ((UnsignedInteger) value).longValue();
+         if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
+            msg.setIntProperty(key, (int) v);
+         }
+         else {
+            msg.setLongProperty(key, v);
+         }
+      }
+      else if (value instanceof UnsignedShort) {
+         int v = ((UnsignedShort) value).intValue();
+         if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
+            msg.setShortProperty(key, (short) v);
+         }
+         else {
+            msg.setIntProperty(key, v);
+         }
+      }
+      else if (value instanceof UnsignedByte) {
+         short v = ((UnsignedByte) value).shortValue();
+         if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
+            msg.setByteProperty(key, (byte) v);
+         }
+         else {
+            msg.setShortProperty(key, v);
+         }
+      }
+      else if (value instanceof Symbol) {
+         msg.setStringProperty(key, value.toString());
+      }
+      else if (value instanceof Decimal128) {
+         msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
+      }
+      else if (value instanceof Decimal64) {
+         msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
+      }
+      else if (value instanceof Decimal32) {
+         msg.setFloatProperty(key, ((Decimal32) value).floatValue());
+      }
+      else if (value instanceof Binary) {
+         msg.setStringProperty(key, value.toString());
+      }
+      else {
+         msg.setObjectProperty(key, value);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingInboundTransformer.java
new file mode 100644
index 0000000..e804818
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingInboundTransformer.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter.message;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+
+import javax.jms.BytesMessage;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class JMSMappingInboundTransformer extends InboundTransformer {
+
+   public JMSMappingInboundTransformer(JMSVendor vendor) {
+      super(vendor);
+   }
+
+   @Override
+   public String getTransformerName() {
+      return TRANSFORMER_JMS;
+   }
+
+   @Override
+   public InboundTransformer getFallbackTransformer() {
+      return new AMQPNativeInboundTransformer(getVendor());
+   }
+
+   @SuppressWarnings({"unchecked"})
+   @Override
+   public Message transform(EncodedMessage amqpMessage) throws Exception {
+      org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+
+      Message rc;
+      final Section body = amqp.getBody();
+      if (body == null) {
+         rc = vendor.createMessage();
+      }
+      else if (body instanceof Data) {
+         Binary d = ((Data) body).getValue();
+         BytesMessage m = vendor.createBytesMessage();
+         m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
+         rc = m;
+      }
+      else if (body instanceof AmqpSequence) {
+         AmqpSequence sequence = (AmqpSequence) body;
+         StreamMessage m = vendor.createStreamMessage();
+         for (Object item : sequence.getValue()) {
+            m.writeObject(item);
+         }
+         rc = m;
+      }
+      else if (body instanceof AmqpValue) {
+         Object value = ((AmqpValue) body).getValue();
+         if (value == null) {
+            rc = vendor.createObjectMessage();
+         }
+         if (value instanceof String) {
+            TextMessage m = vendor.createTextMessage();
+            m.setText((String) value);
+            rc = m;
+         }
+         else if (value instanceof Binary) {
+            Binary d = (Binary) value;
+            BytesMessage m = vendor.createBytesMessage();
+            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
+            rc = m;
+         }
+         else if (value instanceof List) {
+            StreamMessage m = vendor.createStreamMessage();
+            for (Object item : (List<Object>) value) {
+               m.writeObject(item);
+            }
+            rc = m;
+         }
+         else if (value instanceof Map) {
+            MapMessage m = vendor.createMapMessage();
+            final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
+            for (Map.Entry<String, Object> entry : set) {
+               m.setObject(entry.getKey(), entry.getValue());
+            }
+            rc = m;
+         }
+         else {
+            ObjectMessage m = vendor.createObjectMessage();
+            m.setObject((Serializable) value);
+            rc = m;
+         }
+      }
+      else {
+         throw new RuntimeException("Unexpected body type: " + body.getClass());
+      }
+      rc.setJMSDeliveryMode(defaultDeliveryMode);
+      rc.setJMSPriority(defaultPriority);
+      rc.setJMSExpiration(defaultTtl);
+
+      populateMessage(rc, amqp);
+
+      rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+      rc.setBooleanProperty(prefixVendor + "NATIVE", false);
+      return rc;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingOutboundTransformer.java
new file mode 100644
index 0000000..7babcf3
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingOutboundTransformer.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter.message;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageEOFException;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+
+public class JMSMappingOutboundTransformer extends OutboundTransformer {
+
+   public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
+   public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
+
+   public static final byte QUEUE_TYPE = 0x00;
+   public static final byte TOPIC_TYPE = 0x01;
+   public static final byte TEMP_QUEUE_TYPE = 0x02;
+   public static final byte TEMP_TOPIC_TYPE = 0x03;
+
+   // Deprecated legacy values used by old QPid AMQP 1.0 JMS client.
+
+   public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type");
+   public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type");
+
+   public static final String LEGACY_QUEUE_TYPE = "queue";
+   public static final String LEGACY_TOPIC_TYPE = "topic";
+   public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
+   public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
+
+   public JMSMappingOutboundTransformer(JMSVendor vendor) {
+      super(vendor);
+   }
+
+   /**
+    * Perform the conversion between JMS Message and Proton Message without
+    * re-encoding it to array. This is needed because some frameworks may elect
+    * to do this on their own way (Netty for instance using Nettybuffers)
+    *
+    * @param msg
+    * @return
+    * @throws Exception
+    */
+   public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
+      Header header = new Header();
+      Properties props = new Properties();
+      HashMap<Symbol, Object> daMap = null;
+      HashMap<Symbol, Object> maMap = null;
+      HashMap apMap = null;
+      Section body = null;
+      HashMap footerMap = null;
+      if (msg instanceof BytesMessage) {
+         BytesMessage m = (BytesMessage) msg;
+         byte[] data = new byte[(int) m.getBodyLength()];
+         m.readBytes(data);
+         m.reset(); // Need to reset after readBytes or future readBytes
+         // calls (ex: redeliveries) will fail and return -1
+         body = new Data(new Binary(data));
+      }
+      if (msg instanceof TextMessage) {
+         body = new AmqpValue(((TextMessage) msg).getText());
+      }
+      if (msg instanceof MapMessage) {
+         final HashMap<String, Object> map = new HashMap<String, Object>();
+         final MapMessage m = (MapMessage) msg;
+         final Enumeration<String> names = m.getMapNames();
+         while (names.hasMoreElements()) {
+            String key = names.nextElement();
+            map.put(key, m.getObject(key));
+         }
+         body = new AmqpValue(map);
+      }
+      if (msg instanceof StreamMessage) {
+         ArrayList<Object> list = new ArrayList<Object>();
+         final StreamMessage m = (StreamMessage) msg;
+         try {
+            while (true) {
+               list.add(m.readObject());
+            }
+         }
+         catch (MessageEOFException e) {
+         }
+         body = new AmqpSequence(list);
+      }
+      if (msg instanceof ObjectMessage) {
+         body = new AmqpValue(((ObjectMessage) msg).getObject());
+      }
+
+      header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
+      header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
+      if (msg.getJMSType() != null) {
+         props.setSubject(msg.getJMSType());
+      }
+      if (msg.getJMSMessageID() != null) {
+
+         String msgId = msg.getJMSMessageID();
+
+         if (msgId != null) {
+            try {
+               props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId));
+            }
+            catch (ActiveMQAMQPIllegalStateException e) {
+               props.setMessageId(msgId);
+            }
+         }
+         else {
+            props.setMessageId(msgId.toString());
+         }
+      }
+      if (msg.getJMSDestination() != null) {
+         props.setTo(vendor.toAddress(msg.getJMSDestination()));
+         if (maMap == null) {
+            maMap = new HashMap<Symbol, Object>();
+         }
+         maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
+
+         // Deprecated: used by legacy QPid AMQP 1.0 JMS client
+         maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination()));
+      }
+      if (msg.getJMSReplyTo() != null) {
+         props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
+         if (maMap == null) {
+            maMap = new HashMap<Symbol, Object>();
+         }
+         maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
+
+         // Deprecated: used by legacy QPid AMQP 1.0 JMS client
+         maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
+      }
+      if (msg.getJMSCorrelationID() != null) {
+         props.setCorrelationId(msg.getJMSCorrelationID());
+      }
+      if (msg.getJMSExpiration() != 0) {
+         long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
+         if (ttl < 0) {
+            ttl = 1;
+         }
+         header.setTtl(new UnsignedInteger((int) ttl));
+
+         props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
+      }
+      if (msg.getJMSTimestamp() != 0) {
+         props.setCreationTime(new Date(msg.getJMSTimestamp()));
+      }
+
+      final Enumeration<String> keys = msg.getPropertyNames();
+      while (keys.hasMoreElements()) {
+         String key = keys.nextElement();
+         if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
+            // skip..
+         }
+         else if (key.equals(firstAcquirerKey)) {
+            header.setFirstAcquirer(msg.getBooleanProperty(key));
+         }
+         else if (key.startsWith("JMSXDeliveryCount")) {
+            // The AMQP delivery-count field only includes prior failed delivery attempts,
+            // whereas JMSXDeliveryCount includes the first/current delivery attempt.
+            int amqpDeliveryCount = msg.getIntProperty(key) - 1;
+            if (amqpDeliveryCount > 0) {
+               header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
+            }
+         }
+         else if (key.startsWith("JMSXUserID")) {
+            String value = msg.getStringProperty(key);
+            props.setUserId(new Binary(value.getBytes("UTF-8")));
+         }
+         else if (key.startsWith("JMSXGroupID")) {
+            String value = msg.getStringProperty(key);
+            props.setGroupId(value);
+            if (apMap == null) {
+               apMap = new HashMap();
+            }
+            apMap.put(key, value);
+         }
+         else if (key.startsWith("JMSXGroupSeq")) {
+            UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
+            props.setGroupSequence(value);
+            if (apMap == null) {
+               apMap = new HashMap();
+            }
+            apMap.put(key, value);
+         }
+         else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
+            if (daMap == null) {
+               daMap = new HashMap<Symbol, Object>();
+            }
+            String name = key.substring(prefixDeliveryAnnotationsKey.length());
+            daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
+         }
+         else if (key.startsWith(prefixMessageAnnotationsKey)) {
+            if (maMap == null) {
+               maMap = new HashMap<Symbol, Object>();
+            }
+            String name = key.substring(prefixMessageAnnotationsKey.length());
+            maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
+         }
+         else if (key.equals(contentTypeKey)) {
+            props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
+         }
+         else if (key.equals(contentEncodingKey)) {
+            props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
+         }
+         else if (key.equals(replyToGroupIDKey)) {
+            props.setReplyToGroupId(msg.getStringProperty(key));
+         }
+         else if (key.startsWith(prefixFooterKey)) {
+            if (footerMap == null) {
+               footerMap = new HashMap();
+            }
+            String name = key.substring(prefixFooterKey.length());
+            footerMap.put(name, msg.getObjectProperty(key));
+         }
+         else {
+            if (apMap == null) {
+               apMap = new HashMap();
+            }
+            apMap.put(key, msg.getObjectProperty(key));
+         }
+      }
+
+      MessageAnnotations ma = null;
+      if (maMap != null) {
+         ma = new MessageAnnotations(maMap);
+      }
+      DeliveryAnnotations da = null;
+      if (daMap != null) {
+         da = new DeliveryAnnotations(daMap);
+      }
+      ApplicationProperties ap = null;
+      if (apMap != null) {
+         ap = new ApplicationProperties(apMap);
+      }
+      Footer footer = null;
+      if (footerMap != null) {
+         footer = new Footer(footerMap);
+      }
+
+      return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
+   }
+
+   private static byte destinationType(Destination destination) {
+      if (destination instanceof Queue) {
+         if (destination instanceof TemporaryQueue) {
+            return TEMP_QUEUE_TYPE;
+         }
+         else {
+            return QUEUE_TYPE;
+         }
+      }
+      else if (destination instanceof Topic) {
+         if (destination instanceof TemporaryTopic) {
+            return TEMP_TOPIC_TYPE;
+         }
+         else {
+            return TOPIC_TYPE;
+         }
+      }
+
+      throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
+   }
+
+   // Used by legacy QPid AMQP 1.0 JMS client.
+   @Deprecated
+   private static String destinationAttributes(Destination destination) {
+      if (destination instanceof Queue) {
+         if (destination instanceof TemporaryQueue) {
+            return LEGACY_TEMP_QUEUE_TYPE;
+         }
+         else {
+            return LEGACY_QUEUE_TYPE;
+         }
+      }
+      else if (destination instanceof Topic) {
+         if (destination instanceof TemporaryTopic) {
+            return LEGACY_TEMP_TOPIC_TYPE;
+         }
+         else {
+            return LEGACY_TOPIC_TYPE;
+         }
+      }
+
+      throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSVendor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSVendor.java
new file mode 100644
index 0000000..7255295
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSVendor.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter.message;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+public interface JMSVendor {
+
+   BytesMessage createBytesMessage();
+
+   StreamMessage createStreamMessage();
+
+   Message createMessage();
+
+   TextMessage createTextMessage();
+
+   ObjectMessage createObjectMessage();
+
+   MapMessage createMapMessage();
+
+   void setJMSXUserID(Message message, String value);
+
+   Destination createDestination(String name);
+
+   void setJMSXGroupID(Message message, String groupId);
+
+   void setJMSXGroupSequence(Message message, int value);
+
+   void setJMSXDeliveryCount(Message message, long value);
+
+   String toAddress(Destination destination);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/OutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/OutboundTransformer.java
new file mode 100644
index 0000000..bd20eee
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/OutboundTransformer.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter.message;
+
+public abstract class OutboundTransformer {
+
+   JMSVendor vendor;
+   String prefixVendor;
+
+   String prefixDeliveryAnnotations = "DA_";
+   String prefixMessageAnnotations = "MA_";
+   String prefixFooter = "FT_";
+
+   String messageFormatKey;
+   String nativeKey;
+   String firstAcquirerKey;
+   String prefixDeliveryAnnotationsKey;
+   String prefixMessageAnnotationsKey;
+   String contentTypeKey;
+   String contentEncodingKey;
+   String replyToGroupIDKey;
+   String prefixFooterKey;
+
+   public OutboundTransformer(JMSVendor vendor) {
+      this.vendor = vendor;
+      this.setPrefixVendor("JMS_AMQP_");
+   }
+
+   public String getPrefixVendor() {
+      return prefixVendor;
+   }
+
+   public void setPrefixVendor(String prefixVendor) {
+      this.prefixVendor = prefixVendor;
+
+      messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
+      nativeKey = prefixVendor + "NATIVE";
+      firstAcquirerKey = prefixVendor + "FirstAcquirer";
+      prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
+      prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
+      contentTypeKey = prefixVendor + "ContentType";
+      contentEncodingKey = prefixVendor + "ContentEncoding";
+      replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
+      prefixFooterKey = prefixVendor + prefixFooter;
+
+   }
+
+   public JMSVendor getVendor() {
+      return vendor;
+   }
+
+   public void setVendor(JMSVendor vendor) {
+      this.vendor = vendor;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 6a93df0..421a382 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -20,11 +20,11 @@ import java.util.concurrent.Executor;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-import org.apache.activemq.transport.amqp.message.EncodedMessage;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
index fc9fe2c..28f8b86 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.transport.amqp.message.EncodedMessage;
+import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 47b58e4..ed33038 100644
--- a/pom.xml
+++ b/pom.xml
@@ -414,18 +414,6 @@
             <!-- License: Apache 2.0 -->
          </dependency>
          <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-amqp</artifactId>
-            <version>${activemq5-version}</version>
-            <exclusions>
-               <exclusion>
-                  <groupId>org.apache.geronimo.specs</groupId>
-                  <artifactId>geronimo-jms_1.1_spec</artifactId>
-               </exclusion>
-            </exclusions>
-            <!-- License: Apache 2.0 -->
-         </dependency>
-         <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>


[2/2] activemq-artemis git commit: This closes #501

Posted by cl...@apache.org.
This closes #501


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2a415a80
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2a415a80
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2a415a80

Branch: refs/heads/master
Commit: 2a415a80e94cd2963722d8509847fe7fe5c301b4
Parents: 7fb603f c161ab4
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue May 3 10:40:57 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue May 3 10:40:57 2016 -0400

----------------------------------------------------------------------
 artemis-distribution/pom.xml                    |   4 -
 artemis-distribution/src/main/assembly/dep.xml  |   1 -
 artemis-protocols/artemis-amqp-protocol/pom.xml |   4 -
 .../AMQPNativeOutboundTransformer.java          |  56 ----
 .../proton/converter/ActiveMQJMSVendor.java     |   2 +-
 .../converter/JMSMappingInboundTransformer.java |  49 ---
 .../JMSMappingOutboundTransformer.java          |  53 ---
 .../converter/ProtonMessageConverter.java       |   7 +-
 .../proton/converter/jms/ServerJMSMessage.java  |   7 +
 .../converter/jms/ServerJMSObjectMessage.java   |   2 +-
 .../converter/message/AMQPMessageIdHelper.java  | 257 +++++++++++++++
 .../message/AMQPNativeInboundTransformer.java   |  46 +++
 .../message/AMQPNativeOutboundTransformer.java  |  60 ++++
 .../message/AMQPRawInboundTransformer.java      |  60 ++++
 .../converter/message/EncodedMessage.java       |  67 ++++
 .../converter/message/InboundTransformer.java   | 317 ++++++++++++++++++
 .../message/JMSMappingInboundTransformer.java   | 126 +++++++
 .../message/JMSMappingOutboundTransformer.java  | 329 +++++++++++++++++++
 .../proton/converter/message/JMSVendor.java     |  53 +++
 .../converter/message/OutboundTransformer.java  |  69 ++++
 .../plug/ProtonSessionIntegrationCallback.java  |   2 +-
 .../core/protocol/proton/TestConversions.java   |   2 +-
 pom.xml                                         |  12 -
 23 files changed, 1400 insertions(+), 185 deletions(-)
----------------------------------------------------------------------