You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 14:51:04 UTC

[3/5] activemq-artemis git commit: ARTEMIS-770 AMQP Message Transformer refactor

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
index 9dd29ab..629c499 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
@@ -1,13 +1,13 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,27 +16,50 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import javax.jms.BytesMessage;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-import java.io.Serializable;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_MAP;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMapMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createObjectMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createStreamMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createTextMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.getCharsetForTextualContent;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.isContentType;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
 
 public class JMSMappingInboundTransformer extends InboundTransformer {
 
-   public JMSMappingInboundTransformer(JMSVendor vendor) {
-      super(vendor);
+   public JMSMappingInboundTransformer(IDGenerator idGenerator) {
+      super(idGenerator);
    }
 
    @Override
@@ -46,75 +69,128 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
 
    @Override
    public InboundTransformer getFallbackTransformer() {
-      return new AMQPNativeInboundTransformer(getVendor());
+      return new AMQPNativeInboundTransformer(idGenerator);
    }
 
-   @SuppressWarnings({"unchecked"})
    @Override
-   public Message transform(EncodedMessage amqpMessage) throws Exception {
-      org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+   public ServerJMSMessage transform(EncodedMessage encodedMessage) throws Exception {
+      ServerJMSMessage transformedMessage = null;
+
+      try {
+         Message amqpMessage = encodedMessage.decode();
+         transformedMessage = createServerMessage(amqpMessage);
+         populateMessage(transformedMessage, amqpMessage);
+      } catch (Exception ex) {
+         InboundTransformer transformer = this.getFallbackTransformer();
+
+         while (transformer != null) {
+            try {
+               transformedMessage = transformer.transform(encodedMessage);
+               break;
+            } catch (Exception e) {
+               transformer = transformer.getFallbackTransformer();
+            }
+         }
+      }
+
+      // Regardless of the transformer that finally decoded the message we need to ensure that
+      // the AMQP Message Format value is preserved for application on retransmit.
+      if (transformedMessage != null && encodedMessage.getMessageFormat() != 0) {
+         transformedMessage.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, encodedMessage.getMessageFormat());
+      }
+
+      return transformedMessage;
+   }
+
+   @SuppressWarnings("unchecked")
+   private ServerJMSMessage createServerMessage(Message message) throws Exception {
+
+      Section body = message.getBody();
+      ServerJMSMessage result;
 
-      Message rc;
-      final Section body = amqp.getBody();
       if (body == null) {
-         rc = vendor.createMessage();
+         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+            result = createObjectMessage(idGenerator);
+         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) {
+            result = createBytesMessage(idGenerator);
+         } else {
+            Charset charset = getCharsetForTextualContent(message.getContentType());
+            if (charset != null) {
+               result = createTextMessage(idGenerator);
+            } else {
+               result = createMessage(idGenerator);
+            }
+         }
+
+         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
       } else if (body instanceof Data) {
-         Binary d = ((Data) body).getValue();
-         BytesMessage m = vendor.createBytesMessage();
-         m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
-         rc = m;
+         Binary payload = ((Data) body).getValue();
+
+         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+            result = createObjectMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
+         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) {
+            result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
+         } else {
+            Charset charset = getCharsetForTextualContent(message.getContentType());
+            if (StandardCharsets.UTF_8.equals(charset)) {
+               ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+
+               try {
+                  CharBuffer chars = charset.newDecoder().decode(buf);
+                  result = createTextMessage(idGenerator, String.valueOf(chars));
+               } catch (CharacterCodingException e) {
+                  result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
+               }
+            } else {
+               result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            }
+         }
+
+         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
       } else if (body instanceof AmqpSequence) {
          AmqpSequence sequence = (AmqpSequence) body;
-         StreamMessage m = vendor.createStreamMessage();
+         ServerJMSStreamMessage m = createStreamMessage(idGenerator);
          for (Object item : sequence.getValue()) {
             m.writeObject(item);
          }
-         rc = m;
-         m.setStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY, AMQPMessageTypes.AMQP_SEQUENCE);
+
+         result = m;
+         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
       } else if (body instanceof AmqpValue) {
          Object value = ((AmqpValue) body).getValue();
-         if (value == null) {
-            rc = vendor.createObjectMessage();
-         }
-         if (value instanceof String) {
-            TextMessage m = vendor.createTextMessage();
-            m.setText((String) value);
-            rc = m;
+         if (value == null || value instanceof String) {
+            result = createTextMessage(idGenerator, (String) value);
+
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
          } else if (value instanceof Binary) {
-            Binary d = (Binary) value;
-            BytesMessage m = vendor.createBytesMessage();
-            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
-            rc = m;
+            Binary payload = (Binary) value;
+
+            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+               result = createObjectMessage(idGenerator, payload);
+            } else {
+               result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            }
+
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
          } else if (value instanceof List) {
-            StreamMessage m = vendor.createStreamMessage();
+            ServerJMSStreamMessage m = createStreamMessage(idGenerator);
             for (Object item : (List<Object>) value) {
                m.writeObject(item);
             }
-            rc = m;
-            m.setStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY, AMQPMessageTypes.AMQP_LIST);
+            result = m;
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
          } else if (value instanceof Map) {
-            MapMessage m = vendor.createMapMessage();
-            final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
-            for (Map.Entry<String, Object> entry : set) {
-               m.setObject(entry.getKey(), entry.getValue());
-            }
-            rc = m;
+            result = createMapMessage(idGenerator, (Map<String, Object>) value);
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
          } else {
-            ObjectMessage m = vendor.createObjectMessage();
-            m.setObject((Serializable) value);
-            rc = m;
+            // Trigger fall-back to native encoder which generates BytesMessage with the
+            // original message stored in the message body.
+            throw new ActiveMQAMQPInternalErrorException("Unable to encode to ActiveMQ JMS Message");
          }
       } else {
          throw new RuntimeException("Unexpected body type: " + body.getClass());
       }
