You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/14 02:42:15 UTC
[3/4] activemq-artemis git commit: ARTEMIS-204 Improvements on OpenWire
ARTEMIS-204 Improvements on OpenWire
https://issues.apache.org/jira/browse/ARTEMIS-204
As a consequence, this will also fix any possible issues with AMQP.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1dae9974
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1dae9974
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1dae9974
Branch: refs/heads/master
Commit: 1dae99746b4532313bbfa66e530751e8d79d2d5b
Parents: a4498d4
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Aug 13 19:20:20 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Aug 13 20:39:01 2015 -0400
----------------------------------------------------------------------
.../artemis/api/core/ActiveMQBuffer.java | 7 +-
.../core/buffers/impl/ChannelBufferWrapper.java | 26 ++-
.../apache/activemq/artemis/utils/ByteUtil.java | 12 +
.../CompressedLargeMessageControllerImpl.java | 30 ++-
.../client/impl/LargeMessageControllerImpl.java | 23 +-
.../artemis/reader/BytesMessageUtil.java | 106 ++++-----
.../activemq/artemis/reader/MapMessageUtil.java | 17 +-
.../activemq/artemis/reader/MessageUtil.java | 7 +-
.../artemis/reader/StreamMessageUtil.java | 40 ++--
.../artemis/reader/TextMessageUtil.java | 7 +-
.../jms/client/ActiveMQBytesMessage.java | 52 ++---
.../artemis/jms/client/ActiveMQMapMessage.java | 4 +-
.../jms/client/ActiveMQStreamMessage.java | 22 +-
.../artemis/jms/client/ActiveMQTextMessage.java | 4 +-
.../converter/jms/ServerJMSBytesMessage.java | 53 ++---
.../converter/jms/ServerJMSMapMessage.java | 4 +-
.../proton/converter/jms/ServerJMSMessage.java | 19 ++
.../converter/jms/ServerJMSStreamMessage.java | 84 ++++---
.../converter/jms/ServerJMSTextMessage.java | 6 +-
.../core/protocol/proton/TestConversions.java | 20 +-
.../protocol/openwire/DataInputWrapper.java | 228 -------------------
.../protocol/openwire/OpenWireConnection.java | 163 ++++++-------
.../openwire/OpenWireMessageConverter.java | 23 +-
.../openwire/OpenWireProtocolManager.java | 44 ++--
.../core/server/impl/ServerConsumerImpl.java | 6 -
.../openwire/SimpleOpenWireTest.java | 136 ++++++++---
.../openwire/VerySimpleOenwireTest.java | 118 ++++++++++
.../netty/ActiveMQFrameDecoder2Test.java | 8 +-
28 files changed, 646 insertions(+), 623 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
index 12dd09f..da78fb7 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.api.core;
+import java.io.DataInput;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
@@ -29,7 +30,7 @@ import io.netty.buffer.ByteBuf;
*
* @see ActiveMQBuffers
*/
-public interface ActiveMQBuffer {
+public interface ActiveMQBuffer extends DataInput {
/**
* Returns the underlying Netty's ByteBuf
@@ -642,7 +643,7 @@ public interface ActiveMQBuffer {
* @return an unsigned byte at the current {@code readerIndex}
* @throws IndexOutOfBoundsException if {@code this.readableBytes} is less than {@code 1}
*/
- short readUnsignedByte();
+ int readUnsignedByte();
/**
* Gets a 16-bit short integer at the current {@code readerIndex}
@@ -874,7 +875,7 @@ public interface ActiveMQBuffer {
* @param length The number of bytes to skip
* @throws IndexOutOfBoundsException if {@code length} is greater than {@code this.readableBytes}
*/
- void skipBytes(int length);
+ int skipBytes(int length);
/**
* Sets the specified byte at the current {@code writerIndex}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
index 51fea91..1c830c6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
@@ -16,12 +16,14 @@
*/
package org.apache.activemq.artemis.core.buffers.impl;
+import java.io.IOException;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.UTF8Util;
@@ -350,7 +352,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
return new ChannelBufferWrapper(buffer.readSlice(length), releasable);
}
- public short readUnsignedByte() {
+ public int readUnsignedByte() {
return buffer.readUnsignedByte();
}
@@ -426,8 +428,9 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
buffer.setShort(index, value);
}
- public void skipBytes(final int length) {
+ public int skipBytes(final int length) {
buffer.skipBytes(length);
+ return length;
}
public ActiveMQBuffer slice() {
@@ -510,4 +513,23 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
buffer.writeShort(value);
}
+ /** from {@link java.io.DataInput} interface */
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ readBytes(b);
+ }
+
+ /** from {@link java.io.DataInput} interface */
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ readBytes(b, off, len);
+ }
+
+ /** from {@link java.io.DataInput} interface */
+ @Override
+ public String readLine() throws IOException {
+ return ByteUtil.readLine(this);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index 6e2ca99..b7ff841 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
public class ByteUtil {
@@ -67,4 +68,15 @@ public class ByteUtil {
return buffer.array();
}
+
+ public static String readLine(ActiveMQBuffer buffer) {
+ StringBuilder sb = new StringBuilder("");
+ char c = buffer.readChar();
+ while (c != '\n') {
+ sb.append(c);
+ c = buffer.readChar();
+ }
+ return sb.toString();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 1e93ced..ae711bf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.client.impl;
import java.io.DataInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -27,8 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
-import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
+import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.InflaterReader;
import org.apache.activemq.artemis.utils.InflaterWriter;
import org.apache.activemq.artemis.utils.UTF8Util;
@@ -302,9 +303,9 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
}
- public short readUnsignedByte() {
+ public int readUnsignedByte() {
try {
- return (short) getStream().readUnsignedByte();
+ return getStream().readUnsignedByte();
}
catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -391,18 +392,39 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
dst.put(bytesToGet);
}
- public void skipBytes(final int length) {
+ public int skipBytes(final int length) {
try {
for (int i = 0; i < length; i++) {
getStream().read();
}
+ return length;
}
catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
+
+ /** from {@link java.io.DataInput} interface */
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ readBytes(b);
+ }
+
+ /** from {@link java.io.DataInput} interface */
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ readBytes(b, off, len);
+ }
+
+ /** from {@link java.io.DataInput} interface */
+ @Override
+ public String readLine() throws IOException {
+ return getStream().readLine();
+ }
+
+
public void writeByte(final byte value) {
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 7f44cff..ad79e82 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
+import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.UTF8Util;
@@ -666,7 +667,7 @@ public class LargeMessageControllerImpl implements LargeMessageController {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public short readUnsignedByte() {
+ public int readUnsignedByte() {
return (short) (readByte() & 0xFF);
}
@@ -758,11 +759,12 @@ public class LargeMessageControllerImpl implements LargeMessageController {
readerIndex += length;
}
- public void skipBytes(final int length) {
+ public int skipBytes(final int length) {
long newReaderIndex = readerIndex + length;
checkForPacket(newReaderIndex);
readerIndex = newReaderIndex;
+ return length;
}
public void writeByte(final byte value) {
@@ -1176,7 +1178,24 @@ public class LargeMessageControllerImpl implements LargeMessageController {
}
}
}
+ }
+
+ /** from {@link java.io.DataInput} interface */
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ readBytes(b);
+ }
+ /** from {@link java.io.DataInput} interface */
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ readBytes(b, off, len);
+ }
+
+ /** from {@link java.io.DataInput} interface */
+ @Override
+ public String readLine() throws IOException {
+ return ByteUtil.readLine(this);
}
public ByteBuf byteBuf() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java
index 806a321..a8dce4a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java
@@ -16,115 +16,115 @@
*/
package org.apache.activemq.artemis.reader;
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
public class BytesMessageUtil extends MessageUtil {
- public static boolean bytesReadBoolean(Message message) {
- return getBodyBuffer(message).readBoolean();
+ public static boolean bytesReadBoolean(ActiveMQBuffer message) {
+ return message.readBoolean();
}
- public static byte bytesReadByte(Message message) {
- return getBodyBuffer(message).readByte();
+ public static byte bytesReadByte(ActiveMQBuffer message) {
+ return message.readByte();
}
- public static int bytesReadUnsignedByte(Message message) {
- return getBodyBuffer(message).readUnsignedByte();
+ public static int bytesReadUnsignedByte(ActiveMQBuffer message) {
+ return message.readUnsignedByte();
}
- public static short bytesReadShort(Message message) {
- return getBodyBuffer(message).readShort();
+ public static short bytesReadShort(ActiveMQBuffer message) {
+ return message.readShort();
}
- public static int bytesReadUnsignedShort(Message message) {
- return getBodyBuffer(message).readUnsignedShort();
+ public static int bytesReadUnsignedShort(ActiveMQBuffer message) {
+ return message.readUnsignedShort();
}
- public static char bytesReadChar(Message message) {
- return (char) getBodyBuffer(message).readShort();
+ public static char bytesReadChar(ActiveMQBuffer message) {
+ return (char) message.readShort();
}
- public static int bytesReadInt(Message message) {
- return getBodyBuffer(message).readInt();
+ public static int bytesReadInt(ActiveMQBuffer message) {
+ return message.readInt();
}
- public static long bytesReadLong(Message message) {
- return getBodyBuffer(message).readLong();
+ public static long bytesReadLong(ActiveMQBuffer message) {
+ return message.readLong();
}
- public static float bytesReadFloat(Message message) {
- return Float.intBitsToFloat(getBodyBuffer(message).readInt());
+ public static float bytesReadFloat(ActiveMQBuffer message) {
+ return Float.intBitsToFloat(message.readInt());
}
- public static double bytesReadDouble(Message message) {
- return Double.longBitsToDouble(getBodyBuffer(message).readLong());
+ public static double bytesReadDouble(ActiveMQBuffer message) {
+ return Double.longBitsToDouble(message.readLong());
}
- public static String bytesReadUTF(Message message) {
- return getBodyBuffer(message).readUTF();
+ public static String bytesReadUTF(ActiveMQBuffer message) {
+ return message.readUTF();
}
- public static int bytesReadBytes(Message message, final byte[] value) {
+ public static int bytesReadBytes(ActiveMQBuffer message, final byte[] value) {
return bytesReadBytes(message, value, value.length);
}
- public static int bytesReadBytes(Message message, final byte[] value, final int length) {
- if (!getBodyBuffer(message).readable()) {
+ public static int bytesReadBytes(ActiveMQBuffer message, final byte[] value, final int length) {
+ if (!message.readable()) {
return -1;
}
- int read = Math.min(length, getBodyBuffer(message).readableBytes());
+ int read = Math.min(length, message.readableBytes());
if (read != 0) {
- getBodyBuffer(message).readBytes(value, 0, read);
+ message.readBytes(value, 0, read);
}
return read;
}
- public static void bytesWriteBoolean(Message message, boolean value) {
- getBodyBuffer(message).writeBoolean(value);
+ public static void bytesWriteBoolean(ActiveMQBuffer message, boolean value) {
+ message.writeBoolean(value);
}
- public static void bytesWriteByte(Message message, byte value) {
- getBodyBuffer(message).writeByte(value);
+ public static void bytesWriteByte(ActiveMQBuffer message, byte value) {
+ message.writeByte(value);
}
- public static void bytesWriteShort(Message message, short value) {
- getBodyBuffer(message).writeShort(value);
+ public static void bytesWriteShort(ActiveMQBuffer message, short value) {
+ message.writeShort(value);
}
- public static void bytesWriteChar(Message message, char value) {
- getBodyBuffer(message).writeShort((short) value);
+ public static void bytesWriteChar(ActiveMQBuffer message, char value) {
+ message.writeShort((short) value);
}
- public static void bytesWriteInt(Message message, int value) {
- getBodyBuffer(message).writeInt(value);
+ public static void bytesWriteInt(ActiveMQBuffer message, int value) {
+ message.writeInt(value);
}
- public static void bytesWriteLong(Message message, long value) {
- getBodyBuffer(message).writeLong(value);
+ public static void bytesWriteLong(ActiveMQBuffer message, long value) {
+ message.writeLong(value);
}
- public static void bytesWriteFloat(Message message, float value) {
- getBodyBuffer(message).writeInt(Float.floatToIntBits(value));
+ public static void bytesWriteFloat(ActiveMQBuffer message, float value) {
+ message.writeInt(Float.floatToIntBits(value));
}
- public static void bytesWriteDouble(Message message, double value) {
- getBodyBuffer(message).writeLong(Double.doubleToLongBits(value));
+ public static void bytesWriteDouble(ActiveMQBuffer message, double value) {
+ message.writeLong(Double.doubleToLongBits(value));
}
- public static void bytesWriteUTF(Message message, String value) {
- getBodyBuffer(message).writeUTF(value);
+ public static void bytesWriteUTF(ActiveMQBuffer message, String value) {
+ message.writeUTF(value);
}
- public static void bytesWriteBytes(Message message, byte[] value) {
- getBodyBuffer(message).writeBytes(value);
+ public static void bytesWriteBytes(ActiveMQBuffer message, byte[] value) {
+ message.writeBytes(value);
}
- public static void bytesWriteBytes(Message message, final byte[] value, final int offset, final int length) {
- getBodyBuffer(message).writeBytes(value, offset, length);
+ public static void bytesWriteBytes(ActiveMQBuffer message, final byte[] value, final int offset, final int length) {
+ message.writeBytes(value, offset, length);
}
/**
@@ -134,7 +134,7 @@ public class BytesMessageUtil extends MessageUtil {
* @param value
* @return
*/
- public static boolean bytesWriteObject(Message message, Object value) {
+ public static boolean bytesWriteObject(ActiveMQBuffer message, Object value) {
if (value == null) {
throw new NullPointerException("Attempt to write a null value");
}
@@ -175,8 +175,8 @@ public class BytesMessageUtil extends MessageUtil {
return true;
}
- public static void bytesMessageReset(Message message) {
- getBodyBuffer(message).resetReaderIndex();
+ public static void bytesMessageReset(ActiveMQBuffer message) {
+ message.resetReaderIndex();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
index 9ae4798..65aeccb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.reader;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.utils.TypedProperties;
public class MapMessageUtil extends MessageUtil {
@@ -25,16 +24,15 @@ public class MapMessageUtil extends MessageUtil {
/**
* Utility method to set the map on a message body
*/
- public static void writeBodyMap(Message message, TypedProperties properties) {
- ActiveMQBuffer buff = getBodyBuffer(message);
- buff.resetWriterIndex();
- properties.encode(buff);
+ public static void writeBodyMap(ActiveMQBuffer message, TypedProperties properties) {
+ message.resetWriterIndex();
+ properties.encode(message);
}
/**
* Utility method to set the map on a message body
*/
- public static TypedProperties readBodyMap(Message message) {
+ public static TypedProperties readBodyMap(ActiveMQBuffer message) {
TypedProperties map = new TypedProperties();
readBodyMap(message, map);
return map;
@@ -43,10 +41,9 @@ public class MapMessageUtil extends MessageUtil {
/**
* Utility method to set the map on a message body
*/
- public static void readBodyMap(Message message, TypedProperties map) {
- ActiveMQBuffer buff = getBodyBuffer(message);
- buff.resetReaderIndex();
- map.decode(buff);
+ public static void readBodyMap(ActiveMQBuffer message, TypedProperties map) {
+ message.resetReaderIndex();
+ map.decode(message);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index 9f1a598..b56abc4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -21,7 +21,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
@@ -52,9 +51,9 @@ public class MessageUtil {
public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__AMQ_CID");
- public static ActiveMQBuffer getBodyBuffer(Message message) {
- return message.getBodyBuffer();
- }
+// public static ActiveMQBuffer getBodyBuffer(Message message) {
+// return message.getBodyBuffer();
+// }
public static byte[] getJMSCorrelationIDAsBytes(Message message) {
Object obj = message.getObjectProperty(CORRELATIONID_HEADER_NAME);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java
index d59662f..dbba989 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.reader;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -27,11 +26,10 @@ public class StreamMessageUtil extends MessageUtil {
* Method to read boolean values out of the Stream protocol existent on JMS Stream Messages
* Throws IllegalStateException if the type was invalid
*
- * @param message
+ * @param buff
* @return
*/
- public static boolean streamReadBoolean(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static boolean streamReadBoolean(ActiveMQBuffer buff) {
byte type = buff.readByte();
switch (type) {
@@ -46,8 +44,7 @@ public class StreamMessageUtil extends MessageUtil {
}
- public static byte streamReadByte(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static byte streamReadByte(ActiveMQBuffer buff) {
int index = buff.readerIndex();
try {
byte type = buff.readByte();
@@ -68,8 +65,7 @@ public class StreamMessageUtil extends MessageUtil {
}
- public static short streamReadShort(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static short streamReadShort(ActiveMQBuffer buff) {
byte type = buff.readByte();
switch (type) {
case DataConstants.BYTE:
@@ -84,8 +80,7 @@ public class StreamMessageUtil extends MessageUtil {
}
}
- public static char streamReadChar(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static char streamReadChar(ActiveMQBuffer buff) {
byte type = buff.readByte();
switch (type) {
case DataConstants.CHAR:
@@ -104,8 +99,7 @@ public class StreamMessageUtil extends MessageUtil {
}
- public static int streamReadInteger(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static int streamReadInteger(ActiveMQBuffer buff) {
byte type = buff.readByte();
switch (type) {
case DataConstants.BYTE:
@@ -122,8 +116,7 @@ public class StreamMessageUtil extends MessageUtil {
}
}
- public static long streamReadLong(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static long streamReadLong(ActiveMQBuffer buff) {
byte type = buff.readByte();
switch (type) {
case DataConstants.BYTE:
@@ -142,8 +135,7 @@ public class StreamMessageUtil extends MessageUtil {
}
}
- public static float streamReadFloat(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static float streamReadFloat(ActiveMQBuffer buff) {
byte type = buff.readByte();
switch (type) {
case DataConstants.FLOAT:
@@ -156,8 +148,7 @@ public class StreamMessageUtil extends MessageUtil {
}
}
- public static double streamReadDouble(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static double streamReadDouble(ActiveMQBuffer buff) {
byte type = buff.readByte();
switch (type) {
case DataConstants.FLOAT:
@@ -172,8 +163,7 @@ public class StreamMessageUtil extends MessageUtil {
}
}
- public static String streamReadString(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static String streamReadString(ActiveMQBuffer buff) {
byte type = buff.readByte();
switch (type) {
case DataConstants.BOOLEAN:
@@ -204,12 +194,10 @@ public class StreamMessageUtil extends MessageUtil {
* It will return remainingBytes, bytesRead
*
* @param remainingBytes remaining Bytes from previous read. Send it to 0 if it was the first call for the message
- * @param message
+ * @param buff
* @return a pair of remaining bytes and bytes read
*/
- public static Pair<Integer, Integer> streamReadBytes(Message message, int remainingBytes, byte[] value) {
- ActiveMQBuffer buff = getBodyBuffer(message);
-
+ public static Pair<Integer, Integer> streamReadBytes(ActiveMQBuffer buff, int remainingBytes, byte[] value) {
if (remainingBytes == -1) {
return new Pair<>(0, -1);
}
@@ -230,9 +218,7 @@ public class StreamMessageUtil extends MessageUtil {
}
- public static Object streamReadObject(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
-
+ public static Object streamReadObject(ActiveMQBuffer buff) {
byte type = buff.readByte();
switch (type) {
case DataConstants.BOOLEAN:
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java
index c7515fc..c9ece4d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.reader;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
public class TextMessageUtil extends MessageUtil {
@@ -25,8 +24,7 @@ public class TextMessageUtil extends MessageUtil {
/**
* Utility method to set the Text message on a message body
*/
- public static void writeBodyText(Message message, SimpleString text) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static void writeBodyText(ActiveMQBuffer buff, SimpleString text) {
buff.clear();
buff.writeNullableSimpleString(text);
}
@@ -34,8 +32,7 @@ public class TextMessageUtil extends MessageUtil {
/**
* Utility method to set the Text message on a message body
*/
- public static SimpleString readBodyText(Message message) {
- ActiveMQBuffer buff = getBodyBuffer(message);
+ public static SimpleString readBodyText(ActiveMQBuffer buff) {
buff.resetReaderIndex();
return buff.readNullableSimpleString();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
index 72770a4..09d6b18 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
@@ -102,7 +102,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public boolean readBoolean() throws JMSException {
checkRead();
try {
- return bytesReadBoolean(message);
+ return bytesReadBoolean(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -112,7 +112,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public byte readByte() throws JMSException {
checkRead();
try {
- return bytesReadByte(message);
+ return bytesReadByte(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -122,7 +122,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public int readUnsignedByte() throws JMSException {
checkRead();
try {
- return bytesReadUnsignedByte(message);
+ return bytesReadUnsignedByte(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -132,7 +132,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public short readShort() throws JMSException {
checkRead();
try {
- return bytesReadShort(message);
+ return bytesReadShort(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -142,7 +142,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public int readUnsignedShort() throws JMSException {
checkRead();
try {
- return bytesReadUnsignedShort(message);
+ return bytesReadUnsignedShort(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -152,7 +152,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public char readChar() throws JMSException {
checkRead();
try {
- return bytesReadChar(message);
+ return bytesReadChar(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -162,7 +162,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public int readInt() throws JMSException {
checkRead();
try {
- return bytesReadInt(message);
+ return bytesReadInt(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -172,7 +172,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public long readLong() throws JMSException {
checkRead();
try {
- return bytesReadLong(message);
+ return bytesReadLong(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -182,7 +182,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public float readFloat() throws JMSException {
checkRead();
try {
- return bytesReadFloat(message);
+ return bytesReadFloat(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -192,7 +192,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public double readDouble() throws JMSException {
checkRead();
try {
- return bytesReadDouble(message);
+ return bytesReadDouble(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -202,7 +202,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public String readUTF() throws JMSException {
checkRead();
try {
- return bytesReadUTF(message);
+ return bytesReadUTF(message.getBodyBuffer());
}
catch (IndexOutOfBoundsException e) {
throw new MessageEOFException("");
@@ -217,59 +217,59 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public int readBytes(final byte[] value) throws JMSException {
checkRead();
- return bytesReadBytes(message, value);
+ return bytesReadBytes(message.getBodyBuffer(), value);
}
public int readBytes(final byte[] value, final int length) throws JMSException {
checkRead();
- return bytesReadBytes(message, value, length);
+ return bytesReadBytes(message.getBodyBuffer(), value, length);
}
public void writeBoolean(final boolean value) throws JMSException {
checkWrite();
- bytesWriteBoolean(message, value);
+ bytesWriteBoolean(message.getBodyBuffer(), value);
}
public void writeByte(final byte value) throws JMSException {
checkWrite();
- bytesWriteByte(message, value);
+ bytesWriteByte(message.getBodyBuffer(), value);
}
public void writeShort(final short value) throws JMSException {
checkWrite();
- bytesWriteShort(message, value);
+ bytesWriteShort(message.getBodyBuffer(), value);
}
public void writeChar(final char value) throws JMSException {
checkWrite();
- bytesWriteChar(message, value);
+ bytesWriteChar(message.getBodyBuffer(), value);
}
public void writeInt(final int value) throws JMSException {
checkWrite();
- bytesWriteInt(message, value);
+ bytesWriteInt(message.getBodyBuffer(), value);
}
public void writeLong(final long value) throws JMSException {
checkWrite();
- bytesWriteLong(message, value);
+ bytesWriteLong(message.getBodyBuffer(), value);
}
public void writeFloat(final float value) throws JMSException {
checkWrite();
- bytesWriteFloat(message, value);
+ bytesWriteFloat(message.getBodyBuffer(), value);
}
public void writeDouble(final double value) throws JMSException {
checkWrite();
- bytesWriteDouble(message, value);
+ bytesWriteDouble(message.getBodyBuffer(), value);
}
public void writeUTF(final String value) throws JMSException {
checkWrite();
try {
- bytesWriteUTF(message, value);
+ bytesWriteUTF(message.getBodyBuffer(), value);
}
catch (Exception e) {
JMSException je = new JMSException("Failed to write UTF");
@@ -282,17 +282,17 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public void writeBytes(final byte[] value) throws JMSException {
checkWrite();
- bytesWriteBytes(message, value);
+ bytesWriteBytes(message.getBodyBuffer(), value);
}
public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException {
checkWrite();
- bytesWriteBytes(message, value, offset, length);
+ bytesWriteBytes(message.getBodyBuffer(), value, offset, length);
}
public void writeObject(final Object value) throws JMSException {
checkWrite();
- if (!bytesWriteObject(message, value)) {
+ if (!bytesWriteObject(message.getBodyBuffer(), value)) {
throw new MessageFormatException("Invalid object for properties");
}
}
@@ -304,7 +304,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
bodyLength = message.getBodySize();
}
- bytesMessageReset(message);
+ bytesMessageReset(message.getBodyBuffer());
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java
index 35ee0e4..747316d 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java
@@ -317,7 +317,7 @@ public final class ActiveMQMapMessage extends ActiveMQMessage implements MapMess
@Override
public void doBeforeSend() throws Exception {
if (invalid) {
- writeBodyMap(message, map);
+ writeBodyMap(message.getBodyBuffer(), map);
invalid = false;
}
@@ -328,7 +328,7 @@ public final class ActiveMQMapMessage extends ActiveMQMessage implements MapMess
public void doBeforeReceive() throws ActiveMQException {
super.doBeforeReceive();
- readBodyMap(message, map);
+ readBodyMap(message.getBodyBuffer(), map);
}
// Package protected ---------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
index 4a8d5a8..315721f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
@@ -89,7 +89,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
public boolean readBoolean() throws JMSException {
checkRead();
try {
- return streamReadBoolean(message);
+ return streamReadBoolean(message.getBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -103,7 +103,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
checkRead();
try {
- return streamReadByte(message);
+ return streamReadByte(message.getBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -116,7 +116,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
public short readShort() throws JMSException {
checkRead();
try {
- return streamReadShort(message);
+ return streamReadShort(message.getBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -129,7 +129,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
public char readChar() throws JMSException {
checkRead();
try {
- return streamReadChar(message);
+ return streamReadChar(message.getBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -142,7 +142,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
public int readInt() throws JMSException {
checkRead();
try {
- return streamReadInteger(message);
+ return streamReadInteger(message.getBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -155,7 +155,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
public long readLong() throws JMSException {
checkRead();
try {
- return streamReadLong(message);
+ return streamReadLong(message.getBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -168,7 +168,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
public float readFloat() throws JMSException {
checkRead();
try {
- return streamReadFloat(message);
+ return streamReadFloat(message.getBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -181,7 +181,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
public double readDouble() throws JMSException {
checkRead();
try {
- return streamReadDouble(message);
+ return streamReadDouble(message.getBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -194,7 +194,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
public String readString() throws JMSException {
checkRead();
try {
- return streamReadString(message);
+ return streamReadString(message.getBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -212,7 +212,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
public int readBytes(final byte[] value) throws JMSException {
checkRead();
try {
- Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value);
+ Pair<Integer, Integer> pairRead = streamReadBytes(message.getBodyBuffer(), len, value);
len = pairRead.getA();
return pairRead.getB();
@@ -228,7 +228,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
public Object readObject() throws JMSException {
checkRead();
try {
- return streamReadObject(message);
+ return streamReadObject(message.getBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java
index b9fba2d..bd0a615 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java
@@ -84,7 +84,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
this.text = null;
}
- writeBodyText(message, this.text);
+ writeBodyText(message.getBodyBuffer(), this.text);
}
public String getText() {
@@ -109,7 +109,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
public void doBeforeReceive() throws ActiveMQException {
super.doBeforeReceive();
- text = readBodyText(message);
+ text = readBodyText(message.getBodyBuffer());
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
index 76e2515..cc436f4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
@@ -60,128 +60,128 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
@Override
public boolean readBoolean() throws JMSException {
- return bytesReadBoolean(message);
+ return bytesReadBoolean(getReadBodyBuffer());
}
@Override
public byte readByte() throws JMSException {
- return bytesReadByte(message);
+ return bytesReadByte(getReadBodyBuffer());
}
@Override
public int readUnsignedByte() throws JMSException {
- return bytesReadUnsignedByte(message);
+ return bytesReadUnsignedByte(getReadBodyBuffer());
}
@Override
public short readShort() throws JMSException {
- return bytesReadShort(message);
+ return bytesReadShort(getReadBodyBuffer());
}
@Override
public int readUnsignedShort() throws JMSException {
- return bytesReadUnsignedShort(message);
+ return bytesReadUnsignedShort(getReadBodyBuffer());
}
@Override
public char readChar() throws JMSException {
- return bytesReadChar(message);
+ return bytesReadChar(getReadBodyBuffer());
}
@Override
public int readInt() throws JMSException {
- return bytesReadInt(message);
+ return bytesReadInt(getReadBodyBuffer());
}
@Override
public long readLong() throws JMSException {
- return bytesReadLong(message);
+ return bytesReadLong(getReadBodyBuffer());
}
@Override
public float readFloat() throws JMSException {
- return bytesReadFloat(message);
+ return bytesReadFloat(getReadBodyBuffer());
}
@Override
public double readDouble() throws JMSException {
- return bytesReadDouble(message);
+ return bytesReadDouble(getReadBodyBuffer());
}
@Override
public String readUTF() throws JMSException {
- return bytesReadUTF(message);
+ return bytesReadUTF(getReadBodyBuffer());
}
@Override
public int readBytes(byte[] value) throws JMSException {
- return bytesReadBytes(message, value);
+ return bytesReadBytes(getReadBodyBuffer(), value);
}
@Override
public int readBytes(byte[] value, int length) throws JMSException {
- return bytesReadBytes(message, value, length);
+ return bytesReadBytes(getReadBodyBuffer(), value, length);
}
@Override
public void writeBoolean(boolean value) throws JMSException {
- bytesWriteBoolean(message, value);
+ bytesWriteBoolean(getWriteBodyBuffer(), value);
}
@Override
public void writeByte(byte value) throws JMSException {
- bytesWriteByte(message, value);
+ bytesWriteByte(getWriteBodyBuffer(), value);
}
@Override
public void writeShort(short value) throws JMSException {
- bytesWriteShort(message, value);
+ bytesWriteShort(getWriteBodyBuffer(), value);
}
@Override
public void writeChar(char value) throws JMSException {
- bytesWriteChar(message, value);
+ bytesWriteChar(getWriteBodyBuffer(), value);
}
@Override
public void writeInt(int value) throws JMSException {
- bytesWriteInt(message, value);
+ bytesWriteInt(getWriteBodyBuffer(), value);
}
@Override
public void writeLong(long value) throws JMSException {
- bytesWriteLong(message, value);
+ bytesWriteLong(getWriteBodyBuffer(), value);
}
@Override
public void writeFloat(float value) throws JMSException {
- bytesWriteFloat(message, value);
+ bytesWriteFloat(getWriteBodyBuffer(), value);
}
@Override
public void writeDouble(double value) throws JMSException {
- bytesWriteDouble(message, value);
+ bytesWriteDouble(getWriteBodyBuffer(), value);
}
@Override
public void writeUTF(String value) throws JMSException {
- bytesWriteUTF(message, value);
+ bytesWriteUTF(getWriteBodyBuffer(), value);
}
@Override
public void writeBytes(byte[] value) throws JMSException {
- bytesWriteBytes(message, value);
+ bytesWriteBytes(getWriteBodyBuffer(), value);
}
@Override
public void writeBytes(byte[] value, int offset, int length) throws JMSException {
- bytesWriteBytes(message, value, offset, length);
+ bytesWriteBytes(getWriteBodyBuffer(), value, offset, length);
}
@Override
public void writeObject(Object value) throws JMSException {
- if (!bytesWriteObject(message, value)) {
+ if (!bytesWriteObject(getWriteBodyBuffer(), value)) {
throw new JMSException("Can't make conversion of " + value + " to any known type");
}
}
@@ -199,7 +199,8 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
@Override
public void reset() throws JMSException {
- bytesMessageReset(message);
+ bytesMessageReset(getReadBodyBuffer());
+ bytesMessageReset(getWriteBodyBuffer());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
index bbece71..e41a1a3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
@@ -247,12 +247,12 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe
public void encode() throws Exception {
super.encode();
- writeBodyMap(message, map);
+ writeBodyMap(getWriteBodyBuffer(), map);
}
public void decode() throws Exception {
super.decode();
- readBodyMap(message, map);
+ readBodyMap(getReadBodyBuffer(), map);
}
// Package protected ---------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
index 315dd12..afacd21 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
@@ -23,6 +23,7 @@ import javax.jms.Message;
import java.util.Collections;
import java.util.Enumeration;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
@@ -45,6 +46,24 @@ public class ServerJMSMessage implements Message {
this.deliveryCount = deliveryCount;
}
+ private ActiveMQBuffer readBodyBuffer;
+
+ /** When reading we use a protected copy so that multiple threads can work safely */
+ protected ActiveMQBuffer getReadBodyBuffer() {
+ if (readBodyBuffer == null) {
+ // to avoid clashes between multiple threads
+ readBodyBuffer = message.getBodyBufferCopy();
+ }
+ return readBodyBuffer;
+ }
+
+ /** When writing on the conversion we use the buffer directly */
+ protected ActiveMQBuffer getWriteBodyBuffer() {
+ readBodyBuffer = null; // it invalidates this buffer if anything is written
+ return message.getBodyBuffer();
+ }
+
+
@Override
public final String getJMSMessageID() throws JMSException {
return null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
index 1afc8eb..9b70f57 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
@@ -21,13 +21,11 @@ import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
import javax.jms.StreamMessage;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.utils.DataConstants;
-import static org.apache.activemq.artemis.reader.MessageUtil.getBodyBuffer;
import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBoolean;
import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadByte;
import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBytes;
@@ -48,14 +46,14 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) {
super(message, deliveryCount);
-
}
// StreamMessage implementation ----------------------------------
public boolean readBoolean() throws JMSException {
+
try {
- return streamReadBoolean(message);
+ return streamReadBoolean(getReadBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -67,7 +65,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public byte readByte() throws JMSException {
try {
- return streamReadByte(message);
+ return streamReadByte(getReadBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -80,7 +78,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public short readShort() throws JMSException {
try {
- return streamReadShort(message);
+ return streamReadShort(getReadBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -93,7 +91,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public char readChar() throws JMSException {
try {
- return streamReadChar(message);
+ return streamReadChar(getReadBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -106,7 +104,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public int readInt() throws JMSException {
try {
- return streamReadInteger(message);
+ return streamReadInteger(getReadBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -119,7 +117,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public long readLong() throws JMSException {
try {
- return streamReadLong(message);
+ return streamReadLong(getReadBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -132,7 +130,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public float readFloat() throws JMSException {
try {
- return streamReadFloat(message);
+ return streamReadFloat(getReadBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -145,7 +143,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public double readDouble() throws JMSException {
try {
- return streamReadDouble(message);
+ return streamReadDouble(getReadBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -158,7 +156,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public String readString() throws JMSException {
try {
- return streamReadString(message);
+ return streamReadString(getReadBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -176,7 +174,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public int readBytes(final byte[] value) throws JMSException {
try {
- Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value);
+ Pair<Integer, Integer> pairRead = streamReadBytes(getReadBodyBuffer(), len, value);
len = pairRead.getA();
return pairRead.getB();
@@ -191,11 +189,11 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public Object readObject() throws JMSException {
- if (getBodyBuffer(message).readerIndex() >= message.getEndOfBodyPosition()) {
+ if (getReadBodyBuffer().readerIndex() >= message.getEndOfBodyPosition()) {
throw new MessageEOFException("");
}
try {
- return streamReadObject(message);
+ return streamReadObject(getReadBodyBuffer());
}
catch (IllegalStateException e) {
throw new MessageFormatException(e.getMessage());
@@ -207,70 +205,70 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public void writeBoolean(final boolean value) throws JMSException {
- getBuffer().writeByte(DataConstants.BOOLEAN);
- getBuffer().writeBoolean(value);
+ getWriteBodyBuffer().writeByte(DataConstants.BOOLEAN);
+ getWriteBodyBuffer().writeBoolean(value);
}
public void writeByte(final byte value) throws JMSException {
- getBuffer().writeByte(DataConstants.BYTE);
- getBuffer().writeByte(value);
+ getWriteBodyBuffer().writeByte(DataConstants.BYTE);
+ getWriteBodyBuffer().writeByte(value);
}
public void writeShort(final short value) throws JMSException {
- getBuffer().writeByte(DataConstants.SHORT);
- getBuffer().writeShort(value);
+ getWriteBodyBuffer().writeByte(DataConstants.SHORT);
+ getWriteBodyBuffer().writeShort(value);
}
public void writeChar(final char value) throws JMSException {
- getBuffer().writeByte(DataConstants.CHAR);
- getBuffer().writeShort((short) value);
+ getWriteBodyBuffer().writeByte(DataConstants.CHAR);
+ getWriteBodyBuffer().writeShort((short) value);
}
public void writeInt(final int value) throws JMSException {
- getBuffer().writeByte(DataConstants.INT);
- getBuffer().writeInt(value);
+ getWriteBodyBuffer().writeByte(DataConstants.INT);
+ getWriteBodyBuffer().writeInt(value);
}
public void writeLong(final long value) throws JMSException {
- getBuffer().writeByte(DataConstants.LONG);
- getBuffer().writeLong(value);
+ getWriteBodyBuffer().writeByte(DataConstants.LONG);
+ getWriteBodyBuffer().writeLong(value);
}
public void writeFloat(final float value) throws JMSException {
- getBuffer().writeByte(DataConstants.FLOAT);
- getBuffer().writeInt(Float.floatToIntBits(value));
+ getWriteBodyBuffer().writeByte(DataConstants.FLOAT);
+ getWriteBodyBuffer().writeInt(Float.floatToIntBits(value));
}
public void writeDouble(final double value) throws JMSException {
- getBuffer().writeByte(DataConstants.DOUBLE);
- getBuffer().writeLong(Double.doubleToLongBits(value));
+ getWriteBodyBuffer().writeByte(DataConstants.DOUBLE);
+ getWriteBodyBuffer().writeLong(Double.doubleToLongBits(value));
}
public void writeString(final String value) throws JMSException {
- getBuffer().writeByte(DataConstants.STRING);
- getBuffer().writeNullableString(value);
+ getWriteBodyBuffer().writeByte(DataConstants.STRING);
+ getWriteBodyBuffer().writeNullableString(value);
}
public void writeBytes(final byte[] value) throws JMSException {
- getBuffer().writeByte(DataConstants.BYTES);
- getBuffer().writeInt(value.length);
- getBuffer().writeBytes(value);
+ getWriteBodyBuffer().writeByte(DataConstants.BYTES);
+ getWriteBodyBuffer().writeInt(value.length);
+ getWriteBodyBuffer().writeBytes(value);
}
public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException {
- getBuffer().writeByte(DataConstants.BYTES);
- getBuffer().writeInt(length);
- getBuffer().writeBytes(value, offset, length);
+ getWriteBodyBuffer().writeByte(DataConstants.BYTES);
+ getWriteBodyBuffer().writeInt(length);
+ getWriteBodyBuffer().writeBytes(value, offset, length);
}
public void writeObject(final Object value) throws JMSException {
@@ -313,7 +311,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
}
public void reset() throws JMSException {
- getBuffer().resetReaderIndex();
+ getWriteBodyBuffer().resetReaderIndex();
}
// ActiveMQRAMessage overrides ----------------------------------------
@@ -322,11 +320,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
public void clearBody() throws JMSException {
super.clearBody();
- getBuffer().clear();
- }
-
- private ActiveMQBuffer getBuffer() {
- return message.getBodyBuffer();
+ getWriteBodyBuffer().clear();
}
public void decode() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
index 95e24b5..3191067 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
@@ -63,7 +63,7 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag
this.text = null;
}
- writeBodyText(message, this.text);
+ writeBodyText(getWriteBodyBuffer(), this.text);
}
public String getText() {
@@ -84,12 +84,12 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag
public void encode() throws Exception {
super.encode();
- writeBodyText(message, text);
+ writeBodyText(getWriteBodyBuffer(), text);
}
public void decode() throws Exception {
super.decode();
- text = readBodyText(message);
+ text = readBodyText(getReadBodyBuffer());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
index 0b5cb51..b563e61 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.proton;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
@@ -473,7 +474,7 @@ public class TestConversions extends Assert {
}
@Override
- public short readUnsignedByte() {
+ public int readUnsignedByte() {
return 0;
}
@@ -588,8 +589,8 @@ public class TestConversions extends Assert {
}
@Override
- public void skipBytes(int length) {
-
+ public int skipBytes(int length) {
+ return length;
}
@Override
@@ -683,6 +684,19 @@ public class TestConversions extends Assert {
}
@Override
+ public void readFully(byte[] b) throws IOException {
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ return null;
+ }
+
+ @Override
public ActiveMQBuffer copy() {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java
deleted file mode 100644
index 32d37d4..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.utils.UTF8Util;
-import org.apache.activemq.artemis.utils.UTF8Util.StringUtilBuffer;
-
-public class DataInputWrapper implements DataInput {
-
- private static final int DEFAULT_CAPACITY = 1024 * 1024;
- private static final NotEnoughBytesException exception = new NotEnoughBytesException();
- private ByteBuffer internalBuffer;
-
- public DataInputWrapper() {
- this(DEFAULT_CAPACITY);
- }
-
- public DataInputWrapper(int capacity) {
- this.internalBuffer = ByteBuffer.allocateDirect(capacity);
- this.internalBuffer.mark();
- this.internalBuffer.limit(0);
- }
-
- public void receiveData(byte[] data) {
- int newSize = data.length;
- int freeSpace = internalBuffer.capacity() - internalBuffer.limit();
- if (freeSpace < newSize) {
- internalBuffer.reset();
- internalBuffer.compact();
- if (internalBuffer.remaining() < newSize) {
- //need to enlarge
- }
- //make sure mark is at zero and position is at effective limit
- int pos = internalBuffer.position();
- internalBuffer.position(0);
- internalBuffer.mark();
- internalBuffer.position(pos);
- }
- else {
- internalBuffer.position(internalBuffer.limit());
- internalBuffer.limit(internalBuffer.capacity());
- }
- internalBuffer.put(data);
- internalBuffer.limit(internalBuffer.position());
- internalBuffer.reset();
- }
-
- public void receiveData(ActiveMQBuffer buffer) {
- int newSize = buffer.readableBytes();
- byte[] newData = new byte[newSize];
- buffer.readBytes(newData);
- this.receiveData(newData);
- }
-
- //invoke after each successful unmarshall
- public void mark() {
- this.internalBuffer.mark();
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- readFully(b, 0, b.length);
- }
-
- private void checkSize(int n) throws NotEnoughBytesException {
- if (internalBuffer.remaining() < n) {
- throw exception;
- }
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- checkSize(len);
- internalBuffer.get(b, off, len);
- }
-
- @Override
- public int skipBytes(int n) throws IOException {
- checkSize(n);
- int pos = internalBuffer.position();
- internalBuffer.position(pos + n);
- return n;
- }
-
- @Override
- public boolean readBoolean() throws IOException {
- checkSize(1);
- byte b = internalBuffer.get();
- return b != 0;
- }
-
- @Override
- public byte readByte() throws IOException {
- checkSize(1);
- return this.internalBuffer.get();
- }
-
- @Override
- public int readUnsignedByte() throws IOException {
- checkSize(1);
- return 0xFF & this.internalBuffer.get();
- }
-
- @Override
- public short readShort() throws IOException {
- checkSize(2);
- return this.internalBuffer.getShort();
- }
-
- @Override
- public int readUnsignedShort() throws IOException {
- checkSize(2);
- return 0xFFFF & this.internalBuffer.getShort();
- }
-
- @Override
- public char readChar() throws IOException {
- checkSize(2);
- return this.internalBuffer.getChar();
- }
-
- @Override
- public int readInt() throws IOException {
- checkSize(4);
- return this.internalBuffer.getInt();
- }
-
- @Override
- public long readLong() throws IOException {
- checkSize(8);
- return this.internalBuffer.getLong();
- }
-
- @Override
- public float readFloat() throws IOException {
- checkSize(4);
- return this.internalBuffer.getFloat();
- }
-
- @Override
- public double readDouble() throws IOException {
- checkSize(8);
- return this.internalBuffer.getDouble();
- }
-
- @Override
- public String readLine() throws IOException {
- StringBuilder sb = new StringBuilder("");
- char c = this.readChar();
- while (c != '\n') {
- sb.append(c);
- c = this.readChar();
- }
- return sb.toString();
- }
-
- @Override
- public String readUTF() throws IOException {
- StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer();
-
- final int size = this.readUnsignedShort();
-
- if (size > buffer.byteBuffer.length) {
- buffer.resizeByteBuffer(size);
- }
-
- if (size > buffer.charBuffer.length) {
- buffer.resizeCharBuffer(size);
- }
-
- int count = 0;
- int byte1, byte2, byte3;
- int charCount = 0;
-
- this.readFully(buffer.byteBuffer, 0, size);
-
- while (count < size) {
- byte1 = buffer.byteBuffer[count++];
-
- if (byte1 > 0 && byte1 <= 0x7F) {
- buffer.charBuffer[charCount++] = (char) byte1;
- }
- else {
- int c = byte1 & 0xff;
- switch (c >> 4) {
- case 0xc:
- case 0xd:
- byte2 = buffer.byteBuffer[count++];
- buffer.charBuffer[charCount++] = (char) ((c & 0x1F) << 6 | byte2 & 0x3F);
- break;
- case 0xe:
- byte2 = buffer.byteBuffer[count++];
- byte3 = buffer.byteBuffer[count++];
- buffer.charBuffer[charCount++] = (char) ((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0);
- break;
- default:
- throw new InternalError("unhandled utf8 byte " + c);
- }
- }
- }
-
- return new String(buffer.charBuffer, 0, charCount);
- }
-
- public boolean readable() {
- return this.internalBuffer.hasRemaining();
- }
-
-}