You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/03/06 11:54:01 UTC

[14/22] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 2e19f07..4e33c9b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -21,15 +21,19 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
-import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
@@ -38,7 +42,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
 import org.apache.qpid.proton.amqp.DescribedType;
@@ -61,9 +64,6 @@ import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Sender;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
 /**
  * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
  */
@@ -474,7 +474,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       if (closed) {
          return;
       }
-      Object message = delivery.getContext();
+      Message message = (Message)delivery.getContext();
 
       boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
 
@@ -518,6 +518,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             try {
                sessionSPI.ack(null, brokerConsumer, message);
             } catch (Exception e) {
+               e.printStackTrace();
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
             }
          } else if (remoteState instanceof Released) {
@@ -566,7 +567,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    /**
     * handle an out going message from ActiveMQ Artemis, send via the Proton Sender
     */
-   public int deliverMessage(Object message, int deliveryCount) throws Exception {
+   public int deliverMessage(AMQPMessage message, int deliveryCount) throws Exception {
       if (closed) {
          return 0;
       }
@@ -590,16 +591,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
       ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
       try {
-         long messageFormat = 0;
-
-         // Encode the Server Message into the given Netty Buffer as an AMQP
-         // Message transformed from the internal message model.
-         try {
-            messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer));
-         } catch (Throwable e) {
-            log.warn(e.getMessage(), e);
-            throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
-         }
+         message.sendBuffer(nettyBuffer, deliveryCount);
 
          int size = nettyBuffer.writerIndex();
 
@@ -609,7 +601,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             }
             final Delivery delivery;
             delivery = sender.delivery(tag, 0, tag.length);
-            delivery.setMessageFormat((int) messageFormat);
+            delivery.setMessageFormat((int) message.getMessageFormat());
             delivery.setContext(message);
 
             // this will avoid a copy.. patch provided by Norman using buffer.array()

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
index 51f42a3..1afeba8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -61,7 +62,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             return;
          }
 
-         DeliveryUtil.readDelivery(receiver, buffer);
+         receiver.recv(new NettyWritable(buffer));
 
          receiver.advance();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 25ef51e..673a688 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -254,7 +254,7 @@ public class ProtonHandler extends ProtonInitializable {
       flush(false);
    }
 