-      rc.setJMSDeliveryMode(defaultDeliveryMode);
-      rc.setJMSPriority(defaultPriority);
-      rc.setJMSExpiration(defaultTtl);
-
-      populateMessage(rc, amqp);
 
-      rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-      rc.setBooleanProperty(prefixVendor + "NATIVE", false);
-      return rc;
+      return result;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
index 9f28a6b..9ee0344 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
@@ -1,13 +1,13 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,30 +16,61 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
+import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.EMPTY_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PROPERTIES;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.toAddress;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageEOFException;
-import javax.jms.ObjectMessage;
 import javax.jms.Queue;
-import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
 
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
@@ -54,12 +85,16 @@ import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
 import org.jboss.logging.Logger;
 
 public class JMSMappingOutboundTransformer extends OutboundTransformer {
 
    private static final Logger logger = Logger.getLogger(JMSMappingOutboundTransformer.class);
+
    public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
    public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
 
@@ -68,227 +103,458 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
    public static final byte TEMP_QUEUE_TYPE = 0x02;
    public static final byte TEMP_TOPIC_TYPE = 0x03;
 
-   public JMSMappingOutboundTransformer(JMSVendor vendor) {
-      super(vendor);
+   // For now Proton requires that we create a decoder to create an encoder
+   private static class EncoderDecoderPair {
+      DecoderImpl decoder = new DecoderImpl();
+      EncoderImpl encoder = new EncoderImpl(decoder);
+      {
+         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+      }
    }
 
-   /**
-    * Perform the conversion between JMS Message and Proton Message without
-    * re-encoding it to array. This is needed because some frameworks may elect
-    * to do this on their own way (Netty for instance using Nettybuffers)
-    *
-    * @param msg
-    * @return
-    * @throws Exception
-    */
-   public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
-      Header header = new Header();
-      Properties props = new Properties();
-      HashMap<Symbol, Object> daMap = null;
-      HashMap<Symbol, Object> maMap = null;
-      HashMap apMap = null;
-      Section body = null;
-      HashMap footerMap = null;
-      if (msg instanceof BytesMessage) {
-         BytesMessage m = (BytesMessage) msg;
-         byte[] data = new byte[(int) m.getBodyLength()];
-         m.readBytes(data);
-         m.reset(); // Need to reset after readBytes or future readBytes
-         // calls (ex: redeliveries) will fail and return -1
-         body = new Data(new Binary(data));
-      }
-      if (msg instanceof TextMessage) {
-         body = new AmqpValue(((TextMessage) msg).getText());
-      }
-      if (msg instanceof MapMessage) {
-         final HashMap<String, Object> map = new HashMap<>();
-         final MapMessage m = (MapMessage) msg;
-         final Enumeration<String> names = m.getMapNames();
-         while (names.hasMoreElements()) {
-            String key = names.nextElement();
-            map.put(key, m.getObject(key));
-         }
-         body = new AmqpValue(map);
+   private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() {
+      @Override
+      protected EncoderDecoderPair initialValue() {
+         return new EncoderDecoderPair();
       }
-      if (msg instanceof StreamMessage) {
-         ArrayList<Object> list = new ArrayList<>();
-         final StreamMessage m = (StreamMessage) msg;
-         try {
-            while (true) {
-               list.add(m.readObject());
-            }
-         } catch (MessageEOFException e) {
-         }
+   };
 
-         String amqpType = msg.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY);
-         if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) {
-            body = new AmqpValue(list);
-         } else {
-            body = new AmqpSequence(list);
-         }
-      }
-      if (msg instanceof ObjectMessage) {
-         body = new AmqpValue(((ObjectMessage) msg).getObject());
+   public JMSMappingOutboundTransformer(IDGenerator idGenerator) {
+      super(idGenerator);
+   }
+
+   @Override
+   public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException {
+      if (message == null) {
+         return 0;
       }
 
-      if (body == null && msg instanceof ServerJMSMessage) {
+      long messageFormat = 0;
+      Header header = null;
+      Properties properties = null;
+      Map<Symbol, Object> daMap = null;
+      Map<Symbol, Object> maMap = null;
+      Map<String, Object> apMap = null;
+      Map<Object, Object> footerMap = null;
 
-         MessageInternal internalMessage = ((ServerJMSMessage) msg).getInnerMessage();
-         if (!internalMessage.containsProperty("AMQP_MESSAGE_FORMAT")) {
-            int readerIndex = internalMessage.getBodyBuffer().readerIndex();
-            try {
-               Object s = internalMessage.getBodyBuffer().readNullableSimpleString();
-               if (s != null) {
-                  body = new AmqpValue(s.toString());
-               }
-            } catch (Throwable ignored) {
-               logger.debug("Exception ignored during conversion, should be ok!", ignored.getMessage(), ignored);
-            } finally {
-               internalMessage.getBodyBuffer().readerIndex(readerIndex);
-            }
+      Section body = convertBody(message);
+
+      if (message.getInnerMessage().isDurable()) {
+         if (header == null) {
+            header = new Header();
          }
+         header.setDurable(true);
       }
-
-      header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
-      header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
-      if (msg.getJMSType() != null) {
-         props.setSubject(msg.getJMSType());
+      byte priority = (byte) message.getJMSPriority();
+      if (priority != Message.DEFAULT_PRIORITY) {
+         if (header == null) {
+            header = new Header();
+         }
+         header.setPriority(UnsignedByte.valueOf(priority));
       }
-      if (msg.getJMSMessageID() != null) {
-
-         String msgId = msg.getJMSMessageID();
-
+      String type = message.getJMSType();
+      if (type != null) {
+         if (properties == null) {
+            properties = new Properties();
+         }
+         properties.setSubject(type);
+      }
+      String messageId = message.getJMSMessageID();
+      if (messageId != null) {
+         if (properties == null) {
+            properties = new Properties();
+         }
          try {
-            props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId));
+            properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId));
          } catch (ActiveMQAMQPIllegalStateException e) {
-            props.setMessageId(msgId);
+            properties.setMessageId(messageId);
          }
       }
-      if (msg.getJMSDestination() != null) {
-         props.setTo(vendor.toAddress(msg.getJMSDestination()));
+      Destination destination = message.getJMSDestination();
+      if (destination != null) {
+         if (properties == null) {
+            properties = new Properties();
+         }
+         properties.setTo(toAddress(destination));
          if (maMap == null) {
-            maMap = new HashMap<>();
+            maMap = new HashMap<Symbol, Object>();
          }
-         maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
+         maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
       }
-      if (msg.getJMSReplyTo() != null) {
-         props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
+      Destination replyTo = message.getJMSReplyTo();
+      if (replyTo != null) {
+         if (properties == null) {
+            properties = new Properties();
+         }
+         properties.setReplyTo(toAddress(replyTo));
          if (maMap == null) {
-            maMap = new HashMap<>();
+            maMap = new HashMap<Symbol, Object>();
          }
-         maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
+         maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
       }
-      if (msg.getJMSCorrelationID() != null) {
-         String correlationId = msg.getJMSCorrelationID();
-
+      String correlationId = message.getJMSCorrelationID();
+      if (correlationId != null) {
+         if (properties == null) {
+            properties = new Properties();
+         }
          try {
-            props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+            properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
          } catch (ActiveMQAMQPIllegalStateException e) {
-            props.setCorrelationId(correlationId);
+            properties.setCorrelationId(correlationId);
          }
       }
-      if (msg.getJMSExpiration() != 0) {
-         long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
+      long expiration = message.getJMSExpiration();
+      if (expiration != 0) {
+         long ttl = expiration - System.currentTimeMillis();
          if (ttl < 0) {
             ttl = 1;
          }
+
+         if (header == null) {
+            header = new Header();
+         }
          header.setTtl(new UnsignedInteger((int) ttl));
 
-         props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
+         if (properties == null) {
+            properties = new Properties();
+         }
+         properties.setAbsoluteExpiryTime(new Date(expiration));
       }
-      if (msg.getJMSTimestamp() != 0) {
-         props.setCreationTime(new Date(msg.getJMSTimestamp()));
+      long timeStamp = message.getJMSTimestamp();
+      if (timeStamp != 0) {
+         if (properties == null) {
+            properties = new Properties();
+         }
+         properties.setCreationTime(new Date(timeStamp));
       }
 
-      final Enumeration<String> keys = msg.getPropertyNames();
-      while (keys.hasMoreElements()) {
-         String key = keys.nextElement();
-         if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) {
-            // skip..
-         } else if (key.equals(firstAcquirerKey)) {
-            header.setFirstAcquirer(msg.getBooleanProperty(key));
-         } else if (key.startsWith("JMSXDeliveryCount")) {
-            // The AMQP delivery-count field only includes prior failed delivery attempts,
-            // whereas JMSXDeliveryCount includes the first/current delivery attempt.
-            int amqpDeliveryCount = msg.getIntProperty(key) - 1;
-            if (amqpDeliveryCount > 0) {
-               header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
-            }
-         } else if (key.startsWith("JMSXUserID")) {
-            String value = msg.getStringProperty(key);
-            props.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8)));
-         } else if (key.startsWith("JMSXGroupID") || key.startsWith("_AMQ_GROUP_ID")) {
-            String value = msg.getStringProperty(key);
-            props.setGroupId(value);
-            if (apMap == null) {
-               apMap = new HashMap();
-            }
-            apMap.put(key, value);
-         } else if (key.startsWith("JMSXGroupSeq")) {
-            UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
-            props.setGroupSequence(value);
-            if (apMap == null) {
-               apMap = new HashMap();
-            }
-            apMap.put(key, value);
-         } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
-            if (daMap == null) {
-               daMap = new HashMap<>();
+      final Set<String> keySet = MessageUtil.getPropertyNames(message.getInnerMessage());
+      for (String key : keySet) {
+         if (key.startsWith("JMSX")) {
+            if (key.equals("JMSXDeliveryCount")) {
+               // The AMQP delivery-count field only includes prior failed delivery attempts,
+               // whereas JMSXDeliveryCount includes the first/current delivery attempt.
+               int amqpDeliveryCount = message.getDeliveryCount() - 1;
+               if (amqpDeliveryCount > 0) {
+                  if (header == null) {
+                     header = new Header();
+                  }
+                  header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
+               }
+               continue;
+            } else if (key.equals("JMSXUserID")) {
+               String value = message.getStringProperty(key);
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8)));
+               continue;
+            } else if (key.equals("JMSXGroupID")) {
+               String value = message.getStringProperty(key);
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setGroupId(value);
+               continue;
+            } else if (key.equals("JMSXGroupSeq")) {
+               UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setGroupSequence(value);
+               continue;
             }
-            String name = key.substring(prefixDeliveryAnnotationsKey.length());
-            daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
-         } else if (key.startsWith(prefixMessageAnnotationsKey)) {
-            if (maMap == null) {
-               maMap = new HashMap<>();
+         } else if (key.startsWith(JMS_AMQP_PREFIX)) {
+            // AMQP Message Information stored from a conversion to the Core Message
+            if (key.equals(JMS_AMQP_MESSAGE_FORMAT)) {
+               messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT);
+               continue;
+            } else if (key.equals(JMS_AMQP_NATIVE)) {
+               // skip..internal use only
+               continue;
+            } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) {
+               // skip..internal use only
+               continue;
+            } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
+               if (header == null) {
+                  header = new Header();
+               }
+               header.setFirstAcquirer(message.getBooleanProperty(key));
+               continue;
+            } else if (key.equals(JMS_AMQP_HEADER)) {
+               if (header == null) {
+                  header = new Header();
+               }
+               continue;
+            } else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               continue;
+            } else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) {
+               if (daMap == null) {
+                  daMap = new HashMap<Symbol, Object>();
+               }
+               String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
+               daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
+               continue;
+            } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) {
+               if (maMap == null) {
+                  maMap = new HashMap<Symbol, Object>();
+               }
+               String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
+               maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
+               continue;
+            } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) {
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
+               continue;
+            } else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) {
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
+               continue;
+            } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) {
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setReplyToGroupId(message.getStringProperty(key));
+               continue;
+            } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
+               if (footerMap == null) {
+                  footerMap = new HashMap<Object, Object>();
+               }
+               String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
+               footerMap.put(name, message.getObjectProperty(key));
+               continue;
             }
