You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/14 02:42:13 UTC

[1/4] activemq-artemis git commit: fixing release profile on examples

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 6b17d966c -> 6c6568b76


fixing release profile on examples


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a4498d46
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a4498d46
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a4498d46

Branch: refs/heads/master
Commit: a4498d46d148fe6c236ef328502235eb3b63cc45
Parents: 6b17d96
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Aug 13 20:15:31 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Aug 13 20:28:24 2015 -0400

----------------------------------------------------------------------
 examples/features/perf/pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a4498d46/examples/features/perf/pom.xml
----------------------------------------------------------------------
diff --git a/examples/features/perf/pom.xml b/examples/features/perf/pom.xml
index eaafdaf..b47c259 100644
--- a/examples/features/perf/pom.xml
+++ b/examples/features/perf/pom.xml
@@ -22,8 +22,8 @@ under the License.
    <modelVersion>4.0.0</modelVersion>
 
    <parent>
-      <groupId>org.apache.activemq.examples</groupId>
-      <artifactId>artemis-examples</artifactId>
+      <groupId>org.apache.activemq.examples.clustered</groupId>
+      <artifactId>broker-features</artifactId>
       <version>1.0.1-SNAPSHOT</version>
    </parent>
 


[3/4] activemq-artemis git commit: ARTEMIS-204 Improvements on OpenWire

Posted by cl...@apache.org.
ARTEMIS-204 Improvements on OpenWire

https://issues.apache.org/jira/browse/ARTEMIS-204

As a consequence, this will also fix any possible issues with AMQP.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1dae9974
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1dae9974
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1dae9974

Branch: refs/heads/master
Commit: 1dae99746b4532313bbfa66e530751e8d79d2d5b
Parents: a4498d4
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Aug 13 19:20:20 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Aug 13 20:39:01 2015 -0400

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQBuffer.java        |   7 +-
 .../core/buffers/impl/ChannelBufferWrapper.java |  26 ++-
 .../apache/activemq/artemis/utils/ByteUtil.java |  12 +
 .../CompressedLargeMessageControllerImpl.java   |  30 ++-
 .../client/impl/LargeMessageControllerImpl.java |  23 +-
 .../artemis/reader/BytesMessageUtil.java        | 106 ++++-----
 .../activemq/artemis/reader/MapMessageUtil.java |  17 +-
 .../activemq/artemis/reader/MessageUtil.java    |   7 +-
 .../artemis/reader/StreamMessageUtil.java       |  40 ++--
 .../artemis/reader/TextMessageUtil.java         |   7 +-
 .../jms/client/ActiveMQBytesMessage.java        |  52 ++---
 .../artemis/jms/client/ActiveMQMapMessage.java  |   4 +-
 .../jms/client/ActiveMQStreamMessage.java       |  22 +-
 .../artemis/jms/client/ActiveMQTextMessage.java |   4 +-
 .../converter/jms/ServerJMSBytesMessage.java    |  53 ++---
 .../converter/jms/ServerJMSMapMessage.java      |   4 +-
 .../proton/converter/jms/ServerJMSMessage.java  |  19 ++
 .../converter/jms/ServerJMSStreamMessage.java   |  84 ++++---
 .../converter/jms/ServerJMSTextMessage.java     |   6 +-
 .../core/protocol/proton/TestConversions.java   |  20 +-
 .../protocol/openwire/DataInputWrapper.java     | 228 -------------------
 .../protocol/openwire/OpenWireConnection.java   | 163 ++++++-------
 .../openwire/OpenWireMessageConverter.java      |  23 +-
 .../openwire/OpenWireProtocolManager.java       |  44 ++--
 .../core/server/impl/ServerConsumerImpl.java    |   6 -
 .../openwire/SimpleOpenWireTest.java            | 136 ++++++++---
 .../openwire/VerySimpleOenwireTest.java         | 118 ++++++++++
 .../netty/ActiveMQFrameDecoder2Test.java        |   8 +-
 28 files changed, 646 insertions(+), 623 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
index 12dd09f..da78fb7 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import java.io.DataInput;
 import java.nio.ByteBuffer;
 
 import io.netty.buffer.ByteBuf;
@@ -29,7 +30,7 @@ import io.netty.buffer.ByteBuf;
  *
  * @see ActiveMQBuffers
  */
-public interface ActiveMQBuffer {
+public interface ActiveMQBuffer extends DataInput {
 
    /**
     * Returns the underlying Netty's ByteBuf
@@ -642,7 +643,7 @@ public interface ActiveMQBuffer {
     * @return an unsigned byte at the current {@code readerIndex}
     * @throws IndexOutOfBoundsException if {@code this.readableBytes} is less than {@code 1}
     */
-   short readUnsignedByte();
+   int readUnsignedByte();
 
    /**
     * Gets a 16-bit short integer at the current {@code readerIndex}
@@ -874,7 +875,7 @@ public interface ActiveMQBuffer {
     * @param length The number of bytes to skip
     * @throws IndexOutOfBoundsException if {@code length} is greater than {@code this.readableBytes}
     */
-   void skipBytes(int length);
+   int skipBytes(int length);
 
    /**
     * Sets the specified byte at the current {@code writerIndex}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
index 51fea91..1c830c6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
@@ -16,12 +16,14 @@
  */
 package org.apache.activemq.artemis.core.buffers.impl;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.UTF8Util;
 
@@ -350,7 +352,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
       return new ChannelBufferWrapper(buffer.readSlice(length), releasable);
    }
 
-   public short readUnsignedByte() {
+   public int readUnsignedByte() {
       return buffer.readUnsignedByte();
    }
 
@@ -426,8 +428,9 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
       buffer.setShort(index, value);
    }
 
-   public void skipBytes(final int length) {
+   public int skipBytes(final int length) {
       buffer.skipBytes(length);
+      return length;
    }
 
    public ActiveMQBuffer slice() {
@@ -510,4 +513,23 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
       buffer.writeShort(value);
    }
 
+   /** from {@link java.io.DataInput} interface */
+   @Override
+   public void readFully(byte[] b) throws IOException {
+      readBytes(b);
+   }
+
+   /** from {@link java.io.DataInput} interface */
+   @Override
+   public void readFully(byte[] b, int off, int len) throws IOException {
+      readBytes(b, off, len);
+   }
+
+   /** from {@link java.io.DataInput} interface */
+   @Override
+   public String readLine() throws IOException {
+      return ByteUtil.readLine(this);
+   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index 6e2ca99..b7ff841 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 
 public class ByteUtil {
 
@@ -67,4 +68,15 @@ public class ByteUtil {
       return buffer.array();
    }
 
+
+   public static String readLine(ActiveMQBuffer buffer) {
+      StringBuilder sb = new StringBuilder("");
+      char c = buffer.readChar();
+      while (c != '\n') {
+         sb.append(c);
+         c = buffer.readChar();
+      }
+      return sb.toString();
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 1e93ced..ae711bf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.client.impl;
 
 import java.io.DataInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -27,8 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
-import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
+import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.InflaterReader;
 import org.apache.activemq.artemis.utils.InflaterWriter;
 import org.apache.activemq.artemis.utils.UTF8Util;
@@ -302,9 +303,9 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
       throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
    }
 
-   public short readUnsignedByte() {
+   public int readUnsignedByte() {
       try {
-         return (short) getStream().readUnsignedByte();
+         return getStream().readUnsignedByte();
       }
       catch (Exception e) {
          throw new IllegalStateException(e.getMessage(), e);
@@ -391,18 +392,39 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
       dst.put(bytesToGet);
    }
 
-   public void skipBytes(final int length) {
+   public int skipBytes(final int length) {
 
       try {
          for (int i = 0; i < length; i++) {
             getStream().read();
          }
+         return length;
       }
       catch (Exception e) {
          throw new IllegalStateException(e.getMessage(), e);
       }
    }
 
+
+   /** from {@link java.io.DataInput} interface */
+   @Override
+   public void readFully(byte[] b) throws IOException {
+      readBytes(b);
+   }
+
+   /** from {@link java.io.DataInput} interface */
+   @Override
+   public void readFully(byte[] b, int off, int len) throws IOException {
+      readBytes(b, off, len);
+   }
+
+   /** from {@link java.io.DataInput} interface */
+   @Override
+   public String readLine() throws IOException {
+      return getStream().readLine();
+   }
+
+
    public void writeByte(final byte value) {
       throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 7f44cff..ad79e82 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
+import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.UTF8Util;
 
@@ -666,7 +667,7 @@ public class LargeMessageControllerImpl implements LargeMessageController {
       throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
-   public short readUnsignedByte() {
+   public int readUnsignedByte() {
       return (short) (readByte() & 0xFF);
    }
 
@@ -758,11 +759,12 @@ public class LargeMessageControllerImpl implements LargeMessageController {
       readerIndex += length;
    }
 
-   public void skipBytes(final int length) {
+   public int skipBytes(final int length) {
 
       long newReaderIndex = readerIndex + length;
       checkForPacket(newReaderIndex);
       readerIndex = newReaderIndex;
+      return length;
    }
 
    public void writeByte(final byte value) {
@@ -1176,7 +1178,24 @@ public class LargeMessageControllerImpl implements LargeMessageController {
             }
          }
       }
+   }
+
+   /** from {@link java.io.DataInput} interface */
+   @Override
+   public void readFully(byte[] b) throws IOException {
+      readBytes(b);
+   }
 
+   /** from {@link java.io.DataInput} interface */
+   @Override
+   public void readFully(byte[] b, int off, int len) throws IOException {
+      readBytes(b, off, len);
+   }
+
+   /** from {@link java.io.DataInput} interface */
+   @Override
+   public String readLine() throws IOException {
+      return ByteUtil.readLine(this);
    }
 
    public ByteBuf byteBuf() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java
index 806a321..a8dce4a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java
@@ -16,115 +16,115 @@
  */
 package org.apache.activemq.artemis.reader;
 
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 
 public class BytesMessageUtil extends MessageUtil {
 
-   public static boolean bytesReadBoolean(Message message) {
-      return getBodyBuffer(message).readBoolean();
+   public static boolean bytesReadBoolean(ActiveMQBuffer message) {
+      return message.readBoolean();
    }
 
-   public static byte bytesReadByte(Message message) {
-      return getBodyBuffer(message).readByte();
+   public static byte bytesReadByte(ActiveMQBuffer message) {
+      return message.readByte();
    }
 
-   public static int bytesReadUnsignedByte(Message message) {
-      return getBodyBuffer(message).readUnsignedByte();
+   public static int bytesReadUnsignedByte(ActiveMQBuffer message) {
+      return message.readUnsignedByte();
    }
 
-   public static short bytesReadShort(Message message) {
-      return getBodyBuffer(message).readShort();
+   public static short bytesReadShort(ActiveMQBuffer message) {
+      return message.readShort();
    }
 
-   public static int bytesReadUnsignedShort(Message message) {
-      return getBodyBuffer(message).readUnsignedShort();
+   public static int bytesReadUnsignedShort(ActiveMQBuffer message) {
+      return message.readUnsignedShort();
    }
 
-   public static char bytesReadChar(Message message) {
-      return (char) getBodyBuffer(message).readShort();
+   public static char bytesReadChar(ActiveMQBuffer message) {
+      return (char) message.readShort();
    }
 
-   public static int bytesReadInt(Message message) {
-      return getBodyBuffer(message).readInt();
+   public static int bytesReadInt(ActiveMQBuffer message) {
+      return message.readInt();
    }
 
-   public static long bytesReadLong(Message message) {
-      return getBodyBuffer(message).readLong();
+   public static long bytesReadLong(ActiveMQBuffer message) {
+      return message.readLong();
    }
 
-   public static float bytesReadFloat(Message message) {
-      return Float.intBitsToFloat(getBodyBuffer(message).readInt());
+   public static float bytesReadFloat(ActiveMQBuffer message) {
+      return Float.intBitsToFloat(message.readInt());
    }
 
-   public static double bytesReadDouble(Message message) {
-      return Double.longBitsToDouble(getBodyBuffer(message).readLong());
+   public static double bytesReadDouble(ActiveMQBuffer message) {
+      return Double.longBitsToDouble(message.readLong());
    }
 
-   public static String bytesReadUTF(Message message) {
-      return getBodyBuffer(message).readUTF();
+   public static String bytesReadUTF(ActiveMQBuffer message) {
+      return message.readUTF();
    }
 
-   public static int bytesReadBytes(Message message, final byte[] value) {
+   public static int bytesReadBytes(ActiveMQBuffer message, final byte[] value) {
       return bytesReadBytes(message, value, value.length);
    }
 
-   public static int bytesReadBytes(Message message, final byte[] value, final int length) {
-      if (!getBodyBuffer(message).readable()) {
+   public static int bytesReadBytes(ActiveMQBuffer message, final byte[] value, final int length) {
+      if (!message.readable()) {
          return -1;
       }
 
-      int read = Math.min(length, getBodyBuffer(message).readableBytes());
+      int read = Math.min(length, message.readableBytes());
 
       if (read != 0) {
-         getBodyBuffer(message).readBytes(value, 0, read);
+         message.readBytes(value, 0, read);
       }
 
       return read;
 
    }
 
-   public static void bytesWriteBoolean(Message message, boolean value) {
-      getBodyBuffer(message).writeBoolean(value);
+   public static void bytesWriteBoolean(ActiveMQBuffer message, boolean value) {
+      message.writeBoolean(value);
    }
 
-   public static void bytesWriteByte(Message message, byte value) {
-      getBodyBuffer(message).writeByte(value);
+   public static void bytesWriteByte(ActiveMQBuffer message, byte value) {
+      message.writeByte(value);
    }
 
-   public static void bytesWriteShort(Message message, short value) {
-      getBodyBuffer(message).writeShort(value);
+   public static void bytesWriteShort(ActiveMQBuffer message, short value) {
+      message.writeShort(value);
    }
 
-   public static void bytesWriteChar(Message message, char value) {
-      getBodyBuffer(message).writeShort((short) value);
+   public static void bytesWriteChar(ActiveMQBuffer message, char value) {
+      message.writeShort((short) value);
    }
 
-   public static void bytesWriteInt(Message message, int value) {
-      getBodyBuffer(message).writeInt(value);
+   public static void bytesWriteInt(ActiveMQBuffer message, int value) {
+      message.writeInt(value);
    }
 
-   public static void bytesWriteLong(Message message, long value) {
-      getBodyBuffer(message).writeLong(value);
+   public static void bytesWriteLong(ActiveMQBuffer message, long value) {
+      message.writeLong(value);
    }
 
-   public static void bytesWriteFloat(Message message, float value) {
-      getBodyBuffer(message).writeInt(Float.floatToIntBits(value));
+   public static void bytesWriteFloat(ActiveMQBuffer message, float value) {
+      message.writeInt(Float.floatToIntBits(value));
    }
 
-   public static void bytesWriteDouble(Message message, double value) {
-      getBodyBuffer(message).writeLong(Double.doubleToLongBits(value));
+   public static void bytesWriteDouble(ActiveMQBuffer message, double value) {
+      message.writeLong(Double.doubleToLongBits(value));
    }
 
-   public static void bytesWriteUTF(Message message, String value) {
-      getBodyBuffer(message).writeUTF(value);
+   public static void bytesWriteUTF(ActiveMQBuffer message, String value) {
+      message.writeUTF(value);
    }
 
-   public static void bytesWriteBytes(Message message, byte[] value) {
-      getBodyBuffer(message).writeBytes(value);
+   public static void bytesWriteBytes(ActiveMQBuffer message, byte[] value) {
+      message.writeBytes(value);
    }
 
-   public static void bytesWriteBytes(Message message, final byte[] value, final int offset, final int length) {
-      getBodyBuffer(message).writeBytes(value, offset, length);
+   public static void bytesWriteBytes(ActiveMQBuffer message, final byte[] value, final int offset, final int length) {
+      message.writeBytes(value, offset, length);
    }
 
    /**
@@ -134,7 +134,7 @@ public class BytesMessageUtil extends MessageUtil {
     * @param value
     * @return
     */
-   public static boolean bytesWriteObject(Message message, Object value) {
+   public static boolean bytesWriteObject(ActiveMQBuffer message, Object value) {
       if (value == null) {
          throw new NullPointerException("Attempt to write a null value");
       }
@@ -175,8 +175,8 @@ public class BytesMessageUtil extends MessageUtil {
       return true;
    }
 
-   public static void bytesMessageReset(Message message) {
-      getBodyBuffer(message).resetReaderIndex();
+   public static void bytesMessageReset(ActiveMQBuffer message) {
+      message.resetReaderIndex();
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
index 9ae4798..65aeccb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.reader;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.utils.TypedProperties;
 
 public class MapMessageUtil extends MessageUtil {
@@ -25,16 +24,15 @@ public class MapMessageUtil extends MessageUtil {
    /**
     * Utility method to set the map on a message body
     */
-   public static void writeBodyMap(Message message, TypedProperties properties) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
-      buff.resetWriterIndex();
-      properties.encode(buff);
+   public static void writeBodyMap(ActiveMQBuffer message, TypedProperties properties) {
+      message.resetWriterIndex();
+      properties.encode(message);
    }
 
    /**
     * Utility method to set the map on a message body
     */
-   public static TypedProperties readBodyMap(Message message) {
+   public static TypedProperties readBodyMap(ActiveMQBuffer message) {
       TypedProperties map = new TypedProperties();
       readBodyMap(message, map);
       return map;
@@ -43,10 +41,9 @@ public class MapMessageUtil extends MessageUtil {
    /**
     * Utility method to set the map on a message body
     */
-   public static void readBodyMap(Message message, TypedProperties map) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
-      buff.resetReaderIndex();
-      map.decode(buff);
+   public static void readBodyMap(ActiveMQBuffer message, TypedProperties map) {
+      message.resetReaderIndex();
+      map.decode(message);
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index 9f1a598..b56abc4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -21,7 +21,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -52,9 +51,9 @@ public class MessageUtil {
 
    public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__AMQ_CID");
 
-   public static ActiveMQBuffer getBodyBuffer(Message message) {
-      return message.getBodyBuffer();
-   }
+//   public static ActiveMQBuffer getBodyBuffer(Message message) {
+//      return message.getBodyBuffer();
+//   }
 
    public static byte[] getJMSCorrelationIDAsBytes(Message message) {
       Object obj = message.getObjectProperty(CORRELATIONID_HEADER_NAME);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java
index d59662f..dbba989 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.reader;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.utils.DataConstants;
 
@@ -27,11 +26,10 @@ public class StreamMessageUtil extends MessageUtil {
     * Method to read boolean values out of the Stream protocol existent on JMS Stream Messages
     * Throws IllegalStateException if the type was invalid
     *
-    * @param message
+    * @param buff
     * @return
     */
-   public static boolean streamReadBoolean(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static boolean streamReadBoolean(ActiveMQBuffer buff) {
       byte type = buff.readByte();
 
       switch (type) {
@@ -46,8 +44,7 @@ public class StreamMessageUtil extends MessageUtil {
 
    }
 
-   public static byte streamReadByte(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static byte streamReadByte(ActiveMQBuffer buff) {
       int index = buff.readerIndex();
       try {
          byte type = buff.readByte();
@@ -68,8 +65,7 @@ public class StreamMessageUtil extends MessageUtil {
 
    }
 
-   public static short streamReadShort(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static short streamReadShort(ActiveMQBuffer buff) {
       byte type = buff.readByte();
       switch (type) {
          case DataConstants.BYTE:
@@ -84,8 +80,7 @@ public class StreamMessageUtil extends MessageUtil {
       }
    }
 
-   public static char streamReadChar(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static char streamReadChar(ActiveMQBuffer buff) {
       byte type = buff.readByte();
       switch (type) {
          case DataConstants.CHAR:
@@ -104,8 +99,7 @@ public class StreamMessageUtil extends MessageUtil {
 
    }
 
-   public static int streamReadInteger(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static int streamReadInteger(ActiveMQBuffer buff) {
       byte type = buff.readByte();
       switch (type) {
          case DataConstants.BYTE:
@@ -122,8 +116,7 @@ public class StreamMessageUtil extends MessageUtil {
       }
    }
 
-   public static long streamReadLong(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static long streamReadLong(ActiveMQBuffer buff) {
       byte type = buff.readByte();
       switch (type) {
          case DataConstants.BYTE:
@@ -142,8 +135,7 @@ public class StreamMessageUtil extends MessageUtil {
       }
    }
 
-   public static float streamReadFloat(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static float streamReadFloat(ActiveMQBuffer buff) {
       byte type = buff.readByte();
       switch (type) {
          case DataConstants.FLOAT:
@@ -156,8 +148,7 @@ public class StreamMessageUtil extends MessageUtil {
       }
    }
 
-   public static double streamReadDouble(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static double streamReadDouble(ActiveMQBuffer buff) {
       byte type = buff.readByte();
       switch (type) {
          case DataConstants.FLOAT:
@@ -172,8 +163,7 @@ public class StreamMessageUtil extends MessageUtil {
       }
    }
 
-   public static String streamReadString(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static String streamReadString(ActiveMQBuffer buff) {
       byte type = buff.readByte();
       switch (type) {
          case DataConstants.BOOLEAN:
@@ -204,12 +194,10 @@ public class StreamMessageUtil extends MessageUtil {
     * It will return remainingBytes, bytesRead
     *
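-    * @param remainingBytes remaining Bytes from previous read. Send it to 0 if it was the first call for the message
+    * @param remainingBytes remaining bytes from the previous read. Set it to 0 if this is the first call for the message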
-    * @param message
+    * @param buff
     * @return a pair of remaining bytes and bytes read
     */
-   public static Pair<Integer, Integer> streamReadBytes(Message message, int remainingBytes, byte[] value) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
-
+   public static Pair<Integer, Integer> streamReadBytes(ActiveMQBuffer buff, int remainingBytes, byte[] value) {
       if (remainingBytes == -1) {
          return new Pair<>(0, -1);
       }
@@ -230,9 +218,7 @@ public class StreamMessageUtil extends MessageUtil {
 
    }
 
-   public static Object streamReadObject(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
-
+   public static Object streamReadObject(ActiveMQBuffer buff) {
       byte type = buff.readByte();
       switch (type) {
          case DataConstants.BOOLEAN:

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java
index c7515fc..c9ece4d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.reader;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 
 public class TextMessageUtil extends MessageUtil {
@@ -25,8 +24,7 @@ public class TextMessageUtil extends MessageUtil {
    /**
     * Utility method to set the Text message on a message body
     */
-   public static void writeBodyText(Message message, SimpleString text) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static void writeBodyText(ActiveMQBuffer buff, SimpleString text) {
       buff.clear();
       buff.writeNullableSimpleString(text);
    }
@@ -34,8 +32,7 @@ public class TextMessageUtil extends MessageUtil {
    /**
-    * Utility method to set the Text message on a message body
+    * Utility method to read the Text message from a message body
     */
-   public static SimpleString readBodyText(Message message) {
-      ActiveMQBuffer buff = getBodyBuffer(message);
+   public static SimpleString readBodyText(ActiveMQBuffer buff) {
       buff.resetReaderIndex();
       return buff.readNullableSimpleString();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
index 72770a4..09d6b18 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
@@ -102,7 +102,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public boolean readBoolean() throws JMSException {
       checkRead();
       try {
-         return bytesReadBoolean(message);
+         return bytesReadBoolean(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -112,7 +112,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public byte readByte() throws JMSException {
       checkRead();
       try {
-         return bytesReadByte(message);
+         return bytesReadByte(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -122,7 +122,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public int readUnsignedByte() throws JMSException {
       checkRead();
       try {
-         return bytesReadUnsignedByte(message);
+         return bytesReadUnsignedByte(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -132,7 +132,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public short readShort() throws JMSException {
       checkRead();
       try {
-         return bytesReadShort(message);
+         return bytesReadShort(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -142,7 +142,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public int readUnsignedShort() throws JMSException {
       checkRead();
       try {
-         return bytesReadUnsignedShort(message);
+         return bytesReadUnsignedShort(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -152,7 +152,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public char readChar() throws JMSException {
       checkRead();
       try {
-         return bytesReadChar(message);
+         return bytesReadChar(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -162,7 +162,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public int readInt() throws JMSException {
       checkRead();
       try {
-         return bytesReadInt(message);
+         return bytesReadInt(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -172,7 +172,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public long readLong() throws JMSException {
       checkRead();
       try {
-         return bytesReadLong(message);
+         return bytesReadLong(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -182,7 +182,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public float readFloat() throws JMSException {
       checkRead();
       try {
-         return bytesReadFloat(message);
+         return bytesReadFloat(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -192,7 +192,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public double readDouble() throws JMSException {
       checkRead();
       try {
-         return bytesReadDouble(message);
+         return bytesReadDouble(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -202,7 +202,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
    public String readUTF() throws JMSException {
       checkRead();
       try {
-         return bytesReadUTF(message);
+         return bytesReadUTF(message.getBodyBuffer());
       }
       catch (IndexOutOfBoundsException e) {
          throw new MessageEOFException("");
@@ -217,59 +217,59 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
 
    public int readBytes(final byte[] value) throws JMSException {
       checkRead();
-      return bytesReadBytes(message, value);
+      return bytesReadBytes(message.getBodyBuffer(), value);
    }
 
    public int readBytes(final byte[] value, final int length) throws JMSException {
       checkRead();
-      return bytesReadBytes(message, value, length);
+      return bytesReadBytes(message.getBodyBuffer(), value, length);
 
    }
 
    public void writeBoolean(final boolean value) throws JMSException {
       checkWrite();
-      bytesWriteBoolean(message, value);
+      bytesWriteBoolean(message.getBodyBuffer(), value);
    }
 
    public void writeByte(final byte value) throws JMSException {
       checkWrite();
-      bytesWriteByte(message, value);
+      bytesWriteByte(message.getBodyBuffer(), value);
    }
 
    public void writeShort(final short value) throws JMSException {
       checkWrite();
-      bytesWriteShort(message, value);
+      bytesWriteShort(message.getBodyBuffer(), value);
    }
 
    public void writeChar(final char value) throws JMSException {
       checkWrite();
-      bytesWriteChar(message, value);
+      bytesWriteChar(message.getBodyBuffer(), value);
    }
 
    public void writeInt(final int value) throws JMSException {
       checkWrite();
-      bytesWriteInt(message, value);
+      bytesWriteInt(message.getBodyBuffer(), value);
    }
 
    public void writeLong(final long value) throws JMSException {
       checkWrite();
-      bytesWriteLong(message, value);
+      bytesWriteLong(message.getBodyBuffer(), value);
    }
 
    public void writeFloat(final float value) throws JMSException {
       checkWrite();
-      bytesWriteFloat(message, value);
+      bytesWriteFloat(message.getBodyBuffer(), value);
    }
 
    public void writeDouble(final double value) throws JMSException {
       checkWrite();
-      bytesWriteDouble(message, value);
+      bytesWriteDouble(message.getBodyBuffer(), value);
    }
 
    public void writeUTF(final String value) throws JMSException {
       checkWrite();
       try {
-         bytesWriteUTF(message, value);
+         bytesWriteUTF(message.getBodyBuffer(), value);
       }
       catch (Exception e) {
          JMSException je = new JMSException("Failed to write UTF");
@@ -282,17 +282,17 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
 
    public void writeBytes(final byte[] value) throws JMSException {
       checkWrite();
-      bytesWriteBytes(message, value);
+      bytesWriteBytes(message.getBodyBuffer(), value);
    }
 
    public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException {
       checkWrite();
-      bytesWriteBytes(message, value, offset, length);
+      bytesWriteBytes(message.getBodyBuffer(), value, offset, length);
    }
 
    public void writeObject(final Object value) throws JMSException {
       checkWrite();
-      if (!bytesWriteObject(message, value)) {
+      if (!bytesWriteObject(message.getBodyBuffer(), value)) {
          throw new MessageFormatException("Invalid object for properties");
       }
    }
@@ -304,7 +304,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
          bodyLength = message.getBodySize();
       }
 
-      bytesMessageReset(message);
+      bytesMessageReset(message.getBodyBuffer());
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java
index 35ee0e4..747316d 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java
@@ -317,7 +317,7 @@ public final class ActiveMQMapMessage extends ActiveMQMessage implements MapMess
    @Override
    public void doBeforeSend() throws Exception {
       if (invalid) {
-         writeBodyMap(message, map);
+         writeBodyMap(message.getBodyBuffer(), map);
          invalid = false;
       }
 
@@ -328,7 +328,7 @@ public final class ActiveMQMapMessage extends ActiveMQMessage implements MapMess
    public void doBeforeReceive() throws ActiveMQException {
       super.doBeforeReceive();
 
-      readBodyMap(message, map);
+      readBodyMap(message.getBodyBuffer(), map);
    }
 
    // Package protected ---------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
index 4a8d5a8..315721f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
@@ -89,7 +89,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
    public boolean readBoolean() throws JMSException {
       checkRead();
       try {
-         return streamReadBoolean(message);
+         return streamReadBoolean(message.getBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -103,7 +103,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
       checkRead();
 
       try {
-         return streamReadByte(message);
+         return streamReadByte(message.getBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -116,7 +116,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
    public short readShort() throws JMSException {
       checkRead();
       try {
-         return streamReadShort(message);
+         return streamReadShort(message.getBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -129,7 +129,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
    public char readChar() throws JMSException {
       checkRead();
       try {
-         return streamReadChar(message);
+         return streamReadChar(message.getBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -142,7 +142,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
    public int readInt() throws JMSException {
       checkRead();
       try {
-         return streamReadInteger(message);
+         return streamReadInteger(message.getBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -155,7 +155,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
    public long readLong() throws JMSException {
       checkRead();
       try {
-         return streamReadLong(message);
+         return streamReadLong(message.getBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -168,7 +168,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
    public float readFloat() throws JMSException {
       checkRead();
       try {
-         return streamReadFloat(message);
+         return streamReadFloat(message.getBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -181,7 +181,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
    public double readDouble() throws JMSException {
       checkRead();
       try {
-         return streamReadDouble(message);
+         return streamReadDouble(message.getBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -194,7 +194,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
    public String readString() throws JMSException {
       checkRead();
       try {
-         return streamReadString(message);
+         return streamReadString(message.getBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -212,7 +212,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
    public int readBytes(final byte[] value) throws JMSException {
       checkRead();
       try {
-         Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value);
+         Pair<Integer, Integer> pairRead = streamReadBytes(message.getBodyBuffer(), len, value);
 
          len = pairRead.getA();
          return pairRead.getB();
@@ -228,7 +228,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
    public Object readObject() throws JMSException {
       checkRead();
       try {
-         return streamReadObject(message);
+         return streamReadObject(message.getBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java
index b9fba2d..bd0a615 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java
@@ -84,7 +84,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
          this.text = null;
       }
 
-      writeBodyText(message, this.text);
+      writeBodyText(message.getBodyBuffer(), this.text);
    }
 
    public String getText() {
@@ -109,7 +109,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
    public void doBeforeReceive() throws ActiveMQException {
       super.doBeforeReceive();
 
-      text = readBodyText(message);
+      text = readBodyText(message.getBodyBuffer());
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
index 76e2515..cc436f4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
@@ -60,128 +60,128 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
 
    @Override
    public boolean readBoolean() throws JMSException {
-      return bytesReadBoolean(message);
+      return bytesReadBoolean(getReadBodyBuffer());
    }
 
    @Override
    public byte readByte() throws JMSException {
-      return bytesReadByte(message);
+      return bytesReadByte(getReadBodyBuffer());
    }
 
    @Override
    public int readUnsignedByte() throws JMSException {
-      return bytesReadUnsignedByte(message);
+      return bytesReadUnsignedByte(getReadBodyBuffer());
    }
 
    @Override
    public short readShort() throws JMSException {
-      return bytesReadShort(message);
+      return bytesReadShort(getReadBodyBuffer());
    }
 
    @Override
    public int readUnsignedShort() throws JMSException {
-      return bytesReadUnsignedShort(message);
+      return bytesReadUnsignedShort(getReadBodyBuffer());
    }
 
    @Override
    public char readChar() throws JMSException {
-      return bytesReadChar(message);
+      return bytesReadChar(getReadBodyBuffer());
    }
 
    @Override
    public int readInt() throws JMSException {
-      return bytesReadInt(message);
+      return bytesReadInt(getReadBodyBuffer());
    }
 
    @Override
    public long readLong() throws JMSException {
-      return bytesReadLong(message);
+      return bytesReadLong(getReadBodyBuffer());
    }
 
    @Override
    public float readFloat() throws JMSException {
-      return bytesReadFloat(message);
+      return bytesReadFloat(getReadBodyBuffer());
    }
 
    @Override
    public double readDouble() throws JMSException {
-      return bytesReadDouble(message);
+      return bytesReadDouble(getReadBodyBuffer());
    }
 
    @Override
    public String readUTF() throws JMSException {
-      return bytesReadUTF(message);
+      return bytesReadUTF(getReadBodyBuffer());
    }
 
    @Override
    public int readBytes(byte[] value) throws JMSException {
-      return bytesReadBytes(message, value);
+      return bytesReadBytes(getReadBodyBuffer(), value);
    }
 
    @Override
    public int readBytes(byte[] value, int length) throws JMSException {
-      return bytesReadBytes(message, value, length);
+      return bytesReadBytes(getReadBodyBuffer(), value, length);
    }
 
    @Override
    public void writeBoolean(boolean value) throws JMSException {
-      bytesWriteBoolean(message, value);
+      bytesWriteBoolean(getWriteBodyBuffer(), value);
 
    }
 
    @Override
    public void writeByte(byte value) throws JMSException {
-      bytesWriteByte(message, value);
+      bytesWriteByte(getWriteBodyBuffer(), value);
    }
 
    @Override
    public void writeShort(short value) throws JMSException {
-      bytesWriteShort(message, value);
+      bytesWriteShort(getWriteBodyBuffer(), value);
    }
 
    @Override
    public void writeChar(char value) throws JMSException {
-      bytesWriteChar(message, value);
+      bytesWriteChar(getWriteBodyBuffer(), value);
    }
 
    @Override
    public void writeInt(int value) throws JMSException {
-      bytesWriteInt(message, value);
+      bytesWriteInt(getWriteBodyBuffer(), value);
    }
 
    @Override
    public void writeLong(long value) throws JMSException {
-      bytesWriteLong(message, value);
+      bytesWriteLong(getWriteBodyBuffer(), value);
    }
 
    @Override
    public void writeFloat(float value) throws JMSException {
-      bytesWriteFloat(message, value);
+      bytesWriteFloat(getWriteBodyBuffer(), value);
    }
 
    @Override
    public void writeDouble(double value) throws JMSException {
-      bytesWriteDouble(message, value);
+      bytesWriteDouble(getWriteBodyBuffer(), value);
    }
 
    @Override
    public void writeUTF(String value) throws JMSException {
-      bytesWriteUTF(message, value);
+      bytesWriteUTF(getWriteBodyBuffer(), value);
    }
 
    @Override
    public void writeBytes(byte[] value) throws JMSException {
-      bytesWriteBytes(message, value);
+      bytesWriteBytes(getWriteBodyBuffer(), value);
    }
 
    @Override
    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
-      bytesWriteBytes(message, value, offset, length);
+      bytesWriteBytes(getWriteBodyBuffer(), value, offset, length);
    }
 
    @Override
    public void writeObject(Object value) throws JMSException {
-      if (!bytesWriteObject(message, value)) {
+      if (!bytesWriteObject(getWriteBodyBuffer(), value)) {
          throw new JMSException("Can't make conversion of " + value + " to any known type");
       }
    }
@@ -199,7 +199,8 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
 
    @Override
    public void reset() throws JMSException {
-      bytesMessageReset(message);
+      bytesMessageReset(getReadBodyBuffer());
+      bytesMessageReset(getWriteBodyBuffer());
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
index bbece71..e41a1a3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
@@ -247,12 +247,12 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe
 
    public void encode() throws Exception {
       super.encode();
-      writeBodyMap(message, map);
+      writeBodyMap(getWriteBodyBuffer(), map);
    }
 
    public void decode() throws Exception {
       super.decode();
-      readBodyMap(message, map);
+      readBodyMap(getReadBodyBuffer(), map);
    }
 
    // Package protected ---------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
index 315dd12..afacd21 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
@@ -23,6 +23,7 @@ import javax.jms.Message;
 import java.util.Collections;
 import java.util.Enumeration;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
@@ -45,6 +46,24 @@ public class ServerJMSMessage implements Message {
       this.deliveryCount = deliveryCount;
    }
 
+   private ActiveMQBuffer readBodyBuffer;
+
+   /** When reading, we use a separate copy of the body buffer so that multiple threads can work on the message without clashing */
+   protected ActiveMQBuffer getReadBodyBuffer() {
+      if (readBodyBuffer == null) {
+         // to avoid clashes between multiple threads
+         readBodyBuffer = message.getBodyBufferCopy();
+      }
+      return readBodyBuffer;
+   }
+
+   /** When writing on the conversion we use the buffer directly */
+   protected ActiveMQBuffer getWriteBodyBuffer() {
+      readBodyBuffer = null; // it invalidates this buffer if anything is written
+      return message.getBodyBuffer();
+   }
+
+
    @Override
    public final String getJMSMessageID() throws JMSException {
       return null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
index 1afc8eb..9b70f57 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
@@ -21,13 +21,11 @@ import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
 import javax.jms.StreamMessage;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.utils.DataConstants;
 
-import static org.apache.activemq.artemis.reader.MessageUtil.getBodyBuffer;
 import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBoolean;
 import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadByte;
 import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBytes;
@@ -48,14 +46,14 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
 
    public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) {
       super(message, deliveryCount);
-
    }
 
    // StreamMessage implementation ----------------------------------
 
    public boolean readBoolean() throws JMSException {
+
       try {
-         return streamReadBoolean(message);
+         return streamReadBoolean(getReadBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -67,7 +65,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
 
    public byte readByte() throws JMSException {
       try {
-         return streamReadByte(message);
+         return streamReadByte(getReadBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -80,7 +78,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    public short readShort() throws JMSException {
 
       try {
-         return streamReadShort(message);
+         return streamReadShort(getReadBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -93,7 +91,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    public char readChar() throws JMSException {
 
       try {
-         return streamReadChar(message);
+         return streamReadChar(getReadBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -106,7 +104,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    public int readInt() throws JMSException {
 
       try {
-         return streamReadInteger(message);
+         return streamReadInteger(getReadBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -119,7 +117,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    public long readLong() throws JMSException {
 
       try {
-         return streamReadLong(message);
+         return streamReadLong(getReadBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -132,7 +130,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    public float readFloat() throws JMSException {
 
       try {
-         return streamReadFloat(message);
+         return streamReadFloat(getReadBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -145,7 +143,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    public double readDouble() throws JMSException {
 
       try {
-         return streamReadDouble(message);
+         return streamReadDouble(getReadBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -158,7 +156,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    public String readString() throws JMSException {
 
       try {
-         return streamReadString(message);
+         return streamReadString(getReadBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -176,7 +174,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    public int readBytes(final byte[] value) throws JMSException {
 
       try {
-         Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value);
+         Pair<Integer, Integer> pairRead = streamReadBytes(getReadBodyBuffer(), len, value);
 
          len = pairRead.getA();
          return pairRead.getB();
@@ -191,11 +189,11 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
 
    public Object readObject() throws JMSException {
 
-      if (getBodyBuffer(message).readerIndex() >= message.getEndOfBodyPosition()) {
+      if (getReadBodyBuffer().readerIndex() >= message.getEndOfBodyPosition()) {
          throw new MessageEOFException("");
       }
       try {
-         return streamReadObject(message);
+         return streamReadObject(getReadBodyBuffer());
       }
       catch (IllegalStateException e) {
          throw new MessageFormatException(e.getMessage());
@@ -207,70 +205,70 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
 
    public void writeBoolean(final boolean value) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.BOOLEAN);
-      getBuffer().writeBoolean(value);
+      getWriteBodyBuffer().writeByte(DataConstants.BOOLEAN);
+      getWriteBodyBuffer().writeBoolean(value);
    }
 
    public void writeByte(final byte value) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.BYTE);
-      getBuffer().writeByte(value);
+      getWriteBodyBuffer().writeByte(DataConstants.BYTE);
+      getWriteBodyBuffer().writeByte(value);
    }
 
    public void writeShort(final short value) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.SHORT);
-      getBuffer().writeShort(value);
+      getWriteBodyBuffer().writeByte(DataConstants.SHORT);
+      getWriteBodyBuffer().writeShort(value);
    }
 
    public void writeChar(final char value) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.CHAR);
-      getBuffer().writeShort((short) value);
+      getWriteBodyBuffer().writeByte(DataConstants.CHAR);
+      getWriteBodyBuffer().writeShort((short) value);
    }
 
    public void writeInt(final int value) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.INT);
-      getBuffer().writeInt(value);
+      getWriteBodyBuffer().writeByte(DataConstants.INT);
+      getWriteBodyBuffer().writeInt(value);
    }
 
    public void writeLong(final long value) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.LONG);
-      getBuffer().writeLong(value);
+      getWriteBodyBuffer().writeByte(DataConstants.LONG);
+      getWriteBodyBuffer().writeLong(value);
    }
 
    public void writeFloat(final float value) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.FLOAT);
-      getBuffer().writeInt(Float.floatToIntBits(value));
+      getWriteBodyBuffer().writeByte(DataConstants.FLOAT);
+      getWriteBodyBuffer().writeInt(Float.floatToIntBits(value));
    }
 
    public void writeDouble(final double value) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.DOUBLE);
-      getBuffer().writeLong(Double.doubleToLongBits(value));
+      getWriteBodyBuffer().writeByte(DataConstants.DOUBLE);
+      getWriteBodyBuffer().writeLong(Double.doubleToLongBits(value));
    }
 
    public void writeString(final String value) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.STRING);
-      getBuffer().writeNullableString(value);
+      getWriteBodyBuffer().writeByte(DataConstants.STRING);
+      getWriteBodyBuffer().writeNullableString(value);
    }
 
    public void writeBytes(final byte[] value) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.BYTES);
-      getBuffer().writeInt(value.length);
-      getBuffer().writeBytes(value);
+      getWriteBodyBuffer().writeByte(DataConstants.BYTES);
+      getWriteBodyBuffer().writeInt(value.length);
+      getWriteBodyBuffer().writeBytes(value);
    }
 
    public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException {
 
-      getBuffer().writeByte(DataConstants.BYTES);
-      getBuffer().writeInt(length);
-      getBuffer().writeBytes(value, offset, length);
+      getWriteBodyBuffer().writeByte(DataConstants.BYTES);
+      getWriteBodyBuffer().writeInt(length);
+      getWriteBodyBuffer().writeBytes(value, offset, length);
    }
 
    public void writeObject(final Object value) throws JMSException {
@@ -313,7 +311,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    }
 
    public void reset() throws JMSException {
-      getBuffer().resetReaderIndex();
+      getWriteBodyBuffer().resetReaderIndex();
    }
 
    // ActiveMQRAMessage overrides ----------------------------------------
@@ -322,11 +320,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
    public void clearBody() throws JMSException {
       super.clearBody();
 
-      getBuffer().clear();
-   }
-
-   private ActiveMQBuffer getBuffer() {
-      return message.getBodyBuffer();
+      getWriteBodyBuffer().clear();
    }
 
    public void decode() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
index 95e24b5..3191067 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
@@ -63,7 +63,7 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag
          this.text = null;
       }
 
-      writeBodyText(message, this.text);
+      writeBodyText(getWriteBodyBuffer(), this.text);
    }
 
    public String getText() {
@@ -84,12 +84,12 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag
 
    public void encode() throws Exception {
       super.encode();
-      writeBodyText(message, text);
+      writeBodyText(getWriteBodyBuffer(), text);
    }
 
    public void decode() throws Exception {
       super.decode();
-      text = readBodyText(message);
+      text = readBodyText(getReadBodyBuffer());
    }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
index 0b5cb51..b563e61 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.proton;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -473,7 +474,7 @@ public class TestConversions extends Assert {
       }
 
       @Override
-      public short readUnsignedByte() {
+      public int readUnsignedByte() {
          return 0;
       }
 
@@ -588,8 +589,8 @@ public class TestConversions extends Assert {
       }
 
       @Override
-      public void skipBytes(int length) {
-
+      public int skipBytes(int length) {
+         return length;
       }
 
       @Override
@@ -683,6 +684,19 @@ public class TestConversions extends Assert {
       }
 
       @Override
+      public void readFully(byte[] b) throws IOException {
+      }
+
+      @Override
+      public void readFully(byte[] b, int off, int len) throws IOException {
+      }
+
+      @Override
+      public String readLine() throws IOException {
+         return null;
+      }
+
+      @Override
       public ActiveMQBuffer copy() {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java
deleted file mode 100644
index 32d37d4..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.utils.UTF8Util;
-import org.apache.activemq.artemis.utils.UTF8Util.StringUtilBuffer;
-
-public class DataInputWrapper implements DataInput {
-
-   private static final int DEFAULT_CAPACITY = 1024 * 1024;
-   private static final NotEnoughBytesException exception = new NotEnoughBytesException();
-   private ByteBuffer internalBuffer;
-
-   public DataInputWrapper() {
-      this(DEFAULT_CAPACITY);
-   }
-
-   public DataInputWrapper(int capacity) {
-      this.internalBuffer = ByteBuffer.allocateDirect(capacity);
-      this.internalBuffer.mark();
-      this.internalBuffer.limit(0);
-   }
-
-   public void receiveData(byte[] data) {
-      int newSize = data.length;
-      int freeSpace = internalBuffer.capacity() - internalBuffer.limit();
-      if (freeSpace < newSize) {
-         internalBuffer.reset();
-         internalBuffer.compact();
-         if (internalBuffer.remaining() < newSize) {
-            //need to enlarge
-         }
-         //make sure mark is at zero and position is at effective limit
-         int pos = internalBuffer.position();
-         internalBuffer.position(0);
-         internalBuffer.mark();
-         internalBuffer.position(pos);
-      }
-      else {
-         internalBuffer.position(internalBuffer.limit());
-         internalBuffer.limit(internalBuffer.capacity());
-      }
-      internalBuffer.put(data);
-      internalBuffer.limit(internalBuffer.position());
-      internalBuffer.reset();
-   }
-
-   public void receiveData(ActiveMQBuffer buffer) {
-      int newSize = buffer.readableBytes();
-      byte[] newData = new byte[newSize];
-      buffer.readBytes(newData);
-      this.receiveData(newData);
-   }
-
-   //invoke after each successful unmarshall
-   public void mark() {
-      this.internalBuffer.mark();
-   }
-
-   @Override
-   public void readFully(byte[] b) throws IOException {
-      readFully(b, 0, b.length);
-   }
-
-   private void checkSize(int n) throws NotEnoughBytesException {
-      if (internalBuffer.remaining() < n) {
-         throw exception;
-      }
-   }
-
-   @Override
-   public void readFully(byte[] b, int off, int len) throws IOException {
-      checkSize(len);
-      internalBuffer.get(b, off, len);
-   }
-
-   @Override
-   public int skipBytes(int n) throws IOException {
-      checkSize(n);
-      int pos = internalBuffer.position();
-      internalBuffer.position(pos + n);
-      return n;
-   }
-
-   @Override
-   public boolean readBoolean() throws IOException {
-      checkSize(1);
-      byte b = internalBuffer.get();
-      return b != 0;
-   }
-
-   @Override
-   public byte readByte() throws IOException {
-      checkSize(1);
-      return this.internalBuffer.get();
-   }
-
-   @Override
-   public int readUnsignedByte() throws IOException {
-      checkSize(1);
-      return 0xFF & this.internalBuffer.get();
-   }
-
-   @Override
-   public short readShort() throws IOException {
-      checkSize(2);
-      return this.internalBuffer.getShort();
-   }
-
-   @Override
-   public int readUnsignedShort() throws IOException {
-      checkSize(2);
-      return 0xFFFF & this.internalBuffer.getShort();
-   }
-
-   @Override
-   public char readChar() throws IOException {
-      checkSize(2);
-      return this.internalBuffer.getChar();
-   }
-
-   @Override
-   public int readInt() throws IOException {
-      checkSize(4);
-      return this.internalBuffer.getInt();
-   }
-
-   @Override
-   public long readLong() throws IOException {
-      checkSize(8);
-      return this.internalBuffer.getLong();
-   }
-
-   @Override
-   public float readFloat() throws IOException {
-      checkSize(4);
-      return this.internalBuffer.getFloat();
-   }
-
-   @Override
-   public double readDouble() throws IOException {
-      checkSize(8);
-      return this.internalBuffer.getDouble();
-   }
-
-   @Override
-   public String readLine() throws IOException {
-      StringBuilder sb = new StringBuilder("");
-      char c = this.readChar();
-      while (c != '\n') {
-         sb.append(c);
-         c = this.readChar();
-      }
-      return sb.toString();
-   }
-
-   @Override
-   public String readUTF() throws IOException {
-      StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer();
-
-      final int size = this.readUnsignedShort();
-
-      if (size > buffer.byteBuffer.length) {
-         buffer.resizeByteBuffer(size);
-      }
-
-      if (size > buffer.charBuffer.length) {
-         buffer.resizeCharBuffer(size);
-      }
-
-      int count = 0;
-      int byte1, byte2, byte3;
-      int charCount = 0;
-
-      this.readFully(buffer.byteBuffer, 0, size);
-
-      while (count < size) {
-         byte1 = buffer.byteBuffer[count++];
-
-         if (byte1 > 0 && byte1 <= 0x7F) {
-            buffer.charBuffer[charCount++] = (char) byte1;
-         }
-         else {
-            int c = byte1 & 0xff;
-            switch (c >> 4) {
-               case 0xc:
-               case 0xd:
-                  byte2 = buffer.byteBuffer[count++];
-                  buffer.charBuffer[charCount++] = (char) ((c & 0x1F) << 6 | byte2 & 0x3F);
-                  break;
-               case 0xe:
-                  byte2 = buffer.byteBuffer[count++];
-                  byte3 = buffer.byteBuffer[count++];
-                  buffer.charBuffer[charCount++] = (char) ((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0);
-                  break;
-               default:
-                  throw new InternalError("unhandled utf8 byte " + c);
-            }
-         }
-      }
-
-      return new String(buffer.charBuffer, 0, charCount);
-   }
-
-   public boolean readable() {
-      return this.internalBuffer.hasRemaining();
-   }
-
-}


[2/4] activemq-artemis git commit: ARTEMIS-204 Improvements on OpenWire

Posted by cl...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index e7582f2..ada2a70 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -40,9 +40,23 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.BrokerInfo;
@@ -77,20 +91,7 @@ import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
-import org.apache.activemq.artemis.core.remoting.FailureListener;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.state.ConsumerState;
@@ -100,7 +101,6 @@ import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
@@ -176,8 +176,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
 
    private final Set<String> tempQueues = new ConcurrentHashSet<String>();
 
-   private DataInputWrapper dataInput = new DataInputWrapper();
-
    private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>();
 
    private volatile AMQSession advisorySession;
@@ -196,96 +194,78 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
    @Override
    public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
       try {
-         dataInput.receiveData(buffer);
-      }
-      catch (Throwable t) {
-         ActiveMQServerLogger.LOGGER.error("decoding error", t);
-         return;
-      }
-
-      // this.setDataReceived();
-      while (dataInput.readable()) {
-         try {
-            Object object = null;
-            try {
-               object = wireFormat.unmarshal(dataInput);
-               dataInput.mark();
-            }
-            catch (NotEnoughBytesException e) {
-               //meaning the dataInput hasn't enough bytes for a command.
-               //in that case we just return and waiting for the next
-               //call of bufferReceived()
-               return;
+         Object object = wireFormat.unmarshal(buffer);
+
+         Command command = (Command) object;
+         boolean responseRequired = command.isResponseRequired();
+         int commandId = command.getCommandId();
+         // the connection handles pings, negotiations directly.
+         // and delegate all other commands to manager.
+         if (command.getClass() == KeepAliveInfo.class) {
+            KeepAliveInfo info = (KeepAliveInfo) command;
+            if (info.isResponseRequired()) {
+               info.setResponseRequired(false);
+               protocolManager.sendReply(this, info);
             }
+         }
+         else if (command.getClass() == WireFormatInfo.class) {
+            // amq here starts a read/write monitor thread (detect ttl?)
+            negotiate((WireFormatInfo) command);
+         }
+         else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class || command.getClass() == RemoveInfo.class ||
+                  command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass()) ||
+                  command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class ||
+                  command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class) {
+            Response response = null;
 
-            Command command = (Command) object;
-            boolean responseRequired = command.isResponseRequired();
-            int commandId = command.getCommandId();
-            // the connection handles pings, negotiations directly.
-            // and delegate all other commands to manager.
-            if (command.getClass() == KeepAliveInfo.class) {
-               KeepAliveInfo info = (KeepAliveInfo) command;
-               if (info.isResponseRequired()) {
-                  info.setResponseRequired(false);
-                  protocolManager.sendReply(this, info);
-               }
-            }
-            else if (command.getClass() == WireFormatInfo.class) {
-               // amq here starts a read/write monitor thread (detect ttl?)
-               negotiate((WireFormatInfo) command);
+            if (pendingStop) {
+               response = new ExceptionResponse(this.stopError);
             }
-            else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class || command.getClass() == RemoveInfo.class || command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass()) || command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class || command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class) {
-               Response response = null;
-
-               if (pendingStop) {
-                  response = new ExceptionResponse(this.stopError);
-               }
-               else {
-                  response = ((Command) command).visit(this);
-
-                  if (response instanceof ExceptionResponse) {
-                     if (!responseRequired) {
-                        Throwable cause = ((ExceptionResponse) response).getException();
-                        serviceException(cause);
-                        response = null;
-                     }
-                  }
-               }
-
-               if (responseRequired) {
-                  if (response == null) {
-                     response = new Response();
-                  }
-               }
+            else {
+               response = ((Command) command).visit(this);
 
-               // The context may have been flagged so that the response is not
-               // sent.
-               if (context != null) {
-                  if (context.isDontSendReponse()) {
-                     context.setDontSendReponse(false);
+               if (response instanceof ExceptionResponse) {
+                  if (!responseRequired) {
+                     Throwable cause = ((ExceptionResponse) response).getException();
+                     serviceException(cause);
                      response = null;
                   }
                }
+            }
 
-               if (response != null && !protocolManager.isStopping()) {
-                  response.setCorrelationId(commandId);
-                  dispatchSync(response);
+            if (responseRequired) {
+               if (response == null) {
+                  response = new Response();
                }
+            }
 
+            // The context may have been flagged so that the response is not
+            // sent.
+            if (context != null) {
+               if (context.isDontSendReponse()) {
+                  context.setDontSendReponse(false);
+                  response = null;
+               }
             }
-            else {
-               // note!!! wait for negotiation (e.g. use a countdown latch)
-               // before handling any other commands
-               this.protocolManager.handleCommand(this, command);
+
+            if (response != null && !protocolManager.isStopping()) {
+               response.setCorrelationId(commandId);
+               dispatchSync(response);
             }
+
          }
-         catch (IOException e) {
-            ActiveMQServerLogger.LOGGER.error("error decoding", e);
-         }
-         catch (Throwable t) {
-            ActiveMQServerLogger.LOGGER.error("error decoding", t);
+         else {
+            // note!!! wait for negotiation (e.g. use a countdown latch)
+            // before handling any other commands
+            this.protocolManager.handleCommand(this, command);
          }
       }
+      catch (IOException e) {
+         ActiveMQServerLogger.LOGGER.error("error decoding", e);
+      }
+      catch (Throwable t) {
+         ActiveMQServerLogger.LOGGER.error("error decoding", t);
+      }
    }
 
    private void negotiate(WireFormatInfo command) throws IOException {
@@ -624,6 +604,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
 
    public void serviceExceptionAsync(final IOException e) {
       if (asyncException.compareAndSet(false, true)) {
+         // Why this is not through an executor?
          new Thread("Async Exception Handler") {
             @Override
             public void run() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 260ee02..90518ec 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire;
 
+import javax.jms.JMSException;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -23,13 +24,19 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
-
-import javax.jms.JMSException;
+import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.TypedProperties;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMapMessage;
@@ -51,14 +58,6 @@ import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtbuf.UTF8Buffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.TypedProperties;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
 
 public class OpenWireMessageConverter implements MessageConverter {
 
@@ -429,7 +428,7 @@ public class OpenWireMessageConverter implements MessageConverter {
       }
       amqMsg.setBrokerInTime(brokerInTime);
 
-      ActiveMQBuffer buffer = coreMessage.getBodyBuffer();
+      ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy();
       if (buffer != null) {
          buffer.resetReaderIndex();
          byte[] bytes = null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 10d67a1..8c20c46 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -31,18 +31,38 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledExecutorService;
 
 import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
-import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -66,26 +86,8 @@ import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.core.security.CheckType;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.openwire.OpenWireFormatFactory;
-import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
-import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
@@ -183,8 +185,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    @Override
    public void addChannelHandlers(ChannelPipeline pipeline) {
-      // TODO Auto-generated method stub
-
+      // each read will have a full packet with this
+      pipeline.addLast("packet-decipher", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, DataConstants.SIZE_INT));
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 741c32b..12ddb94 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -275,12 +275,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          return HandleStatus.BUSY;
       }
 
-      // TODO - https://jira.jboss.org/browse/HORNETQ-533
-      // if (!writeReady.get())
-      // {
-      // return HandleStatus.BUSY;
-      // }
-
       synchronized (lock) {
          // If the consumer is stopped then we don't accept the message, it
          // should go back into the

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index 83c94ee..faa947e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -27,18 +27,18 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.concurrent.TimeUnit;
-
 public class SimpleOpenWireTest extends BasicOpenWireTest {
 
    @Rule
@@ -300,6 +300,42 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
       }
    }
 
+   @Test
+   public void testFailoverTransportReconnect() throws Exception {
+      Connection exConn = null;
+
+      try {
+         String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")";
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
+
+         Queue queue = new ActiveMQQueue(durableQueueName);
+
+         exConn = exFact.createConnection();
+         exConn.start();
+
+         Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer messageProducer = session.createProducer(queue);
+         messageProducer.send(session.createTextMessage("Test"));
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         assertNotNull(consumer.receive(5000));
+
+         server.stop();
+         Thread.sleep(3000);
+
+         server.start();
+         server.waitForActivation(10, TimeUnit.SECONDS);
+
+         messageProducer.send(session.createTextMessage("Test2"));
+         assertNotNull(consumer.receive(5000));
+      }
+      finally {
+         if (exConn != null) {
+            exConn.close();
+         }
+      }
+   }
+
    /**
     * This is the example shipped with the distribution
     *
@@ -309,41 +345,30 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
    public void testOpenWireExample() throws Exception {
       Connection exConn = null;
 
+      SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
       try {
-         String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
-         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
 
-         // Step 2. Perfom a lookup on the queue
          Queue queue = new ActiveMQQueue(durableQueueName);
 
-         // Step 4.Create a JMS Connection
          exConn = exFact.createConnection();
 
-         // Step 10. Start the Connection
          exConn.start();
 
-         // Step 5. Create a JMS Session
          Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-         // Step 6. Create a JMS Message Producer
          MessageProducer producer = session.createProducer(queue);
 
-         // Step 7. Create a Text Message
          TextMessage message = session.createTextMessage("This is a text message");
 
-         //System.out.println("Sent message: " + message.getText());
-
-         // Step 8. Send the Message
          producer.send(message);
 
-         // Step 9. Create a JMS Message Consumer
          MessageConsumer messageConsumer = session.createConsumer(queue);
 
-         // Step 11. Receive the message
          TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
 
-         System.out.println("Received message: " + messageReceived);
-
          assertEquals("This is a text message", messageReceived.getText());
       }
       finally {
@@ -354,39 +379,88 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
 
    }
 
+
+   /**
+    * This is the example shipped with the distribution
+    *
+    * @throws Exception
+    */
    @Test
-   public void testFailoverTransportReconnect() throws Exception {
+   public void testMultipleConsumers() throws Exception {
       Connection exConn = null;
 
+      SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
       try {
-         String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")";
-         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
 
          Queue queue = new ActiveMQQueue(durableQueueName);
 
          exConn = exFact.createConnection();
+
          exConn.start();
 
          Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer messageProducer = session.createProducer(queue);
-         messageProducer.send(session.createTextMessage("Test"));
 
-         MessageConsumer consumer = session.createConsumer(queue);
-         assertNotNull(consumer.receive(5000));
+         MessageProducer producer = session.createProducer(queue);
 
-         server.stop();
-         Thread.sleep(3000);
+         TextMessage message = session.createTextMessage("This is a text message");
 
-         server.start();
-         server.waitForActivation(10, TimeUnit.SECONDS);
+         producer.send(message);
 
-         messageProducer.send(session.createTextMessage("Test2"));
-         assertNotNull(consumer.receive(5000));
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+
+         TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+         assertEquals("This is a text message", messageReceived.getText());
       }
       finally {
          if (exConn != null) {
             exConn.close();
          }
       }
+
    }
+
+   @Test
+   public void testMixedOpenWireExample() throws Exception {
+      Connection openConn = null;
+
+      SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
+      ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
+
+      Queue queue = new ActiveMQQueue("exampleQueue");
+
+      openConn = openCF.createConnection();
+
+      openConn.start();
+
+      Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer producer = openSession.createProducer(queue);
+
+      TextMessage message = openSession.createTextMessage("This is a text message");
+
+      producer.send(message);
+
+      org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
+
+      Connection artemisConn = artemisCF.createConnection();
+      Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      artemisConn.start();
+      MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue"));
+
+      TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+      assertEquals("This is a text message", messageReceived.getText());
+
+      openConn.close();
+      artemisConn.close();
+
+   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java
new file mode 100644
index 0000000..8d315d3
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Test;
+
+/** This is useful to debug connection ordering. There's only one connection being made from these tests */
+public class VerySimpleOenwireTest extends OpenWireTestBase {
+
+   /**
+    * This is the example shipped with the distribution
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testOpenWireExample() throws Exception {
+      Connection exConn = null;
+
+      SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
+      try {
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+
+         Queue queue = new ActiveMQQueue("exampleQueue");
+
+         exConn = exFact.createConnection();
+
+         exConn.start();
+
+         Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer = session.createProducer(queue);
+
+         TextMessage message = session.createTextMessage("This is a text message");
+
+         producer.send(message);
+
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+
+         TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+         assertEquals("This is a text message", messageReceived.getText());
+      }
+      finally {
+         if (exConn != null) {
+            exConn.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void testMixedOpenWireExample() throws Exception {
+      Connection openConn = null;
+
+      SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
+      ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
+
+      Queue queue = new ActiveMQQueue("exampleQueue");
+
+      openConn = openCF.createConnection();
+
+      openConn.start();
+
+      Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer producer = openSession.createProducer(queue);
+
+      TextMessage message = openSession.createTextMessage("This is a text message");
+
+      producer.send(message);
+
+      org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
+
+      Connection artemisConn = artemisCF.createConnection();
+      Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      artemisConn.start();
+      MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue"));
+
+      TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+      assertEquals("This is a text message", messageReceived.getText());
+
+      openConn.close();
+      artemisConn.close();
+
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
index a770183..321fda7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.artemis.tests.integration.transports.netty;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
@@ -24,10 +28,6 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
 public class ActiveMQFrameDecoder2Test extends ActiveMQTestBase {
 
    private static final int MSG_CNT = 10000;


[4/4] activemq-artemis git commit: This closes #126 openwire fixes

Posted by cl...@apache.org.
This closes #126 openwire fixes


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6c6568b7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6c6568b7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6c6568b7

Branch: refs/heads/master
Commit: 6c6568b769f72cdb6f1e10662d0eca62d401902c
Parents: 6b17d96 1dae997
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Aug 13 20:42:00 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Aug 13 20:42:00 2015 -0400

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQBuffer.java        |   7 +-
 .../core/buffers/impl/ChannelBufferWrapper.java |  26 ++-
 .../apache/activemq/artemis/utils/ByteUtil.java |  12 +
 .../CompressedLargeMessageControllerImpl.java   |  30 ++-
 .../client/impl/LargeMessageControllerImpl.java |  23 +-
 .../artemis/reader/BytesMessageUtil.java        | 106 ++++-----
 .../activemq/artemis/reader/MapMessageUtil.java |  17 +-
 .../activemq/artemis/reader/MessageUtil.java    |   7 +-
 .../artemis/reader/StreamMessageUtil.java       |  40 ++--
 .../artemis/reader/TextMessageUtil.java         |   7 +-
 .../jms/client/ActiveMQBytesMessage.java        |  52 ++---
 .../artemis/jms/client/ActiveMQMapMessage.java  |   4 +-
 .../jms/client/ActiveMQStreamMessage.java       |  22 +-
 .../artemis/jms/client/ActiveMQTextMessage.java |   4 +-
 .../converter/jms/ServerJMSBytesMessage.java    |  53 ++---
 .../converter/jms/ServerJMSMapMessage.java      |   4 +-
 .../proton/converter/jms/ServerJMSMessage.java  |  19 ++
 .../converter/jms/ServerJMSStreamMessage.java   |  84 ++++---
 .../converter/jms/ServerJMSTextMessage.java     |   6 +-
 .../core/protocol/proton/TestConversions.java   |  20 +-
 .../protocol/openwire/DataInputWrapper.java     | 228 -------------------
 .../protocol/openwire/OpenWireConnection.java   | 163 ++++++-------
 .../openwire/OpenWireMessageConverter.java      |  23 +-
 .../openwire/OpenWireProtocolManager.java       |  44 ++--
 .../core/server/impl/ServerConsumerImpl.java    |   6 -
 examples/features/perf/pom.xml                  |   4 +-
 .../openwire/SimpleOpenWireTest.java            | 136 ++++++++---
 .../openwire/VerySimpleOenwireTest.java         | 118 ++++++++++
 .../netty/ActiveMQFrameDecoder2Test.java        |   8 +-
 29 files changed, 648 insertions(+), 625 deletions(-)
----------------------------------------------------------------------