-   private void flush(boolean wait) {
+   public void flush(boolean wait) {
       synchronized (lock) {
          transport.process();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
new file mode 100644
index 0000000..e0705b4
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+
+public class NettyReadable implements ReadableBuffer {
+
+   private static final Charset Charset_UTF8 = Charset.forName("UTF-8");
+
+   private final ByteBuf buffer;
+
+   public NettyReadable(ByteBuf buffer) {
+      this.buffer = buffer;
+   }
+
+   @Override
+   public void put(ReadableBuffer other) {
+      buffer.writeBytes(other.byteBuffer());
+   }
+
+   @Override
+   public byte get() {
+      return buffer.readByte();
+   }
+
+   @Override
+   public int getInt() {
+      return buffer.readInt();
+   }
+
+   @Override
+   public long getLong() {
+      return buffer.readLong();
+   }
+
+   @Override
+   public short getShort() {
+      return buffer.readShort();
+   }
+
+   @Override
+   public float getFloat() {
+      return buffer.readFloat();
+   }
+
+   @Override
+   public double getDouble() {
+      return buffer.readDouble();
+   }
+
+   @Override
+   public ReadableBuffer get(byte[] data, int offset, int length) {
+      buffer.readBytes(data, offset, length);
+      return this;
+   }
+
+   @Override
+   public ReadableBuffer get(byte[] data) {
+      buffer.readBytes(data);
+      return this;
+   }
+
+   @Override
+   public ReadableBuffer position(int position) {
+      buffer.readerIndex(position);
+      return this;
+   }
+
+   @Override
+   public ReadableBuffer slice() {
+      return new NettyReadable(buffer.slice());
+   }
+
+   @Override
+   public ReadableBuffer flip() {
+      return new NettyReadable(buffer.duplicate().setIndex(0, buffer.readerIndex()));
+   }
+
+   @Override
+   public ReadableBuffer limit(int limit) {
+      buffer.writerIndex(limit);
+      return this;
+   }
+
+   @Override
+   public int limit() {
+      return buffer.writerIndex();
+   }
+
+   @Override
+   public int remaining() {
+      return buffer.readableBytes();
+   }
+
+   @Override
+   public int position() {
+      return buffer.readerIndex();
+   }
+
+   @Override
+   public boolean hasRemaining() {
+      return buffer.readableBytes() > 0;
+   }
+
+   @Override
+   public ReadableBuffer duplicate() {
+      return new NettyReadable(buffer.duplicate());
+   }
+
+   @Override
+   public ByteBuffer byteBuffer() {
+      return buffer.nioBuffer(0, buffer.writerIndex());
+   }
+
+   @Override
+   public String readUTF8() {
+      return buffer.toString(Charset_UTF8);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java
new file mode 100644
index 0000000..b2f0fdc
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+
+/** This can go away if Proton provides this feature. */
+public class TLSEncode {
+
+   // For now Proton requires that we create a decoder to create an encoder
+   private static class EncoderDecoderPair {
+      DecoderImpl decoder = new DecoderImpl();
+      EncoderImpl encoder = new EncoderImpl(decoder);
+      {
+         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+      }
+   }
+
+   private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() {
+      @Override
+      protected EncoderDecoderPair initialValue() {
+         return new EncoderDecoderPair();
+      }
+   };
+
+   public static EncoderImpl getEncoder() {
+      return tlsCodec.get().encoder;
+   }
+
+   public static DecoderImpl getDecoder() {
+      return tlsCodec.get().decoder;
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 08c46be..8ced348 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -16,44 +16,28 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
-import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.ProtonJMessage;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Assert;
 import org.junit.Test;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 
 public class TestConversions extends Assert {
 
@@ -72,18 +56,12 @@ public class TestConversions extends Assert {
 
       message.setBody(new AmqpValue(new Boolean(true)));
 
-      EncodedMessage encodedMessage = encodeMessage(message);
+      AMQPMessage encodedMessage = new AMQPMessage(message);
 
-      ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessage serverMessage = converter.inbound(encodedMessage);
+      ICoreMessage serverMessage = encodedMessage.toCore();
 
-      verifyProperties(new ServerJMSMessage(serverMessage, 0));
+      verifyProperties(ServerJMSMessage.wrapCoreMessage(serverMessage));
 
-      EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
-      Message amqpMessage = encoded.decode();
-
-      AmqpValue value = (AmqpValue) amqpMessage.getBody();
-      assertEquals(value.getValue(), true);
    }
 
    @Test
@@ -101,12 +79,11 @@ public class TestConversions extends Assert {
 
       message.setBody(new Data(new Binary(bodyBytes)));
 
-      EncodedMessage encodedMessage = encodeMessage(message);
+      AMQPMessage encodedMessage = new AMQPMessage(message);
 
-      ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessage serverMessage = converter.inbound(encodedMessage);
+      ICoreMessage serverMessage = encodedMessage.toCore();
 
-      ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) wrapMessage(BYTES_TYPE, serverMessage, 0);
+      ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
 
       verifyProperties(bytesMessage);
 
@@ -118,9 +95,6 @@ public class TestConversions extends Assert {
 
       Assert.assertArrayEquals(bodyBytes, newBodyBytes);
 
-      Object obj = converter.outbound(serverMessage, 0);
-
-      System.out.println("output = " + obj);
    }
 
    private void verifyProperties(javax.jms.Message message) throws Exception {
@@ -151,12 +125,12 @@ public class TestConversions extends Assert {
 
       message.setBody(new AmqpValue(mapValues));
 
-      EncodedMessage encodedMessage = encodeMessage(message);
+      AMQPMessage encodedMessage = new AMQPMessage(message);
 
-      ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessage serverMessage = converter.inbound(encodedMessage);
+      ICoreMessage serverMessage = encodedMessage.toCore();
+      serverMessage.getReadOnlyBodyBuffer();
 
-      ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) wrapMessage(MAP_TYPE, serverMessage, 0);
+      ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
       mapMessage.decode();
 
       verifyProperties(mapMessage);
@@ -164,15 +138,8 @@ public class TestConversions extends Assert {
       Assert.assertEquals(1, mapMessage.getInt("someint"));
       Assert.assertEquals("value", mapMessage.getString("somestr"));
 
-      EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
-      Message amqpMessage = encoded.decode();
-
-      AmqpValue value = (AmqpValue) amqpMessage.getBody();
-      Map<?, ?> mapoutput = (Map<?, ?>) value.getValue();
-
-      assertEquals(Integer.valueOf(1), mapoutput.get("someint"));
-
-      System.out.println("output = " + amqpMessage);
+      AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
+      System.out.println(newAMQP.getProtonMessage().getBody());
    }
 
    @Test
@@ -188,14 +155,11 @@ public class TestConversions extends Assert {
 
       message.setBody(new AmqpSequence(objects));
 
-      EncodedMessage encodedMessage = encodeMessage(message);
+      AMQPMessage encodedMessage = new AMQPMessage(message);
 
-      ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessage serverMessage = converter.inbound(encodedMessage);
+      ICoreMessage serverMessage = encodedMessage.toCore();
 
-      simulatePersistence(serverMessage);
-
-      ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) wrapMessage(STREAM_TYPE, serverMessage, 0);
+      ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
 
       verifyProperties(streamMessage);
 
@@ -203,13 +167,6 @@ public class TestConversions extends Assert {
 
       assertEquals(10, streamMessage.readInt());
       assertEquals("10", streamMessage.readString());
-
-      EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
-      Message amqpMessage = encoded.decode();
-
-      List<?> list = ((AmqpSequence) amqpMessage.getBody()).getValue();
-      Assert.assertEquals(Integer.valueOf(10), list.get(0));
-      Assert.assertEquals("10", list.get(1));
    }
 
    @Test
@@ -222,553 +179,17 @@ public class TestConversions extends Assert {
       String text = "someText";
       message.setBody(new AmqpValue(text));
 
-      EncodedMessage encodedMessage = encodeMessage(message);
-
-      ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessage serverMessage = converter.inbound(encodedMessage);
+      AMQPMessage encodedMessage = new AMQPMessage(message);
 
-      simulatePersistence(serverMessage);
+      ICoreMessage serverMessage = encodedMessage.toCore();
 
-      ServerJMSTextMessage textMessage = (ServerJMSTextMessage) wrapMessage(TEXT_TYPE, serverMessage, 0);
+      ServerJMSTextMessage textMessage = (ServerJMSTextMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
       textMessage.decode();
 
       verifyProperties(textMessage);
 
       Assert.assertEquals(text, textMessage.getText());
 
-      EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
-      Message amqpMessage = encoded.decode();
-
-      AmqpValue value = (AmqpValue) amqpMessage.getBody();
-      String textValue = (String) value.getValue();
-
-      Assert.assertEquals(text, textValue);
-
-      System.out.println("output = " + amqpMessage);
-   }
-
-   private void simulatePersistence(ServerMessage serverMessage) {
-      serverMessage.setAddress(new SimpleString("SomeAddress"));
-      // This is just to simulate what would happen during the persistence of the message
-      // We need to still be able to recover the message when we read it back
-      ((EncodingSupport) serverMessage).encode(new EmptyBuffer());
-   }
-
-   private ProtonJMessage reEncodeMsg(Object obj) {
-      ProtonJMessage objOut = (ProtonJMessage) obj;
-
-      ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
-
-      objOut.encode(new NettyWritable(nettyBuffer));
-      return objOut;
    }
 
-   private EncodedMessage encodeMessage(MessageImpl message) {
-      ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024);
-      message.encode(new NettyWritable(buf));
-      byte[] bytesConvert = new byte[buf.writerIndex()];
-      buf.readBytes(bytesConvert);
-      return new EncodedMessage(0, bytesConvert, 0, bytesConvert.length);
-   }
-
-   class EmptyBuffer implements ActiveMQBuffer {
-
-      @Override
-      public ByteBuf byteBuf() {
-         return null;
-      }
-
-      @Override
-      public int capacity() {
-         return 0;
-      }
-
-      @Override
-      public int readerIndex() {
-         return 0;
-      }
-
-      @Override
-      public void readerIndex(int readerIndex) {
-
-      }
-
-      @Override
-      public int writerIndex() {
-         return 0;
-      }
-
-      @Override
-      public void writerIndex(int writerIndex) {
-
-      }
-
-      @Override
-      public void setIndex(int readerIndex, int writerIndex) {
-
-      }
-
-      @Override
-      public int readableBytes() {
-         return 0;
-      }
-
-      @Override
-      public int writableBytes() {
-         return 0;
-      }
-
-      @Override
-      public boolean readable() {
-         return false;
-      }
-
-      @Override
-      public boolean writable() {
-         return false;
-      }
-
-      @Override
-      public void clear() {
-
-      }
-
-      @Override
-      public void markReaderIndex() {
-
-      }
-
-      @Override
-      public void resetReaderIndex() {
-
-      }
-
-      @Override
-      public void markWriterIndex() {
-
-      }
-
-      @Override
-      public void resetWriterIndex() {
-
-      }
-
-      @Override
-      public void discardReadBytes() {
-
-      }
-
-      @Override
-      public byte getByte(int index) {
-         return 0;
-      }
-
-      @Override
-      public short getUnsignedByte(int index) {
-         return 0;
-      }
-
-      @Override
-      public short getShort(int index) {
-         return 0;
-      }
-
-      @Override
-      public int getUnsignedShort(int index) {
-         return 0;
-      }
-
-      @Override
-      public int getInt(int index) {
-         return 0;
-      }
-
-      @Override
-      public long getUnsignedInt(int index) {
-         return 0;
-      }
-
-      @Override
-      public long getLong(int index) {
-         return 0;
-      }
-
-      @Override
-      public void getBytes(int index, ActiveMQBuffer dst) {
-
-      }
-
-      @Override
-      public void getBytes(int index, ActiveMQBuffer dst, int length) {
-
-      }
-
-      @Override
-      public void getBytes(int index, ActiveMQBuffer dst, int dstIndex, int length) {
-
-      }
-
-      @Override
-      public void getBytes(int index, byte[] dst) {
-
-      }
-
-      @Override
-      public void getBytes(int index, byte[] dst, int dstIndex, int length) {
-
-      }
-
-      @Override
-      public void getBytes(int index, ByteBuffer dst) {
-
-      }
-
-      @Override
-      public char getChar(int index) {
-         return 0;
-      }
-
-      @Override
-      public float getFloat(int index) {
-         return 0;
-      }
-
-      @Override
-      public double getDouble(int index) {
-         return 0;
-      }
-
-      @Override
-      public void setByte(int index, byte value) {
-
-      }
-
-      @Override
-      public void setShort(int index, short value) {
-
-      }
-
-      @Override
-      public void setInt(int index, int value) {
-
-      }
-
-      @Override
-      public void setLong(int index, long value) {
-
-      }
-
-      @Override
-      public void setBytes(int index, ActiveMQBuffer src) {
-
-      }
-
-      @Override
-      public void setBytes(int index, ActiveMQBuffer src, int length) {
-
-      }
-
-      @Override
-      public void setBytes(int index, ActiveMQBuffer src, int srcIndex, int length) {
-
-      }
-
-      @Override
-      public void setBytes(int index, byte[] src) {
-
-      }
-
-      @Override
-      public void setBytes(int index, byte[] src, int srcIndex, int length) {
-
-      }
-
-      @Override
-      public void setBytes(int index, ByteBuffer src) {
-
-      }
-
-      @Override
-      public void setChar(int index, char value) {
-
-      }
-
-      @Override
-      public void setFloat(int index, float value) {
-
-      }
-
-      @Override
-      public void setDouble(int index, double value) {
-
-      }
-
-      @Override
-      public byte readByte() {
-         return 0;
-      }
-
-      @Override
-      public int readUnsignedByte() {
-         return 0;
-      }
-
-      @Override
-      public short readShort() {
-         return 0;
-      }
-
-      @Override
-      public int readUnsignedShort() {
-         return 0;
-      }
-
-      @Override
-      public int readInt() {
-         return 0;
-      }
-
-      @Override
-      public long readUnsignedInt() {
-         return 0;
-      }
-
-      @Override
-      public long readLong() {
-         return 0;
-      }
-
-      @Override
-      public char readChar() {
-         return 0;
-      }
-
-      @Override
-      public float readFloat() {
-         return 0;
-      }
-
-      @Override
-      public double readDouble() {
-         return 0;
-      }
-
-      @Override
-      public boolean readBoolean() {
-         return false;
-      }
-
-      @Override
-      public SimpleString readNullableSimpleString() {
-         return null;
-      }
-
-      @Override
-      public String readNullableString() {
-         return null;
-      }
-
-      @Override
-      public SimpleString readSimpleString() {
-         return null;
-      }
-
-      @Override
-      public String readString() {
-         return null;
-      }
-
-      @Override
-      public String readUTF() {
-         return null;
-      }
-
-      @Override
-      public ActiveMQBuffer readBytes(int length) {
-         return null;
-      }
-
-      @Override
-      public ActiveMQBuffer readSlice(int length) {
-         return null;
-      }
-
-      @Override
-      public void readBytes(ActiveMQBuffer dst) {
-
-      }
-
-      @Override
-      public void readBytes(ActiveMQBuffer dst, int length) {
-
-      }
-
-      @Override
-      public void readBytes(ActiveMQBuffer dst, int dstIndex, int length) {
-
-      }
-
-      @Override
-      public void readBytes(byte[] dst) {
-
-      }
-
-      @Override
-      public void readBytes(byte[] dst, int dstIndex, int length) {
-
-      }
-
-      @Override
-      public void readBytes(ByteBuffer dst) {
-
-      }
-
-      @Override
-      public int skipBytes(int length) {
-         return length;
-      }
-
-      @Override
-      public void writeByte(byte value) {
-
-      }
-
-      @Override
-      public void writeShort(short value) {
-
-      }
-
-      @Override
-      public void writeInt(int value) {
-
-      }
-
-      @Override
-      public void writeLong(long value) {
-
-      }
-
-      @Override
-      public void writeChar(char chr) {
-
-      }
-
-      @Override
-      public void writeFloat(float value) {
-
-      }
-
-      @Override
-      public void writeDouble(double value) {
-
-      }
-
-      @Override
-      public void writeBoolean(boolean val) {
-
-      }
-
-      @Override
-      public void writeNullableSimpleString(SimpleString val) {
-
-      }
-
-      @Override
-      public void writeNullableString(String val) {
-
-      }
-
-      @Override
-      public void writeSimpleString(SimpleString val) {
-
-      }
-
-      @Override
-      public void writeString(String val) {
-
-      }
-
-      @Override
-      public void writeUTF(String utf) {
-
-      }
-
-      @Override
-      public void writeBytes(ActiveMQBuffer src, int length) {
-
-      }
-
-      @Override
-      public void writeBytes(ActiveMQBuffer src, int srcIndex, int length) {
-
-      }
-
-      @Override
-      public void writeBytes(byte[] src) {
-
-      }
-
-      @Override
-      public void writeBytes(byte[] src, int srcIndex, int length) {
-
-      }
-
-      @Override
-      public void writeBytes(ByteBuffer src) {
-
-      }
-
-      @Override
-      public void readFully(byte[] b) throws IOException {
-      }
-
-      @Override
-      public void readFully(byte[] b, int off, int len) throws IOException {
-      }
-
-      @Override
-      public String readLine() throws IOException {
-         return null;
-      }
-
-      @Override
-      public ActiveMQBuffer copy() {
-         return null;
-      }
-
-      @Override
-      public ActiveMQBuffer copy(int index, int length) {
-         return null;
-      }
-
-      @Override
-      public ActiveMQBuffer slice() {
-         return null;
-      }
-
-      @Override
-      public ActiveMQBuffer slice(int index, int length) {
-         return null;
-      }
-
-      @Override
-      public ActiveMQBuffer duplicate() {
-         return null;
-      }
-
-      @Override
-      public ByteBuffer toByteBuffer() {
-         return null;
-      }
-
-      @Override
-      public ByteBuffer toByteBuffer(int index, int length) {
-         return null;
-      }
-
-      @Override
-      public void release() {
-         //no-op
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
index 4caead7..54a98e6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
@@ -16,15 +16,17 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPContentTypeSupport;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 public class AMQPContentTypeSupportTest {
 
    @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
@@ -216,7 +218,7 @@ public class AMQPContentTypeSupportTest {
    @Test
    public void testParseContentTypeWithApplicationJavaSerialized() throws Exception {
       // Expect null as this is not a textual type
-      doParseContentTypeTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, null);
+      doParseContentTypeTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), null);
    }
 
    private void doParseContentTypeTestImpl(String contentType, Charset expected) throws ActiveMQAMQPInvalidContentTypeException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
index c53cda5..60c1989 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
@@ -20,19 +20,20 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
 import java.util.UUID;
 
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
 public class AMQPMessageIdHelperTest {
 
    private AMQPMessageIdHelper messageIdHelper;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java
index d4e078f..6aeb4dc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java
@@ -16,20 +16,21 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 public class AMQPMessageSupportTest {
 
    // ---------- getSymbol ---------------------------------------------------//

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
index d7a948a..b7092c3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
@@ -16,36 +16,29 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -57,13 +50,15 @@ import org.apache.qpid.proton.message.Message;
 import org.junit.Before;
 import org.junit.Test;
 
-public class JMSMappingInboundTransformerTest {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
-   private IDGenerator idGenerator;
+public class JMSMappingInboundTransformerTest {
 
    @Before
    public void setUp() {
-      this.idGenerator = new SimpleIDGenerator(0);
    }
 
    // ----- Null Body Section ------------------------------------------------//
@@ -77,13 +72,14 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
       Message message = Message.Factory.create();
       message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
 
-      EncodedMessage em = encodeMessage(message);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      AMQPMessage messageEncode = new AMQPMessage(message);
+
+      ICoreMessage coreMessage = messageEncode.toCore();
+
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(coreMessage);
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -98,74 +94,25 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
       Message message = Message.Factory.create();
 
-      EncodedMessage em = encodeMessage(message);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
    }
 
-   /**
-    * Test that a message with no body section, but with the content type set to
-    * {@value AMQPMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} results in an
-    * ObjectMessage when not otherwise annotated to indicate the type of JMS message it is.
-    *
-    * @throws Exception
-    *         if an error occurs during the test.
-    */
-   @Test
-   public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception {
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
-      Message message = Message.Factory.create();
-      message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
-
-      EncodedMessage em = encodeMessage(message);
-      javax.jms.Message jmsMessage = transformer.transform(em);
-
-      assertNotNull("Message should not be null", jmsMessage);
-      assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
-   }
-
    @Test
    public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
       Message message = Message.Factory.create();
       message.setContentType("text/plain");
 
-      EncodedMessage em = encodeMessage(message);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
    }
 
-   /**
-    * Test that a message with no body section, and with the content type set to an unknown
-    * value results in a plain Message when not otherwise annotated to indicate the type of JMS
-    * message it is.
-    *
-    * @throws Exception
-    *         if an error occurs during the test.
-    */
-   public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception {
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
-      Message message = Message.Factory.create();
-      message.setContentType("unknown-content-type");
-
-      EncodedMessage em = encodeMessage(message);
-      javax.jms.Message jmsMessage = transformer.transform(em);
-
-      assertNotNull("Message should not be null", jmsMessage);
-      assertEquals("Unexpected message class type", ActiveMQMessage.class, jmsMessage.getClass());
-   }
-
    // ----- Data Body Section ------------------------------------------------//
 
    /**
@@ -183,10 +130,7 @@ public class JMSMappingInboundTransformerTest {
       message.setBody(new Data(binary));
       message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -206,10 +150,7 @@ public class JMSMappingInboundTransformerTest {
       message.setBody(new Data(binary));
       message.setContentType("unknown-content-type");
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -230,10 +171,7 @@ public class JMSMappingInboundTransformerTest {
 
       assertNull(message.getContentType());
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -252,12 +190,9 @@ public class JMSMappingInboundTransformerTest {
       Message message = Proton.message();
       Binary binary = new Binary(new byte[0]);
       message.setBody(new Data(binary));
-      message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
-
-      EncodedMessage em = encodeMessage(message);
+      message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
 
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@@ -357,10 +292,7 @@ public class JMSMappingInboundTransformerTest {
       message.setBody(new Data(binary));
       message.setContentType(contentType);
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       if (StandardCharsets.UTF_8.equals(expectedCharset)) {
@@ -384,10 +316,7 @@ public class JMSMappingInboundTransformerTest {
       Message message = Proton.message();
       message.setBody(new AmqpValue("content"));
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -405,10 +334,7 @@ public class JMSMappingInboundTransformerTest {
       Message message = Proton.message();
       message.setBody(new AmqpValue(null));
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -424,14 +350,11 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
       Message message = Message.Factory.create();
       message.setBody(new AmqpValue(new Binary(new byte[0])));
-      message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+      message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
 
-      EncodedMessage em = encodeMessage(message);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@@ -450,50 +373,13 @@ public class JMSMappingInboundTransformerTest {
       Map<String, String> map = new HashMap<>();
       message.setBody(new AmqpValue(map));
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass());
    }
 
    /**
-    * Test that an amqp-value body containing a map that has an AMQP Binary as one of the
-    * entries encoded into the Map results in an MapMessage where a byte array can be read from
-    * the entry.
-    *
-    * @throws Exception
-    *         if an error occurs during the test.
-    */
-   @Test
-   public void testCreateAmqpMapMessageFromAmqpValueWithMapContainingBinaryEntry() throws Exception {
-      final String ENTRY_NAME = "bytesEntry";
-
-      Message message = Proton.message();
-      Map<String, Object> map = new HashMap<>();
-
-      byte[] inputBytes = new byte[] {1, 2, 3, 4, 5};
-      map.put(ENTRY_NAME, new Binary(inputBytes));
-
-      message.setBody(new AmqpValue(map));
-
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
-
-      assertNotNull("Message should not be null", jmsMessage);
-      assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass());
-
-      MapMessage mapMessage = (MapMessage) jmsMessage;
-      byte[] outputBytes = mapMessage.getBytes(ENTRY_NAME);
-      assertNotNull(outputBytes);
-      assertTrue(Arrays.equals(inputBytes, outputBytes));
-   }
-
-   /**
     * Test that an amqp-value body containing a list results in an StreamMessage when not
     * otherwise annotated to indicate the type of JMS message it is.
     *
@@ -506,10 +392,7 @@ public class JMSMappingInboundTransformerTest {
       List<String> list = new ArrayList<>();
       message.setBody(new AmqpValue(list));
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@@ -528,10 +411,7 @@ public class JMSMappingInboundTransformerTest {
       List<String> list = new ArrayList<>();
       message.setBody(new AmqpSequence(list));
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@@ -550,10 +430,7 @@ public class JMSMappingInboundTransformerTest {
       Binary binary = new Binary(new byte[0]);
       message.setBody(new AmqpValue(binary));
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -572,11 +449,7 @@ public class JMSMappingInboundTransformerTest {
       Message message = Proton.message();
       message.setBody(new AmqpValue(UUID.randomUUID()));
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -588,10 +461,8 @@ public class JMSMappingInboundTransformerTest {
       Message message = Message.Factory.create();
       message.setBody(new AmqpValue(contentString));
 
-      EncodedMessage em = encodeMessage(message);
-
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      jmsMessage.decode();
 
       assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
       assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -631,7 +502,6 @@ public class JMSMappingInboundTransformerTest {
 
    private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass)
       throws Exception {
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
 
       String toAddress = "toAddress";
       Message amqp = Message.Factory.create();
@@ -644,9 +514,7 @@ public class JMSMappingInboundTransformerTest {
          amqp.setMessageAnnotations(ma);
       }
 
-      EncodedMessage em = encodeMessage(amqp);
-
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
       assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
    }
 
@@ -679,7 +547,6 @@ public class JMSMappingInboundTransformerTest {
 
    private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass)
       throws Exception {
-      JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
 
       String replyToAddress = "replyToAddress";
       Message amqp = Message.Factory.create();
@@ -692,27 +559,8 @@ public class JMSMappingInboundTransformerTest {
          amqp.setMessageAnnotations(ma);
       }
 
-      EncodedMessage em = encodeMessage(amqp);
-
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
       assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
    }
 
-   // ----- Utility Methods --------------------------------------------------//
-
-   private EncodedMessage encodeMessage(Message message) {
-      byte[] encodeBuffer = new byte[1024 * 8];
-      int encodedSize;
-      while (true) {
-         try {
-            encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
-            break;
-         } catch (java.nio.BufferOverflowException e) {
-            encodeBuffer = new byte[encodeBuffer.length * 2];
-         }
-      }
-
-      long messageFormat = 0;
-      return new EncodedMessage(messageFormat, encodeBuffer, 0, encodedSize);
-   }
 }