-            String name = key.substring(prefixMessageAnnotationsKey.length());
-            maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
-         } else if (key.equals(contentTypeKey)) {
-            props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
-         } else if (key.equals(contentEncodingKey)) {
-            props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
-         } else if (key.equals(replyToGroupIDKey)) {
-            props.setReplyToGroupId(msg.getStringProperty(key));
-         } else if (key.startsWith(prefixFooterKey)) {
-            if (footerMap == null) {
-               footerMap = new HashMap();
+         } else if (key.equals("_AMQ_GROUP_ID")) {
+            String value = message.getStringProperty(key);
+            if (properties == null) {
+               properties = new Properties();
             }
-            String name = key.substring(prefixFooterKey.length());
-            footerMap.put(name, msg.getObjectProperty(key));
+            properties.setGroupId(value);
+            continue;
+         } else if (key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) {
+            // skip..internal use only
+            continue;
+         } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
+            // skip..remove annotation from previous inbound transformation
+            continue;
          } else if (key.equals(AMQPMessageTypes.AMQP_TYPE_KEY)) {
-            // skip
-         } else {
-            if (apMap == null) {
-               apMap = new HashMap();
-            }
-            Object objectProperty = msg.getObjectProperty(key);
-            if (objectProperty instanceof byte[]) {
-               Binary binary = new Binary((byte[]) objectProperty);
-               apMap.put(key, binary);
-            } else {
-               apMap.put(key, objectProperty);
-            }
+            // skip..internal use only - TODO - Remove this deprecated value in future release.
+            continue;
+         }
+
+         if (apMap == null) {
+            apMap = new HashMap<String, Object>();
          }
