You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/01 17:21:07 UTC

[13/23] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
index 6aa44a4..125a20f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import javax.jms.BytesMessage;
 
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport;
@@ -33,6 +32,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.message.InboundTransf
 import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingInboundTransformer;
 import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingOutboundTransformer;
 import org.apache.activemq.artemis.protocol.amqp.converter.message.OutboundTransformer;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.utils.IDGenerator;
@@ -52,8 +52,8 @@ public class ProtonMessageConverter implements MessageConverter {
    private final OutboundTransformer outboundTransformer;
 
    @Override
-   public ServerMessage inbound(Object messageSource) throws Exception {
-      EncodedMessage encodedMessageSource = (EncodedMessage) messageSource;
+   public org.apache.activemq.artemis.api.core.Message inbound(Object messageSource) throws Exception {
+      AMQPMessage encodedMessageSource = (AMQPMessage) messageSource;
       ServerJMSMessage transformedMessage = null;
 
       try {
@@ -67,11 +67,11 @@ public class ProtonMessageConverter implements MessageConverter {
 
       transformedMessage.encode();
 
-      return (ServerMessage) transformedMessage.getInnerMessage();
+      return transformedMessage.getInnerMessage();
    }
 
    @Override
-   public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception {
+   public Object outbound(org.apache.activemq.artemis.api.core.Message messageOutbound, int deliveryCount) throws Exception {
       // Useful for testing but not recommended for real life use.
       ByteBuf nettyBuffer = Unpooled.buffer(1024);
       NettyWritable buffer = new NettyWritable(nettyBuffer);
@@ -83,7 +83,7 @@ public class ProtonMessageConverter implements MessageConverter {
       return encoded;
    }
 
-   public Object outbound(ServerMessage messageOutbound, int deliveryCount, WritableBuffer buffer) throws Exception {
+   public Object outbound(org.apache.activemq.artemis.api.core.Message messageOutbound, int deliveryCount, WritableBuffer buffer) throws Exception {
       ServerJMSMessage jmsMessage = AMQPMessageSupport.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
 
       jmsMessage.decode();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
index abdf808..c3a60f0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms;
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
 
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
 import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
 import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean;
@@ -49,13 +49,13 @@ import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteUTF;
 
 public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage {
 
-   public ServerJMSBytesMessage(MessageInternal message, int deliveryCount) {
+   public ServerJMSBytesMessage(Message message, int deliveryCount) {
       super(message, deliveryCount);
    }
 
    @Override
    public long getBodyLength() throws JMSException {
-      return message.getEndOfBodyPosition() - MessageImpl.BODY_OFFSET;
+      return message.getEndOfBodyPosition() - CoreMessage.BODY_OFFSET;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
index 0268065..df79183 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.utils.TypedProperties;
 
 import static org.apache.activemq.artemis.reader.MapMessageUtil.readBodyMap;
@@ -52,7 +51,7 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe
    /*
     * This constructor is used to construct messages prior to sending
     */
-   public ServerJMSMapMessage(MessageInternal message, int deliveryCount) {
+   public ServerJMSMapMessage(Message message, int deliveryCount) {
       super(message, deliveryCount);
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
index f9a94f5..adf4621 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -16,18 +16,17 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.jms;
 
-import java.util.Collections;
-import java.util.Enumeration;
-
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import java.util.Collections;
+import java.util.Enumeration;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.reader.MessageUtil;
 
@@ -35,16 +34,16 @@ import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAG
 
 public class ServerJMSMessage implements Message {
 
-   protected final MessageInternal message;
+   protected final CoreMessage message;
 
    protected int deliveryCount;
 
-   public MessageInternal getInnerMessage() {
+   public org.apache.activemq.artemis.api.core.Message getInnerMessage() {
       return message;
    }
 
-   public ServerJMSMessage(MessageInternal message, int deliveryCount) {
-      this.message = message;
+   public ServerJMSMessage(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
+      this.message = (CoreMessage)message;
       this.deliveryCount = deliveryCount;
    }
 
@@ -60,7 +59,7 @@ public class ServerJMSMessage implements Message {
    protected ActiveMQBuffer getReadBodyBuffer() {
       if (readBodyBuffer == null) {
          // to avoid clashes between multiple threads
-         readBodyBuffer = message.getBodyBufferDuplicate();
+         readBodyBuffer = message.getReadOnlyBodyBuffer();
       }
       return readBodyBuffer;
    }
@@ -140,7 +139,7 @@ public class ServerJMSMessage implements Message {
 
    @Override
    public final Destination getJMSDestination() throws JMSException {
-      SimpleString sdest = message.getAddress();
+      SimpleString sdest = message.getAddressSimpleString();
 
       if (sdest == null) {
          return null;
@@ -152,7 +151,7 @@ public class ServerJMSMessage implements Message {
    @Override
    public final void setJMSDestination(Destination destination) throws JMSException {
       if (destination == null) {
-         message.setAddress(null);
+         message.setAddress((SimpleString)null);
       } else {
          message.setAddress(((ActiveMQDestination) destination).getSimpleAddress());
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
index d1eaac6..15b04a9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
@@ -22,7 +22,6 @@ import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
 
 import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.qpid.proton.amqp.Binary;
 
 public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage {
@@ -31,7 +30,7 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe
 
    private Binary payload;
 
-   public ServerJMSObjectMessage(MessageInternal message, int deliveryCount) {
+   public ServerJMSObjectMessage(Message message, int deliveryCount) {
       super(message, deliveryCount);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
index a53fc0e..b092e61 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
@@ -23,7 +23,6 @@ import javax.jms.StreamMessage;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBoolean;
@@ -44,7 +43,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
 
    private int bodyLength = 0;
 
-   public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) {
+   public ServerJMSStreamMessage(Message message, int deliveryCount) {
       super(message, deliveryCount);
    }
 
@@ -180,7 +179,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    @Override
    public Object readObject() throws JMSException {
 
-      if (getReadBodyBuffer().readerIndex() >= message.getEndOfBodyPosition()) {
+      if (getReadBodyBuffer().readerIndex() >= getReadBodyBuffer().writerIndex()) {
          throw new MessageEOFException("");
       }
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
index eb88de0..058a3e9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
@@ -21,7 +21,6 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 
 import static org.apache.activemq.artemis.reader.TextMessageUtil.readBodyText;
 import static org.apache.activemq.artemis.reader.TextMessageUtil.writeBodyText;
@@ -49,7 +48,7 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag
    /*
     * This constructor is used to construct messages prior to sending
     */
-   public ServerJMSTextMessage(MessageInternal message, int deliveryCount) {
+   public ServerJMSTextMessage(Message message, int deliveryCount) {
       super(message, deliveryCount);
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
index 8c4612d..0a39573 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
@@ -16,24 +16,15 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
-
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-
 import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -48,6 +39,13 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.message.Message;
 
+import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
+
 /**
  * Support class containing constant values and static methods that are used to map to / from
  * AMQP Message types being sent or received.
@@ -181,7 +179,7 @@ public final class AMQPMessageSupport {
       }
    }
 
-   public static ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) {
+   public static ServerJMSMessage wrapMessage(int messageType, org.apache.activemq.artemis.api.core.Message wrapped, int deliveryCount) {
       switch (messageType) {
          case STREAM_TYPE:
             return new ServerJMSStreamMessage(wrapped, deliveryCount);
@@ -267,8 +265,8 @@ public final class AMQPMessageSupport {
       return message;
    }
 
-   private static ServerMessageImpl newMessage(IDGenerator idGenerator, byte messageType) {
-      ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512);
+   private static CoreMessage newMessage(IDGenerator idGenerator, byte messageType) {
+      CoreMessage message = new CoreMessage(idGenerator.generateID(), 512);
       message.setType(messageType);
       ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
       return message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
deleted file mode 100644
index 7028547..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-
-public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
-
-   public AMQPNativeInboundTransformer(IDGenerator idGenerator) {
-      super(idGenerator);
-   }
-
-   @Override
-   public String getTransformerName() {
-      return TRANSFORMER_NATIVE;
-   }
-
-   @Override
-   public InboundTransformer getFallbackTransformer() {
-      return new AMQPRawInboundTransformer(idGenerator);
-   }
-
-   @Override
-   public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception {
-      org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
-
-      return populateMessage(super.transform(amqpMessage), amqp);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
deleted file mode 100644
index 445eaca..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-
-public class AMQPRawInboundTransformer extends InboundTransformer {
-
-   public AMQPRawInboundTransformer(IDGenerator idGenerator) {
-      super(idGenerator);
-   }
-
-   @Override
-   public String getTransformerName() {
-      return TRANSFORMER_RAW;
-   }
-
-   @Override
-   public InboundTransformer getFallbackTransformer() {
-      return null;  // No fallback from full raw transform
-   }
-
-   @Override
-   public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception {
-      ServerJMSBytesMessage message = createBytesMessage(idGenerator);
-      message.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
-
-      // We cannot decode the message headers to check so err on the side of caution
-      // and mark all messages as persistent.
-      message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
-      message.setJMSPriority(Message.DEFAULT_PRIORITY);
-      message.setJMSTimestamp(System.currentTimeMillis());
-
-      message.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat());
-      message.setBooleanProperty(JMS_AMQP_NATIVE, true);
-
-      return message;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
index 1316ab7..cec34ef 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
@@ -37,6 +37,7 @@ import javax.jms.Message;
 
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Decimal128;
@@ -65,12 +66,10 @@ public abstract class InboundTransformer {
       this.idGenerator = idGenerator;
    }
 
-   public abstract ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception;
+   public abstract ServerJMSMessage transform(AMQPMessage amqpMessage) throws Exception;
 
    public abstract String getTransformerName();
 
-   public abstract InboundTransformer getFallbackTransformer();
-
    @SuppressWarnings("unchecked")
    protected ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
       Header header = amqp.getHeader();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
index 629c499..4c7426e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
@@ -48,6 +48,7 @@ import java.util.Map;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@@ -67,36 +68,21 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
       return TRANSFORMER_JMS;
    }
 
-   @Override
-   public InboundTransformer getFallbackTransformer() {
-      return new AMQPNativeInboundTransformer(idGenerator);
+
+   public ServerJMSMessage transform(EncodedMessage message) throws Exception {
+      AMQPMessage messageEncode = new AMQPMessage(message.getMessageFormat(), message.getArray(), null);
+      return transform(messageEncode);
    }
 
    @Override
-   public ServerJMSMessage transform(EncodedMessage encodedMessage) throws Exception {
-      ServerJMSMessage transformedMessage = null;
-
-      try {
-         Message amqpMessage = encodedMessage.decode();
-         transformedMessage = createServerMessage(amqpMessage);
-         populateMessage(transformedMessage, amqpMessage);
-      } catch (Exception ex) {
-         InboundTransformer transformer = this.getFallbackTransformer();
-
-         while (transformer != null) {
-            try {
-               transformedMessage = transformer.transform(encodedMessage);
-               break;
-            } catch (Exception e) {
-               transformer = transformer.getFallbackTransformer();
-            }
-         }
-      }
+   public ServerJMSMessage transform(AMQPMessage amqpMessage) throws Exception {
+      ServerJMSMessage transformedMessage = createServerMessage(amqpMessage.getProtonMessage());
+      populateMessage(transformedMessage, amqpMessage.getProtonMessage());
 
       // Regardless of the transformer that finally decoded the message we need to ensure that
       // the AMQP Message Format value is preserved for application on retransmit.
-      if (transformedMessage != null && encodedMessage.getMessageFormat() != 0) {
-         transformedMessage.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, encodedMessage.getMessageFormat());
+      if (transformedMessage != null && amqpMessage.getMessageFormat() != 0) {
+         transformedMessage.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat());
       }
 
       return transformedMessage;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
index 7dbc6d4..2ef3122 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
@@ -64,7 +64,6 @@ import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
@@ -510,7 +509,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
          // will be unknown so we check for special cases of messages with special data
          // encoded into the server message body.
          if (orignalEncoding == AMQP_UNKNOWN) {
-            MessageInternal internalMessage = message.getInnerMessage();
+            org.apache.activemq.artemis.api.core.Message internalMessage = message.getInnerMessage();
             int readerIndex = internalMessage.getBodyBuffer().readerIndex();
             try {
                Object s = internalMessage.getBodyBuffer().readNullableSimpleString();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 6462315..bac3e7e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -134,6 +134,10 @@ public class AMQPConnectionContext extends ProtonInitializable {
       handler.flush();
    }
 
+   public void flush(boolean wait) {
+      handler.flush(wait);
+   }
+
    public void close(ErrorCondition errorCondition) {
       handler.close(errorCondition);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 8341de7..34c2c07 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -25,7 +23,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
-import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@@ -144,26 +141,25 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          if (delivery.isPartial()) {
             return;
          }
+         byte[] data = new byte[delivery.getDataLength()];
+         Transaction tx = null;
 
-         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024);
-         try {
-            synchronized (connection.getLock()) {
-               DeliveryUtil.readDelivery(receiver, buffer);
+         synchronized (connection.getLock()) {
+            receiver.recv(data, 0, data.length);
 
-               receiver.advance();
+            receiver.advance();
+         }
 
-               Transaction tx = null;
-               if (delivery.getRemoteState() instanceof TransactionalState) {
+         if (delivery.getRemoteState() instanceof TransactionalState) {
 
-                  TransactionalState txState = (TransactionalState) delivery.getRemoteState();
-                  tx = this.sessionSPI.getTransaction(txState.getTxnId());
-               }
-               sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), buffer);
+            TransactionalState txState = (TransactionalState) delivery.getRemoteState();
+            tx = this.sessionSPI.getTransaction(txState.getTxnId());
+         }
 
-               flow(maxCreditAllocation, minCreditRefresh);
-            }
-         } finally {
-            buffer.release();
+         sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
+
+         synchronized (connection.getLock()) {
+            flow(maxCreditAllocation, minCreditRefresh);
          }
       } catch (Exception e) {
          log.warn(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 2e19f07..15611c3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -23,6 +23,7 @@ import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
@@ -37,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternal
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.selector.filter.FilterException;
@@ -474,7 +476,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       if (closed) {
          return;
       }
-      Object message = delivery.getContext();
+      Message message = (Message)delivery.getContext();
 
       boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
 
@@ -566,7 +568,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    /**
     * handle an out going message from ActiveMQ Artemis, send via the Proton Sender
     */
-   public int deliverMessage(Object message, int deliveryCount) throws Exception {
+   public int deliverMessage(Message message, int deliveryCount) throws Exception {
       if (closed) {
          return 0;
       }
@@ -592,13 +594,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       try {
          long messageFormat = 0;
 
-         // Encode the Server Message into the given Netty Buffer as an AMQP
-         // Message transformed from the internal message model.
-         try {
-            messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer));
-         } catch (Throwable e) {
-            log.warn(e.getMessage(), e);
-            throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+
+         if (message instanceof AMQPMessage) {
+            message.sendBuffer(nettyBuffer, deliveryCount);
+         } else {
+            // Encode the Server Message into the given Netty Buffer as an AMQP
+            // Message transformed from the internal message model.
+            try {
+               messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer));
+            } catch (Throwable e) {
+               log.warn(e.getMessage(), e);
+               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+            }
          }
 
          int size = nettyBuffer.writerIndex();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
index 51f42a3..1afeba8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -61,7 +62,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             return;
          }
 
-         DeliveryUtil.readDelivery(receiver, buffer);
+         receiver.recv(new NettyWritable(buffer));
 
          receiver.advance();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 25ef51e..673a688 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -254,7 +254,7 @@ public class ProtonHandler extends ProtonInitializable {
       flush(false);
    }
 
-   private void flush(boolean wait) {
+   public void flush(boolean wait) {
       synchronized (lock) {
          transport.process();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
index 9257c6b..0ff1d3b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
@@ -17,24 +17,11 @@
 package org.apache.activemq.artemis.protocol.amqp.util;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 
 public class DeliveryUtil {
 
-   public static int readDelivery(Receiver receiver, ByteBuf buffer) {
-      int initial = buffer.writerIndex();
-      // optimization by norman
-      int count;
-      while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) {
-         // Increment the writer index by the number of bytes written into it while calling recv.
-         buffer.writerIndex(buffer.writerIndex() + count);
-         buffer.ensureWritable(count);
-      }
-      return buffer.writerIndex() - initial;
-   }
-
    public static MessageImpl decodeMessageImpl(ByteBuf buffer) {
       MessageImpl message = (MessageImpl) Message.Factory.create();
       message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
new file mode 100644
index 0000000..e0705b4
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+
+public class NettyReadable implements ReadableBuffer {
+
+   private static final Charset Charset_UTF8 = Charset.forName("UTF-8");
+
+   private final ByteBuf buffer;
+
+   public NettyReadable(ByteBuf buffer) {
+      this.buffer = buffer;
+   }
+
+   @Override
+   public void put(ReadableBuffer other) {
+      buffer.writeBytes(other.byteBuffer());
+   }
+
+   @Override
+   public byte get() {
+      return buffer.readByte();
+   }
+
+   @Override
+   public int getInt() {
+      return buffer.readInt();
+   }
+
+   @Override
+   public long getLong() {
+      return buffer.readLong();
+   }
+
+   @Override
+   public short getShort() {
+      return buffer.readShort();
+   }
+
+   @Override
+   public float getFloat() {
+      return buffer.readFloat();
+   }
+
+   @Override
+   public double getDouble() {
+      return buffer.readDouble();
+   }
+
+   @Override
+   public ReadableBuffer get(byte[] data, int offset, int length) {
+      buffer.readBytes(data, offset, length);
+      return this;
+   }
+
+   @Override
+   public ReadableBuffer get(byte[] data) {
+      buffer.readBytes(data);
+      return this;
+   }
+
+   @Override
+   public ReadableBuffer position(int position) {
+      buffer.readerIndex(position);
+      return this;
+   }
+
+   @Override
+   public ReadableBuffer slice() {
+      return new NettyReadable(buffer.slice());
+   }
+
+   @Override
+   public ReadableBuffer flip() {
+      return new NettyReadable(buffer.duplicate().setIndex(0, buffer.readerIndex()));
+   }
+
+   @Override
+   public ReadableBuffer limit(int limit) {
+      buffer.writerIndex(limit);
+      return this;
+   }
+
+   @Override
+   public int limit() {
+      return buffer.writerIndex();
+   }
+
+   @Override
+   public int remaining() {
+      return buffer.readableBytes();
+   }
+
+   @Override
+   public int position() {
+      return buffer.readerIndex();
+   }
+
+   @Override
+   public boolean hasRemaining() {
+      return buffer.readableBytes() > 0;
+   }
+
+   @Override
+   public ReadableBuffer duplicate() {
+      return new NettyReadable(buffer.duplicate());
+   }
+
+   @Override
+   public ByteBuffer byteBuffer() {
+      return buffer.nioBuffer(0, buffer.writerIndex());
+   }
+
+   @Override
+   public String readUTF8() {
+      return buffer.toString(Charset_UTF8);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 08c46be..9a333c7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
-import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -29,10 +23,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
@@ -52,8 +47,11 @@ import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Assert;
 import org.junit.Test;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage;
 
 public class TestConversions extends Assert {
 
@@ -72,10 +70,10 @@ public class TestConversions extends Assert {
 
       message.setBody(new AmqpValue(new Boolean(true)));
 
-      EncodedMessage encodedMessage = encodeMessage(message);
+      AMQPMessage encodedMessage = new AMQPMessage(message, null);
 
       ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessage serverMessage = converter.inbound(encodedMessage);
+      org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage);
 
       verifyProperties(new ServerJMSMessage(serverMessage, 0));
 
@@ -101,10 +99,10 @@ public class TestConversions extends Assert {
 
       message.setBody(new Data(new Binary(bodyBytes)));
 
-      EncodedMessage encodedMessage = encodeMessage(message);
+      AMQPMessage encodedMessage = new AMQPMessage(message, null);
 
       ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessage serverMessage = converter.inbound(encodedMessage);
+      org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage);
 
       ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) wrapMessage(BYTES_TYPE, serverMessage, 0);
 
@@ -151,10 +149,10 @@ public class TestConversions extends Assert {
 
       message.setBody(new AmqpValue(mapValues));
 
-      EncodedMessage encodedMessage = encodeMessage(message);
+      AMQPMessage encodedMessage = new AMQPMessage(message, null);
 
       ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessage serverMessage = converter.inbound(encodedMessage);
+      org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage);
 
       ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) wrapMessage(MAP_TYPE, serverMessage, 0);
       mapMessage.decode();
@@ -188,12 +186,10 @@ public class TestConversions extends Assert {
 
       message.setBody(new AmqpSequence(objects));
 
-      EncodedMessage encodedMessage = encodeMessage(message);
+      AMQPMessage encodedMessage = new AMQPMessage(message, null);
 
       ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessage serverMessage = converter.inbound(encodedMessage);
-
-      simulatePersistence(serverMessage);
+      org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage);
 
       ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) wrapMessage(STREAM_TYPE, serverMessage, 0);
 
@@ -222,12 +218,10 @@ public class TestConversions extends Assert {
       String text = "someText";
       message.setBody(new AmqpValue(text));
 
-      EncodedMessage encodedMessage = encodeMessage(message);
+      AMQPMessage encodedMessage = new AMQPMessage(message, null);
 
       ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
-      ServerMessage serverMessage = converter.inbound(encodedMessage);
-
-      simulatePersistence(serverMessage);
+      org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage);
 
       ServerJMSTextMessage textMessage = (ServerJMSTextMessage) wrapMessage(TEXT_TYPE, serverMessage, 0);
       textMessage.decode();
@@ -247,13 +241,6 @@ public class TestConversions extends Assert {
       System.out.println("output = " + amqpMessage);
    }
 
-   private void simulatePersistence(ServerMessage serverMessage) {
-      serverMessage.setAddress(new SimpleString("SomeAddress"));
-      // This is just to simulate what would happen during the persistence of the message
-      // We need to still be able to recover the message when we read it back
-      ((EncodingSupport) serverMessage).encode(new EmptyBuffer());
-   }
-
    private ProtonJMessage reEncodeMsg(Object obj) {
       ProtonJMessage objOut = (ProtonJMessage) obj;
 
@@ -263,14 +250,6 @@ public class TestConversions extends Assert {
       return objOut;
    }
 
-   private EncodedMessage encodeMessage(MessageImpl message) {
-      ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024);
-      message.encode(new NettyWritable(buf));
-      byte[] bytesConvert = new byte[buf.writerIndex()];
-      buf.readBytes(bytesConvert);
-      return new EncodedMessage(0, bytesConvert, 0, bytesConvert.length);
-   }
-
    class EmptyBuffer implements ActiveMQBuffer {
 
       @Override
@@ -770,5 +749,10 @@ public class TestConversions extends Assert {
       public void release() {
          //no-op
       }
+
+      @Override
+      public void writeBytes(ByteBuf src, int srcIndex, int length) {
+
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
index d7a948a..bfaec54 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessa
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.qpid.proton.Proton;
@@ -82,8 +83,9 @@ public class JMSMappingInboundTransformerTest {
       Message message = Message.Factory.create();
       message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
 
-      EncodedMessage em = encodeMessage(message);
-      javax.jms.Message jmsMessage = transformer.transform(em);
+      AMQPMessage messageEncode = new AMQPMessage(message, null);
+
+      javax.jms.Message jmsMessage = transformer.transform(messageEncode);
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index 2ece01d..3fe6d70 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -16,19 +16,7 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
+import javax.jms.JMSException;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
@@ -39,10 +27,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import javax.jms.JMSException;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -64,8 +52,18 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class JMSMappingOutboundTransformerTest {
 
@@ -943,8 +941,8 @@ public class JMSMappingOutboundTransformerTest {
       }
    }
 
-   private ServerMessageImpl newMessage(byte messageType) {
-      ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512);
+   private CoreMessage newMessage(byte messageType) {
+      CoreMessage message = new CoreMessage(idGenerator.generateID(), 512);
       message.setType(messageType);
       ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
       return message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
index 99aab33..fdf0129 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
@@ -21,8 +21,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -68,11 +68,11 @@ public class JMSTransformationSpeedComparisonTest {
 
       Message message = Proton.message();
       message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
-      EncodedMessage encoded = encode(message);
+      AMQPMessage encoded = encode(message);
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
+         org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
          encode(converter.outbound(intermediate, 1));
       }
 
@@ -80,7 +80,7 @@ public class JMSTransformationSpeedComparisonTest {
 
       long startTime = System.nanoTime();
       for (int i = 0; i < PROFILE_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
+         org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
          encode(converter.outbound(intermediate, 1));
       }
       totalDuration += System.nanoTime() - startTime;
@@ -99,11 +99,11 @@ public class JMSTransformationSpeedComparisonTest {
       message.setContentType("text/plain");
       message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
 
-      EncodedMessage encoded = encode(message);
+      AMQPMessage encoded = encode(message);
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
+         org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
          encode(converter.outbound(intermediate, 1));
       }
 
@@ -111,7 +111,7 @@ public class JMSTransformationSpeedComparisonTest {
 
       long startTime = System.nanoTime();
       for (int i = 0; i < PROFILE_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
+         org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
          encode(converter.outbound(intermediate, 1));
       }
       totalDuration += System.nanoTime() - startTime;
@@ -122,11 +122,11 @@ public class JMSTransformationSpeedComparisonTest {
    @Test
    public void testTypicalQpidJMSMessage() throws Exception {
 
-      EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
+      AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
+         org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
          encode(converter.outbound(intermediate, 1));
       }
 
@@ -134,7 +134,7 @@ public class JMSTransformationSpeedComparisonTest {
 
       long startTime = System.nanoTime();
       for (int i = 0; i < PROFILE_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
+         org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
          encode(converter.outbound(intermediate, 1));
       }
       totalDuration += System.nanoTime() - startTime;
@@ -145,11 +145,11 @@ public class JMSTransformationSpeedComparisonTest {
    @Test
    public void testComplexQpidJMSMessage() throws Exception {
 
-      EncodedMessage encoded = encode(createComplexQpidJMSMessage());
+      AMQPMessage encoded = encode(createComplexQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
+         org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
          encode(converter.outbound(intermediate, 1));
       }
 
@@ -157,7 +157,7 @@ public class JMSTransformationSpeedComparisonTest {
 
       long startTime = System.nanoTime();
       for (int i = 0; i < PROFILE_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
+         org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
          encode(converter.outbound(intermediate, 1));
       }
       totalDuration += System.nanoTime() - startTime;
@@ -168,7 +168,7 @@ public class JMSTransformationSpeedComparisonTest {
    @Test
    public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
 
-      EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
+      AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
@@ -190,8 +190,8 @@ public class JMSTransformationSpeedComparisonTest {
    @Test
    public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
 
-      EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
-      ServerMessage intermediate = converter.inbound(encoded);
+      AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
+      org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
@@ -278,14 +278,14 @@ public class JMSTransformationSpeedComparisonTest {
       return message;
    }
 
-   private EncodedMessage encode(Object target) {
+   private AMQPMessage encode(Object target) {
       if (target instanceof ProtonJMessage) {
          ProtonJMessage amqp = (ProtonJMessage) target;
 
          ByteBuf nettyBuffer = Unpooled.buffer(1024);
          amqp.encode(new NettyWritable(nettyBuffer));
 
-         return new EncodedMessage(0, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
+         return new AMQPMessage(0, nettyBuffer.array(), null);
       } else {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
index a5a2168..6a0f20c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
@@ -26,8 +26,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.qpid.proton.Proton;
@@ -87,7 +87,7 @@ public class MessageTransformationTest {
 
       EncodedMessage encoded = encode(incomingMessage);
 
-      ServerMessage outbound = converter.inbound(encoded);
+      org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(encoded);
       Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, outbound.getLongProperty("JMSXDeliveryCount").intValue())).decode();
 
       // Test that message details are equal
@@ -128,8 +128,7 @@ public class MessageTransformationTest {
 
       incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
 
-      EncodedMessage encoded = encode(incomingMessage);
-      ServerMessage outbound = converter.inbound(encoded);
+      org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null));
       Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
 
       assertNull(outboudMessage.getHeader());
@@ -144,8 +143,7 @@ public class MessageTransformationTest {
       incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
       incomingMessage.setMessageId("ID:SomeQualifier:0:0:1");
 
-      EncodedMessage encoded = encode(incomingMessage);
-      ServerMessage outbound = converter.inbound(encoded);
+      org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null));
       Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
 
       assertNull(outboudMessage.getHeader());
@@ -160,8 +158,7 @@ public class MessageTransformationTest {
       incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
       incomingMessage.setDurable(true);
 
-      EncodedMessage encoded = encode(incomingMessage);
-      ServerMessage outbound = converter.inbound(encoded);
+      org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null));
       Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
 
       assertNotNull(outboudMessage.getHeader());
@@ -175,8 +172,7 @@ public class MessageTransformationTest {
 
       incomingMessage.setBody(new AmqpValue(new Boolean(true)));
 
-      EncodedMessage encoded = encode(incomingMessage);
-      ServerMessage outbound = converter.inbound(encoded);
+      org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null));
       Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
 
       Section section = outboudMessage.getBody();
@@ -233,8 +229,7 @@ public class MessageTransformationTest {
       message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
       message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
 
-      EncodedMessage encoded = encode(message);
-      ServerMessage outbound = converter.inbound(encoded);
+      org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(message, null));
       Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
 
       assertNotNull(outboudMessage.getHeader());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
new file mode 100644
index 0000000..4313eae
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.message;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AMQPMessageTest {
+   // Encodes a proton message to bytes and checks values read back through an AMQPMessage built from them.
+   @Test
+   public void testVerySimple() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader( new Header());
+      Properties properties = new Properties();
+      properties.setTo("someNiceLocal");
+      protonMessage.setProperties(properties);
+      protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
+      protonMessage.getHeader().setDurable(Boolean.TRUE);
+      protonMessage.setApplicationProperties(new ApplicationProperties(new HashedMap()));
+      // Scratch buffer that the proton message is encoded into below.
+      ByteBuf nettyBuffer = Unpooled.buffer(1500);
+
+      protonMessage.encode(new NettyWritable(nettyBuffer));
+      // Array sized from the buffer's writer index; presumably the encoded length, since the buffer started empty.
+      byte[] bytes = new byte[nettyBuffer.writerIndex()];
+
+      nettyBuffer.readBytes(bytes);
+      // NOTE(review): the meaning of the 0 and null constructor arguments is not visible here - confirm in AMQPMessage.
+      AMQPMessage encode = new AMQPMessage(0, bytes, null);
+      // Delivery count, durable flag and the "to" value set above are expected back (the latter via getAddress()).
+      Assert.assertEquals(7, encode.getHeader().getDeliveryCount().intValue());
+      Assert.assertEquals(true, encode.getHeader().getDurable());
+      Assert.assertEquals("someNiceLocal", encode.getAddress());
+
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index f0385dc..e619eb9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -28,10 +28,9 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
 /**
@@ -112,19 +111,20 @@ public class MQTTPublishManager {
     * to original ID and consumer in the Session state.  This way we can look up the consumer Id and the message Id from
     * the PubAck or PubRec message id. *
     */
-   protected void sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
+   protected void sendMessage(CoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
       // This is to allow retries of PubRel.
       if (isManagementConsumer(consumer)) {
          sendPubRelMessage(message);
       } else {
          int qos = decideQoS(message, consumer);
          if (qos == 0) {
-            sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
+            // TODO-now: fix encoding
+            sendServerMessage((int) message.getMessageID(),  message, deliveryCount, qos);
             session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
          } else if (qos == 1 || qos == 2) {
             int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
             outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
-            sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
+            sendServerMessage(mqttid, message, deliveryCount, qos);
          } else {
             // Client must have disconnected and it's Subscription QoS cleared
             consumer.individualCancel(message.getMessageID(), false);
@@ -149,7 +149,7 @@ public class MQTTPublishManager {
     */
    void sendInternal(int messageId, String topic, int qos, ByteBuf payload, boolean retain, boolean internal) throws Exception {
       synchronized (lock) {
-         ServerMessage serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload);
+         Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload);
 
          if (qos > 0) {
             serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
@@ -181,7 +181,7 @@ public class MQTTPublishManager {
       }
    }
 
-   void sendPubRelMessage(ServerMessage message) {
+   void sendPubRelMessage(Message message) {
       int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
       session.getProtocolHandler().sendPubRel(messageId);
    }
@@ -190,7 +190,7 @@ public class MQTTPublishManager {
       try {
          Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
          if (ref != null) {
-            ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
+            Message m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
             session.getServerSession().send(m, true);
             session.getServerSession().acknowledge(ref.getB(), ref.getA());
          } else {
@@ -246,7 +246,7 @@ public class MQTTPublishManager {
       }
    }
 
-   private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
+   private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
       String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
 
       ByteBuf payload;
@@ -262,14 +262,14 @@ public class MQTTPublishManager {
                log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
             }
          default:
-            ActiveMQBuffer bufferDup = message.getBodyBufferDuplicate();
+            ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer();
             payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf();
             break;
       }
       session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
    }
 
-   private int decideQoS(ServerMessage message, ServerConsumer consumer) {
+   private int decideQoS(Message message, ServerConsumer consumer) {
 
       int subscriptionQoS = -1;
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 596670b..0b52a0b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -17,12 +17,12 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.LinkedListIterator;
@@ -44,7 +44,7 @@ public class MQTTRetainMessageManager {
     * the subscription queue for the consumer.  When a new retained message is received the message will be sent to
     * the retained queue and the previous retain message consumed to remove it from the queue.
     */
-   void handleRetainedMessage(ServerMessage message, String address, boolean reset, Transaction tx) throws Exception {
+   void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception {
       SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
 
       Queue queue = session.getServer().locateQueue(retainAddress);
@@ -82,7 +82,7 @@ public class MQTTRetainMessageManager {
                Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
                try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
                   if (i.hasNext()) {
-                     ServerMessage message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
+                     Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
                      sendToQueue(message, queue, tx);
                   }
                }
@@ -95,7 +95,7 @@ public class MQTTRetainMessageManager {
       tx.commit();
    }
 
-   private void sendToQueue(ServerMessage message, Queue queue, Transaction tx) throws Exception {
+   private void sendToQueue(Message message, Queue queue, Transaction tx) throws Exception {
       RoutingContext context = new RoutingContextImpl(tx);
       queue.route(message, context);
       session.getServer().getPostOffice().processRoute(message, context, false);