+
+         Object objectProperty = message.getObjectProperty(key);
+         if (objectProperty instanceof byte[]) {
+            objectProperty = new Binary((byte[]) objectProperty);
+         }
+
+         apMap.put(key, objectProperty);
       }
 
-      MessageAnnotations ma = null;
-      if (maMap != null) {
-         ma = new MessageAnnotations(maMap);
+      EncoderImpl encoder = tlsCodec.get().encoder;
+      encoder.setByteBuffer(buffer);
+
+      if (header != null) {
+         encoder.writeObject(header);
       }
-      DeliveryAnnotations da = null;
       if (daMap != null) {
-         da = new DeliveryAnnotations(daMap);
+         encoder.writeObject(new DeliveryAnnotations(daMap));
+      }
+      if (maMap != null) {
+         encoder.writeObject(new MessageAnnotations(maMap));
+      }
+      if (properties != null) {
+         encoder.writeObject(properties);
       }
-      ApplicationProperties ap = null;
       if (apMap != null) {
-         ap = new ApplicationProperties(apMap);
+         encoder.writeObject(new ApplicationProperties(apMap));
+      }
+      if (body != null) {
+         encoder.writeObject(body);
       }
-      Footer footer = null;
       if (footerMap != null) {
-         footer = new Footer(footerMap);
+         encoder.writeObject(new Footer(footerMap));
+      }
+
+      return messageFormat;
+   }
+
+   private Section convertBody(ServerJMSMessage message) throws JMSException {
+
+      Section body = null;
+      short orignalEncoding = AMQP_UNKNOWN;
+
+      try {
+         orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING);
+      } catch (Exception ex) {
+         // Ignore and stick with UNKNOWN
+      }
+
+      if (message instanceof ServerJMSBytesMessage) {
+         Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message);
+
+         if (payload == null) {
+            payload = EMPTY_BINARY;
+         }
+
+         switch (orignalEncoding) {
+            case AMQP_NULL:
+               break;
+            case AMQP_VALUE_BINARY:
+               body = new AmqpValue(payload);
+               break;
+            case AMQP_DATA:
+            case AMQP_UNKNOWN:
+            default:
+               body = new Data(payload);
+               break;
+         }
+      } else if (message instanceof ServerJMSTextMessage) {
+         switch (orignalEncoding) {
+            case AMQP_NULL:
+               break;
+            case AMQP_DATA:
+               body = new Data(getBinaryFromMessageBody((ServerJMSTextMessage) message));
+               break;
+            case AMQP_VALUE_STRING:
+            case AMQP_UNKNOWN:
+            default:
+               body = new AmqpValue(((TextMessage) message).getText());
+               break;
+         }
+      } else if (message instanceof ServerJMSMapMessage) {
+         body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message));
+      } else if (message instanceof ServerJMSStreamMessage) {
+         ArrayList<Object> list = new ArrayList<>();
+         final ServerJMSStreamMessage m = (ServerJMSStreamMessage) message;
+         try {
+            while (true) {
+               list.add(m.readObject());
+            }
+         } catch (MessageEOFException e) {
+         }
+
+         // Deprecated encoding markers - TODO - Remove on future release
+         if (orignalEncoding == AMQP_UNKNOWN) {
+            String amqpType = message.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY);
+            if (amqpType != null) {
+               if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) {
+                  orignalEncoding = AMQP_VALUE_LIST;
+               } else {
+                  orignalEncoding = AMQP_SEQUENCE;
+               }
+            }
+         }
+
+         switch (orignalEncoding) {
+            case AMQP_SEQUENCE:
+               body = new AmqpSequence(list);
+               break;
+            case AMQP_VALUE_LIST:
+            case AMQP_UNKNOWN:
+            default:
+               body = new AmqpValue(list);
+               break;
+         }
+      } else if (message instanceof ServerJMSObjectMessage) {
+         Binary payload = getBinaryFromMessageBody((ServerJMSObjectMessage) message);
+
+         if (payload == null) {
+            payload = EMPTY_BINARY;
+         }
+
+         switch (orignalEncoding) {
+            case AMQP_VALUE_BINARY:
+               body = new AmqpValue(payload);
+               break;
+            case AMQP_DATA:
+            case AMQP_UNKNOWN:
+            default:
+               body = new Data(payload);
+               break;
+         }
+
+         // For a non-AMQP message we tag the outbound content type as containing
+         // a serialized Java object so that an AMQP client has a hint as to what
+         // we are sending it.
+         if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) {
+            message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+         }
+      } else if (message instanceof ServerJMSMessage) {
+         // If this is not an AMQP message that was converted then the original encoding
+         // will be unknown so we check for special cases of messages with special data
+         // encoded into the server message body.
+         if (orignalEncoding == AMQP_UNKNOWN) {
+            MessageInternal internalMessage = message.getInnerMessage();
+            int readerIndex = internalMessage.getBodyBuffer().readerIndex();
+            try {
+               Object s = internalMessage.getBodyBuffer().readNullableSimpleString();
+               if (s != null) {
+                  body = new AmqpValue(s.toString());
+               }
+            } catch (Throwable ignored) {
+               logger.debug("Exception ignored during conversion, should be ok!", ignored.getMessage(), ignored);
+            } finally {
+               internalMessage.getBodyBuffer().readerIndex(readerIndex);
+            }
+         }
+      }
+
+      return body;
+   }
+
+   private Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException {
+      byte[] data = new byte[(int) message.getBodyLength()];
+      message.readBytes(data);
+      message.reset(); // Need to reset after readBytes or future readBytes
+
+      return new Binary(data);
+   }
+
+   private Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException {
+      Binary result = null;
+      String text = message.getText();
+      if (text != null) {
+         result = new Binary(text.getBytes(StandardCharsets.UTF_8));
+      }
+
+      return result;
+   }
+
+   private Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException {
+      message.getInnerMessage().getBodyBuffer().resetReaderIndex();
+      int size = message.getInnerMessage().getBodyBuffer().readInt();
+      byte[] bytes = new byte[size];
+      message.getInnerMessage().getBodyBuffer().readBytes(bytes);
+
+      return new Binary(bytes);
+   }
+
+   private Map<String, Object> getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException {
+      final HashMap<String, Object> map = new LinkedHashMap<>();
+
+      @SuppressWarnings("unchecked")
+      final Enumeration<String> names = message.getMapNames();
+      while (names.hasMoreElements()) {
+         String key = names.nextElement();
+         Object value = message.getObject(key);
+         if (value instanceof byte[]) {
+            value = new Binary((byte[]) value);
+         }
+         map.put(key, value);
       }
 
-      return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
+      return map;
    }
 
    private static byte destinationType(Destination destination) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java
deleted file mode 100644
index 9a0ed63..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-public interface JMSVendor {
-
-   BytesMessage createBytesMessage();
-
-   StreamMessage createStreamMessage();
-
-   Message createMessage();
-
-   TextMessage createTextMessage();
-
-   ObjectMessage createObjectMessage();
-
-   MapMessage createMapMessage();
-
-   void setJMSXUserID(Message message, String value);
-
-   Destination createDestination(String name);
-
-   void setJMSXGroupID(Message message, String groupId);
-
-   void setJMSXGroupSequence(Message message, int value);
-
-   void setJMSXDeliveryCount(Message message, long value);
-
-   String toAddress(Destination destination);
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
index 310d4ba..f15490f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
@@ -1,13 +1,13 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,54 +16,36 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-public abstract class OutboundTransformer {
-
-   JMSVendor vendor;
-   String prefixVendor;
+import java.io.UnsupportedEncodingException;
 
-   String prefixDeliveryAnnotations = "DA_";
-   String prefixMessageAnnotations = "MA_";
-   String prefixFooter = "FT_";
+import javax.jms.JMSException;
 
-   String messageFormatKey;
-   String nativeKey;
-   String firstAcquirerKey;
-   String prefixDeliveryAnnotationsKey;
-   String prefixMessageAnnotationsKey;
-   String contentTypeKey;
-   String contentEncodingKey;
-   String replyToGroupIDKey;
-   String prefixFooterKey;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.qpid.proton.codec.WritableBuffer;
 
-   public OutboundTransformer(JMSVendor vendor) {
-      this.vendor = vendor;
-      this.setPrefixVendor("JMS_AMQP_");
-   }
-
-   public String getPrefixVendor() {
-      return prefixVendor;
-   }
-
-   public void setPrefixVendor(String prefixVendor) {
-      this.prefixVendor = prefixVendor;
+public abstract class OutboundTransformer {
 
-      messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
-      nativeKey = prefixVendor + "NATIVE";
-      firstAcquirerKey = prefixVendor + "FirstAcquirer";
-      prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
-      prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
-      contentTypeKey = prefixVendor + "ContentType";
-      contentEncodingKey = prefixVendor + "ContentEncoding";
-      replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
-      prefixFooterKey = prefixVendor + prefixFooter;
+   protected IDGenerator idGenerator;
 
+   public OutboundTransformer(IDGenerator idGenerator) {
+      this.idGenerator = idGenerator;
    }
 
-   public JMSVendor getVendor() {
-      return vendor;
-   }
+   /**
+    * Given an JMS Message perform a conversion to an AMQP Message and encode into a form that
+    * is ready for transmission.
+    *
+    * @param message
+    *        the message to transform
+    * @param buffer
+    *        the buffer where encoding should write to
+    *
+    * @return the message format key of the encoded message.
+    *
+    * @throws Exception
+    *         if an error occurs during message transformation
+    */
+   public abstract long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException;
 
-   public void setVendor(JMSVendor vendor) {
-      this.vendor = vendor;
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java
new file mode 100644
index 0000000..4def92c
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+public class ActiveMQAMQPInvalidContentTypeException extends ActiveMQAMQPException {
+
+   public ActiveMQAMQPInvalidContentTypeException(String message) {
+      super(AmqpError.INTERNAL_ERROR, message, ActiveMQExceptionType.INTERNAL_ERROR);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 7d401fa..94e6a47 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -19,8 +19,6 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 import java.util.Map;
 import java.util.Objects;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -52,9 +50,11 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.ProtonJMessage;
 import org.jboss.logging.Logger;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
 public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
 
    private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
@@ -407,33 +407,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          return 0;
       }
 
-      //encode the message
-      ProtonJMessage serverMessage;
-      try {
-         // This can be done a lot better here
-         serverMessage = sessionSPI.encodeMessage(message, deliveryCount);
-      } catch (Throwable e) {
-         log.warn(e.getMessage(), e);
-         throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
-      }
-
-      return performSend(serverMessage, message);
-   }
-
-   private static boolean hasCapabilities(Symbol symbol, Source source) {
-      if (source != null) {
-         if (source.getCapabilities() != null) {
-            for (Symbol cap : source.getCapabilities()) {
-               if (symbol.equals(cap)) {
-                  return true;
-               }
-            }
-         }
-      }
-      return false;
-   }
-
-   protected int performSend(ProtonJMessage serverMessage, Object context) {
       if (!creditsSemaphore.tryAcquire()) {
          try {
             creditsSemaphore.acquire();
@@ -444,22 +417,32 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
       }
 
-      //presettle means we can ack the message on the dealer side before we send it, i.e. for browsers
+      // presettle means we can settle the message on the dealer side before we send it, i.e. for browsers
       boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
 
-      //we only need a tag if we are going to ack later
+      // we only need a tag if we are going to settle later
       byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
 
       ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
       try {
-         serverMessage.encode(new NettyWritable(nettyBuffer));
+         long messageFormat = 0;
+
+         // Encode the Server Message into the given Netty Buffer as an AMQP
+         // Message transformed from the internal message model.
+         try {
+            messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer));
+         } catch (Throwable e) {
+            log.warn(e.getMessage(), e);
+            throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+         }
 
          int size = nettyBuffer.writerIndex();
 
          synchronized (connection.getLock()) {
             final Delivery delivery;
             delivery = sender.delivery(tag, 0, tag.length);
-            delivery.setContext(context);
+            delivery.setMessageFormat((int) messageFormat);
+            delivery.setContext(message);
 
             // this will avoid a copy.. patch provided by Norman using buffer.array()
             sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
@@ -479,6 +462,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
    }
 
+   private static boolean hasCapabilities(Symbol symbol, Source source) {
+      if (source != null) {
+         if (source.getCapabilities() != null) {
+            for (Symbol cap : source.getCapabilities()) {
+               if (symbol.equals(cap)) {
+                  return true;
+               }
+            }
+         }
+      }
+      return false;
+   }
+
    private static String createQueueName(String clientId, String pubId) {
       return clientId + "." + pubId;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 148482e..96ce90e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -16,32 +16,31 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
-import java.io.ByteArrayOutputStream;
+import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage;
+
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
-import org.apache.blacklist.ABadClass;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@@ -53,10 +52,13 @@ import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Assert;
 import org.junit.Test;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
 public class TestConversions extends Assert {
 
    @Test
-   public void testObjectMessageWhiteList() throws Exception {
+   public void testAmqpValueOfBooleanIsPassedThrough() throws Exception {
       Map<String, Object> mapprop = createPropertiesMap();
       ApplicationProperties properties = new ApplicationProperties(mapprop);
       MessageImpl message = (MessageImpl) Message.Factory.create();
@@ -73,39 +75,15 @@ public class TestConversions extends Assert {
       EncodedMessage encodedMessage = encodeMessage(message);
 
       ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerJMSObjectMessage serverMessage = (ServerJMSObjectMessage) converter.inboundJMSType(encodedMessage);
+      ServerMessage serverMessage = converter.inbound(encodedMessage);
 
-      verifyProperties(serverMessage);
+      verifyProperties(new ServerJMSMessage(serverMessage, 0));
 
-      assertEquals(true, serverMessage.getObject());
+      EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
+      Message amqpMessage = encoded.decode();
 
-      Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
-
-      AmqpValue value = (AmqpValue) ((Message) obj).getBody();
+      AmqpValue value = (AmqpValue) amqpMessage.getBody();
       assertEquals(value.getValue(), true);
-
-   }
-
-   @Test
-   public void testObjectMessageNotOnWhiteList() throws Exception {
-
-      ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessageImpl message = new ServerMessageImpl(1, 1024);
-      message.setType((byte) 2);
-      ServerJMSObjectMessage serverMessage = new ServerJMSObjectMessage(message, 1024);
-      ByteArrayOutputStream out = new ByteArrayOutputStream();
-      ObjectOutputStream ois = new ObjectOutputStream(out);
-      ois.writeObject(new ABadClass());
-      byte[] src = out.toByteArray();
-      serverMessage.getInnerMessage().getBodyBuffer().writeInt(src.length);
-      serverMessage.getInnerMessage().getBodyBuffer().writeBytes(src);
-
-      try {
-         converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
-         fail("should throw ClassNotFoundException");
-      } catch (ClassNotFoundException e) {
-         //ignore
-      }
    }
 
    @Test
@@ -126,22 +104,23 @@ public class TestConversions extends Assert {
       EncodedMessage encodedMessage = encodeMessage(message);
 
       ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerJMSBytesMessage serverMessage = (ServerJMSBytesMessage) converter.inboundJMSType(encodedMessage);
+      ServerMessage serverMessage = converter.inbound(encodedMessage);
 
-      verifyProperties(serverMessage);
+      ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) wrapMessage(BYTES_TYPE, serverMessage, 0);
 
-      assertEquals(bodyBytes.length, serverMessage.getBodyLength());
+      verifyProperties(bytesMessage);
+
+      assertEquals(bodyBytes.length, bytesMessage.getBodyLength());
 
       byte[] newBodyBytes = new byte[4];
 
-      serverMessage.readBytes(newBodyBytes);
+      bytesMessage.readBytes(newBodyBytes);
 
       Assert.assertArrayEquals(bodyBytes, newBodyBytes);
 
-      Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
+      Object obj = converter.outbound(serverMessage, 0);
 
       System.out.println("output = " + obj);
-
    }
 
    private void verifyProperties(javax.jms.Message message) throws Exception {
@@ -175,25 +154,25 @@ public class TestConversions extends Assert {
       EncodedMessage encodedMessage = encodeMessage(message);
 
       ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerJMSMapMessage serverMessage = (ServerJMSMapMessage) converter.inboundJMSType(encodedMessage);
+      ServerMessage serverMessage = converter.inbound(encodedMessage);
 
-      verifyProperties(serverMessage);
+      ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) wrapMessage(MAP_TYPE, serverMessage, 0);
+      mapMessage.decode();
 
-      Assert.assertEquals(1, serverMessage.getInt("someint"));
-      Assert.assertEquals("value", serverMessage.getString("somestr"));
+      verifyProperties(mapMessage);
 
-      Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
+      Assert.assertEquals(1, mapMessage.getInt("someint"));
+      Assert.assertEquals("value", mapMessage.getString("somestr"));
 
-      reEncodeMsg(obj);
+      EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
+      Message amqpMessage = encoded.decode();
 
-      MessageImpl outMessage = (MessageImpl) obj;
-      AmqpValue value = (AmqpValue) outMessage.getBody();
-      Map mapoutput = (Map) value.getValue();
+      AmqpValue value = (AmqpValue) amqpMessage.getBody();
+      Map<?, ?> mapoutput = (Map<?, ?>) value.getValue();
 
       assertEquals(Integer.valueOf(1), mapoutput.get("someint"));
 
-      System.out.println("output = " + obj);
-
+      System.out.println("output = " + amqpMessage);
    }
 
    @Test
@@ -212,26 +191,25 @@ public class TestConversions extends Assert {
       EncodedMessage encodedMessage = encodeMessage(message);
 
       ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerJMSStreamMessage serverMessage = (ServerJMSStreamMessage) converter.inboundJMSType(encodedMessage);
+      ServerMessage serverMessage = converter.inbound(encodedMessage);
 
       simulatePersistence(serverMessage);
 
-      verifyProperties(serverMessage);
+      ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) wrapMessage(STREAM_TYPE, serverMessage, 0);
 
-      serverMessage.reset();
+      verifyProperties(streamMessage);
 
-      assertEquals(10, serverMessage.readInt());
-      assertEquals("10", serverMessage.readString());
+      streamMessage.reset();
 
-      Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
+      assertEquals(10, streamMessage.readInt());
+      assertEquals("10", streamMessage.readString());
 
-      reEncodeMsg(obj);
+      EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
+      Message amqpMessage = encoded.decode();
 
-      MessageImpl outMessage = (MessageImpl) obj;
-      List list = ((AmqpSequence) outMessage.getBody()).getValue();
+      List<?> list = ((AmqpSequence) amqpMessage.getBody()).getValue();
       Assert.assertEquals(Integer.valueOf(10), list.get(0));
       Assert.assertEquals("10", list.get(1));
-
    }
 
    @Test
@@ -247,33 +225,33 @@ public class TestConversions extends Assert {
       EncodedMessage encodedMessage = encodeMessage(message);
 
       ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerJMSTextMessage serverMessage = (ServerJMSTextMessage) converter.inboundJMSType(encodedMessage);
+      ServerMessage serverMessage = converter.inbound(encodedMessage);
 
       simulatePersistence(serverMessage);
 
-      verifyProperties(serverMessage);
+      ServerJMSTextMessage textMessage = (ServerJMSTextMessage) wrapMessage(TEXT_TYPE, serverMessage, 0);
+      textMessage.decode();
 
-      Assert.assertEquals(text, serverMessage.getText());
+      verifyProperties(textMessage);
 
-      Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
+      Assert.assertEquals(text, textMessage.getText());
 
-      reEncodeMsg(obj);
+      EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
+      Message amqpMessage = encoded.decode();
 
-      MessageImpl outMessage = (MessageImpl) obj;
-      AmqpValue value = (AmqpValue) outMessage.getBody();
+      AmqpValue value = (AmqpValue) amqpMessage.getBody();
       String textValue = (String) value.getValue();
 
       Assert.assertEquals(text, textValue);
 
-      System.out.println("output = " + obj);
-
+      System.out.println("output = " + amqpMessage);
    }
 
-   private void simulatePersistence(ServerJMSMessage serverMessage) {
-      serverMessage.getInnerMessage().setAddress(new SimpleString("jms.queue.SomeAddress"));
+   private void simulatePersistence(ServerMessage serverMessage) {
+      serverMessage.setAddress(new SimpleString("jms.queue.SomeAddress"));
       // This is just to simulate what would happen during the persistence of the message
       // We need to still be able to recover the message when we read it back
-      ((EncodingSupport) serverMessage.getInnerMessage()).encode(new EmptyBuffer());
+      ((EncodingSupport) serverMessage).encode(new EmptyBuffer());
    }
 
    private ProtonJMessage reEncodeMsg(Object obj) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
new file mode 100644
index 0000000..4caead7
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
+import org.junit.Test;
+
+public class AMQPContentTypeSupportTest {
+
+   @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+   public void testParseContentTypeWithOnlyType() throws Exception {
+      doParseContentTypeTestImpl("type", null);
+   }
+
+   @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+   public void testParseContentTypeEndsWithSlash() throws Exception {
+      doParseContentTypeTestImpl("type/", null);
+   }
+
+   @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+   public void testParseContentTypeMissingSubtype() throws Exception {
+      doParseContentTypeTestImpl("type/;", null);
+   }
+
+   @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+   public void testParseContentTypeEmptyString() throws Exception {
+      doParseContentTypeTestImpl("", null);
+   }
+
+   @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+   public void testParseContentTypeNullString() throws Exception {
+      doParseContentTypeTestImpl(null, null);
+   }
+
+   @Test
+   public void testParseContentTypeNoParamsAfterSeparatorNonTextual() throws Exception {
+      // Expect null as this is not a textual type
+      doParseContentTypeTestImpl("type/subtype;", null);
+   }
+
+   @Test
+   public void testParseContentTypeNoParamsAfterSeparatorTextualType() throws Exception {
+      doParseContentTypeTestImpl("text/plain;", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeEmptyParamsAfterSeparator() throws Exception {
+      doParseContentTypeTestImpl("text/plain;;", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeNoParams() throws Exception {
+      doParseContentTypeTestImpl("text/plain", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithCharsetUtf8() throws Exception {
+      doParseContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithCharsetAscii() throws Exception {
+      doParseContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII);
+   }
+
+   @Test
+   public void testParseContentTypeWithMultipleParams() throws Exception {
+      doParseContentTypeTestImpl("text/plain; param=value; charset=us-ascii", StandardCharsets.US_ASCII);
+   }
+
+   @Test
+   public void testParseContentTypeWithCharsetQuoted() throws Exception {
+      doParseContentTypeTestImpl("text/plain;charset=\"us-ascii\"", StandardCharsets.US_ASCII);
+   }
+
+   @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+   public void testParseContentTypeWithCharsetQuotedEmpty() throws Exception {
+      doParseContentTypeTestImpl("text/plain;charset=\"\"", null);
+   }
+
+   @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+   public void testParseContentTypeWithCharsetQuoteNotClosed() throws Exception {
+      doParseContentTypeTestImpl("text/plain;charset=\"unclosed", null);
+   }
+
+   @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+   public void testParseContentTypeWithCharsetQuoteNotClosedEmpty() throws Exception {
+      doParseContentTypeTestImpl("text/plain;charset=\"", null);
+   }
+
+   @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+   public void testParseContentTypeWithNoCharsetValue() throws Exception {
+      doParseContentTypeTestImpl("text/plain;charset=", null);
+   }
+
+   @Test
+   public void testParseContentTypeWithTextPlain() throws Exception {
+      doParseContentTypeTestImpl("text/plain;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("text/plain", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithTextJson() throws Exception {
+      doParseContentTypeTestImpl("text/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("text/json;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("text/json;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("text/json", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithTextHtml() throws Exception {
+      doParseContentTypeTestImpl("text/html;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("text/html;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("text/html;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("text/html", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithTextFoo() throws Exception {
+      doParseContentTypeTestImpl("text/foo;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("text/foo;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("text/foo;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("text/foo", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithApplicationJson() throws Exception {
+      doParseContentTypeTestImpl("application/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("application/json;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("application/json;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("application/json", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithApplicationJsonVariant() throws Exception {
+      doParseContentTypeTestImpl("application/something+json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("application/something+json;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("application/something+json;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("application/something+json", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithApplicationJavascript() throws Exception {
+      doParseContentTypeTestImpl("application/javascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("application/javascript;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("application/javascript;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("application/javascript", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithApplicationEcmascript() throws Exception {
+      doParseContentTypeTestImpl("application/ecmascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("application/ecmascript;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("application/ecmascript;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("application/ecmascript", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithApplicationXml() throws Exception {
+      doParseContentTypeTestImpl("application/xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("application/xml;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("application/xml;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("application/xml", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithApplicationXmlVariant() throws Exception {
+      doParseContentTypeTestImpl("application/something+xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("application/something+xml;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("application/something+xml;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("application/something+xml", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithApplicationXmlDtd() throws Exception {
+      doParseContentTypeTestImpl("application/xml-dtd;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+      doParseContentTypeTestImpl("application/xml-dtd;charset=us-ascii", StandardCharsets.US_ASCII);
+      doParseContentTypeTestImpl("application/xml-dtd;charset=utf-8", StandardCharsets.UTF_8);
+      doParseContentTypeTestImpl("application/xml-dtd", StandardCharsets.UTF_8);
+   }
+
+   @Test
+   public void testParseContentTypeWithApplicationOtherNotTextual() throws Exception {
+      // Expect null as this is not a textual type
+      doParseContentTypeTestImpl("application/other", null);
+   }
+
+   @Test
+   public void testParseContentTypeWithApplicationOctetStream() throws Exception {
+      // Expect null as this is not a textual type
+      doParseContentTypeTestImpl(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE, null);
+   }
+
+   @Test
+   public void testParseContentTypeWithApplicationJavaSerialized() throws Exception {
+      // Expect null as this is not a textual type
+      doParseContentTypeTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, null);
+   }
+
+   private void doParseContentTypeTestImpl(String contentType, Charset expected) throws ActiveMQAMQPInvalidContentTypeException {
+      Charset charset = AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType);
+      if (expected == null) {
+         assertNull("Expected no charset, but got:" + charset, charset);
+      } else {
+         assertEquals("Charset not as expected", expected, charset);
+      }
+   }
+}