You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [34/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java Fri Oct 21 14:42:12 2011
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.math.BigDecimal;
/**
@@ -60,12 +61,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -106,12 +107,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readUnsignedInteger(buffer);
}
@@ -137,7 +138,7 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
BigDecimal bd = (BigDecimal) value;
@@ -150,7 +151,7 @@ public enum AMQType
EncodingUtils.writeInteger(buffer, unscaled);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
byte places = EncodingUtils.readByte(buffer);
@@ -182,12 +183,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLong(buffer);
}
@@ -246,7 +247,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
// Ensure that the value is a FieldTable.
if (!(value instanceof FieldTable))
@@ -267,7 +268,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
try
{
@@ -301,10 +302,10 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer)
{ }
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer)
{
return null;
}
@@ -330,12 +331,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLongstr(buffer, (byte[]) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLongstr(buffer);
}
@@ -360,12 +361,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -391,12 +392,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -426,12 +427,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeBoolean(buffer, (Boolean) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readBoolean(buffer);
}
@@ -461,12 +462,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeChar(buffer, (Character) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readChar(buffer);
}
@@ -496,12 +497,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeByte(buffer, (Byte) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readByte(buffer);
}
@@ -535,12 +536,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeShort(buffer, (Short) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readShort(buffer);
}
@@ -577,12 +578,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeInteger(buffer, (Integer) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readInteger(buffer);
}
@@ -624,12 +625,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLong(buffer);
}
@@ -659,12 +660,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeFloat(buffer, (Float) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readFloat(buffer);
}
@@ -698,12 +699,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeDouble(buffer, (Double) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readDouble(buffer);
}
@@ -770,9 +771,9 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- public void writeToBuffer(Object value, ByteBuffer buffer)
+ public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException
{
- buffer.put(identifier());
+ buffer.writeByte(identifier());
writeValueImpl(value, buffer);
}
@@ -782,7 +783,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- abstract void writeValueImpl(Object value, ByteBuffer buffer);
+ abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException;
/**
* Reads an instance of the type from a specified byte buffer.
@@ -791,5 +792,5 @@ public enum AMQType
*
* @return An instance of the type.
*/
- abstract Object readValueFromBuffer(ByteBuffer buffer);
+ abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException;
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Fri Oct 21 14:42:12 2011
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.math.BigDecimal;
@@ -60,7 +61,7 @@ public class AMQTypedValue
_value = type.toNativeValue(value);
}
- private AMQTypedValue(AMQType type, ByteBuffer buffer)
+ private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException
{
_type = type;
_value = type.readValueFromBuffer(buffer);
@@ -76,7 +77,7 @@ public class AMQTypedValue
return _value;
}
- public void writeToBuffer(ByteBuffer buffer)
+ public void writeToBuffer(DataOutputStream buffer) throws IOException
{
_type.writeToBuffer(_value, buffer);
}
@@ -86,9 +87,9 @@ public class AMQTypedValue
return _type.getEncodingSize(_value);
}
- public static AMQTypedValue readFromBuffer(ByteBuffer buffer)
+ public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException
{
- AMQType type = AMQTypeMap.getType(buffer.get());
+ AMQType type = AMQTypeMap.getType(buffer.readByte());
return new AMQTypedValue(type, buffer);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,27 +37,6 @@ public class BasicContentHeaderPropertie
private static final AMQShortString ZERO_STRING = null;
- /**
- * We store the encoded form when we decode the content header so that if we need to write it out without modifying
- * it we can do so without incurring the expense of reencoding it
- */
- private byte[] _encodedForm;
-
- /** Flag indicating whether the entire content header has been decoded yet */
- private boolean _decoded = true;
-
- /**
- * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker for
- * routing in some cases so we can decode that separately.
- */
- private boolean _decodedHeaders = true;
-
- /**
- * We have some optimisations for partial decoding for maximum performance. The content type is used by all clients
- * to determine the message type
- */
- private boolean _decodedContentType = true;
-
private AMQShortString _contentType;
private AMQShortString _encoding;
@@ -86,10 +67,10 @@ public class BasicContentHeaderPropertie
private int _propertyFlags = 0;
private static final int CONTENT_TYPE_MASK = 1 << 15;
- private static final int ENCONDING_MASK = 1 << 14;
+ private static final int ENCODING_MASK = 1 << 14;
private static final int HEADERS_MASK = 1 << 13;
private static final int DELIVERY_MODE_MASK = 1 << 12;
- private static final int PROPRITY_MASK = 1 << 11;
+ private static final int PRIORITY_MASK = 1 << 11;
private static final int CORRELATION_ID_MASK = 1 << 10;
private static final int REPLY_TO_MASK = 1 << 9;
private static final int EXPIRATION_MASK = 1 << 8;
@@ -101,34 +82,11 @@ public class BasicContentHeaderPropertie
private static final int CLUSTER_ID_MASK = 1 << 2;
- /**
- * This is 0_10 specific. We use this property to check if some message properties have been changed.
- */
- private boolean _hasBeenUpdated = false;
-
- public boolean reset()
- {
- boolean result = _hasBeenUpdated;
- _hasBeenUpdated = false;
- return result;
- }
-
- public void updated()
- {
- _hasBeenUpdated = true;
- }
-
public BasicContentHeaderProperties()
{ }
public int getPropertyListSize()
{
- if (_encodedForm != null)
- {
- return _encodedForm.length;
- }
- else
- {
int size = 0;
if ((_propertyFlags & (CONTENT_TYPE_MASK)) > 0)
@@ -136,7 +94,7 @@ public class BasicContentHeaderPropertie
size += EncodingUtils.encodedShortStringLength(_contentType);
}
- if ((_propertyFlags & ENCONDING_MASK) > 0)
+ if ((_propertyFlags & ENCODING_MASK) > 0)
{
size += EncodingUtils.encodedShortStringLength(_encoding);
}
@@ -151,7 +109,7 @@ public class BasicContentHeaderPropertie
size += 1;
}
- if ((_propertyFlags & PROPRITY_MASK) > 0)
+ if ((_propertyFlags & PRIORITY_MASK) > 0)
{
size += 1;
}
@@ -209,23 +167,10 @@ public class BasicContentHeaderPropertie
}
return size;
- }
- }
-
- private void clearEncodedForm()
- {
- if (!_decoded && (_encodedForm != null))
- {
- // decode();
- }
-
- _encodedForm = null;
}
public void setPropertyFlags(int propertyFlags)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags = propertyFlags;
}
@@ -234,94 +179,87 @@ public class BasicContentHeaderPropertie
return _propertyFlags;
}
- public void writePropertyListPayload(ByteBuffer buffer)
+ public void writePropertyListPayload(DataOutputStream buffer) throws IOException
{
- if (_encodedForm != null)
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
{
- buffer.put(_encodedForm);
+ EncodingUtils.writeShortStringBytes(buffer, _contentType);
}
- else
+
+ if ((_propertyFlags & ENCODING_MASK) != 0)
{
- if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _contentType);
- }
+ EncodingUtils.writeShortStringBytes(buffer, _encoding);
+ }
- if ((_propertyFlags & ENCONDING_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _encoding);
- }
+ if ((_propertyFlags & HEADERS_MASK) != 0)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, _headers);
+ }
- if ((_propertyFlags & HEADERS_MASK) != 0)
- {
- EncodingUtils.writeFieldTableBytes(buffer, _headers);
- }
+ if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
+ {
+ buffer.writeByte(_deliveryMode);
+ }
- if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
- {
- buffer.put(_deliveryMode);
- }
+ if ((_propertyFlags & PRIORITY_MASK) != 0)
+ {
+ buffer.writeByte(_priority);
+ }
- if ((_propertyFlags & PROPRITY_MASK) != 0)
- {
- buffer.put(_priority);
- }
+ if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _correlationId);
+ }
- if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _correlationId);
- }
+ if ((_propertyFlags & REPLY_TO_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _replyTo);
+ }
- if ((_propertyFlags & REPLY_TO_MASK) != 0)
+ if ((_propertyFlags & EXPIRATION_MASK) != 0)
+ {
+ if (_expiration == 0L)
{
- EncodingUtils.writeShortStringBytes(buffer, _replyTo);
+ EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
}
-
- if ((_propertyFlags & EXPIRATION_MASK) != 0)
+ else
{
- if (_expiration == 0L)
- {
- EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
- }
- else
- {
- EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
- }
+ EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
}
+ }
- if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _messageId);
- }
+ if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _messageId);
+ }
- if ((_propertyFlags & TIMESTAMP_MASK) != 0)
- {
- EncodingUtils.writeTimestamp(buffer, _timestamp);
- }
+ if ((_propertyFlags & TIMESTAMP_MASK) != 0)
+ {
+ EncodingUtils.writeTimestamp(buffer, _timestamp);
+ }
- if ((_propertyFlags & TYPE_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _type);
- }
+ if ((_propertyFlags & TYPE_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _type);
+ }
- if ((_propertyFlags & USER_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _userId);
- }
+ if ((_propertyFlags & USER_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _userId);
+ }
- if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _appId);
- }
+ if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _appId);
+ }
- if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _clusterId);
- }
+ if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _clusterId);
}
}
- public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException
+ public void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
{
_propertyFlags = propertyFlags;
@@ -331,25 +269,18 @@ public class BasicContentHeaderPropertie
}
decode(buffer);
- /*_encodedForm = new byte[size];
- buffer.get(_encodedForm, 0, size);
- _decoded = false;
- _decodedHeaders = false;
- _decodedContentType = false;*/
}
- private void decode(ByteBuffer buffer)
+ private void decode(DataInputStream buffer) throws IOException, AMQFrameDecodingException
{
// ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
- int pos = buffer.position();
- try
- {
+
if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
{
_contentType = EncodingUtils.readAMQShortString(buffer);
}
- if ((_propertyFlags & ENCONDING_MASK) != 0)
+ if ((_propertyFlags & ENCODING_MASK) != 0)
{
_encoding = EncodingUtils.readAMQShortString(buffer);
}
@@ -361,12 +292,12 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
{
- _deliveryMode = buffer.get();
+ _deliveryMode = buffer.readByte();
}
- if ((_propertyFlags & PROPRITY_MASK) != 0)
+ if ((_propertyFlags & PRIORITY_MASK) != 0)
{
- _priority = buffer.get();
+ _priority = buffer.readByte();
}
if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
@@ -413,116 +344,29 @@ public class BasicContentHeaderPropertie
{
_clusterId = EncodingUtils.readAMQShortString(buffer);
}
- }
- catch (AMQFrameDecodingException e)
- {
- throw new RuntimeException("Error in content header data: " + e, e);
- }
-
- final int endPos = buffer.position();
- buffer.position(pos);
- final int len = endPos - pos;
- _encodedForm = new byte[len];
- final int limit = buffer.limit();
- buffer.limit(endPos);
- buffer.get(_encodedForm, 0, len);
- buffer.limit(limit);
- buffer.position(endPos);
- _decoded = true;
- }
-
- private void decodeUpToHeaders()
- {
- ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
- try
- {
- if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
- {
- byte length = buffer.get();
- buffer.skip(length);
- }
- if ((_propertyFlags & ENCONDING_MASK) != 0)
- {
- byte length = buffer.get();
- buffer.skip(length);
- }
- if ((_propertyFlags & HEADERS_MASK) != 0)
- {
- _headers = EncodingUtils.readFieldTable(buffer);
-
- }
-
- _decodedHeaders = true;
- }
- catch (AMQFrameDecodingException e)
- {
- throw new RuntimeException("Error in content header data: " + e, e);
- }
}
- private void decodeUpToContentType()
- {
- ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
-
- if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
- {
- _contentType = EncodingUtils.readAMQShortString(buffer);
- }
-
- _decodedContentType = true;
- }
-
- private void decodeIfNecessary()
- {
- if (!_decoded)
- {
- // decode();
- }
- }
-
- private void decodeHeadersIfNecessary()
- {
- if (!_decoded && !_decodedHeaders)
- {
- decodeUpToHeaders();
- }
- }
-
- private void decodeContentTypeIfNecessary()
- {
- if (!_decoded && !_decodedContentType)
- {
- decodeUpToContentType();
- }
- }
public AMQShortString getContentType()
{
- decodeContentTypeIfNecessary();
-
return _contentType;
}
public String getContentTypeAsString()
{
- decodeContentTypeIfNecessary();
-
return (_contentType == null) ? null : _contentType.toString();
}
public void setContentType(AMQShortString contentType)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags |= (CONTENT_TYPE_MASK);
_contentType = contentType;
}
public void setContentType(String contentType)
{
- _hasBeenUpdated = true;
setContentType((contentType == null) ? null : new AMQShortString(contentType));
}
@@ -534,31 +378,23 @@ public class BasicContentHeaderPropertie
public AMQShortString getEncoding()
{
- decodeIfNecessary();
-
return _encoding;
}
public void setEncoding(String encoding)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
- _propertyFlags |= ENCONDING_MASK;
+ _propertyFlags |= ENCODING_MASK;
_encoding = (encoding == null) ? null : new AMQShortString(encoding);
}
public void setEncoding(AMQShortString encoding)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
- _propertyFlags |= ENCONDING_MASK;
+ _propertyFlags |= ENCODING_MASK;
_encoding = encoding;
}
public FieldTable getHeaders()
{
- decodeHeadersIfNecessary();
-
if (_headers == null)
{
setHeaders(FieldTableFactory.newFieldTable());
@@ -569,191 +405,146 @@ public class BasicContentHeaderPropertie
public void setHeaders(FieldTable headers)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags |= HEADERS_MASK;
_headers = headers;
}
public byte getDeliveryMode()
{
- decodeIfNecessary();
-
return _deliveryMode;
}
public void setDeliveryMode(byte deliveryMode)
{
- clearEncodedForm();
_propertyFlags |= DELIVERY_MODE_MASK;
_deliveryMode = deliveryMode;
}
public byte getPriority()
{
- decodeIfNecessary();
-
return _priority;
}
public void setPriority(byte priority)
{
- clearEncodedForm();
- _propertyFlags |= PROPRITY_MASK;
+ _propertyFlags |= PRIORITY_MASK;
_priority = priority;
}
public AMQShortString getCorrelationId()
{
- decodeIfNecessary();
-
return _correlationId;
}
public String getCorrelationIdAsString()
{
- decodeIfNecessary();
-
return (_correlationId == null) ? null : _correlationId.toString();
}
public void setCorrelationId(String correlationId)
{
- _hasBeenUpdated = true;
setCorrelationId((correlationId == null) ? null : new AMQShortString(correlationId));
}
public void setCorrelationId(AMQShortString correlationId)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags |= CORRELATION_ID_MASK;
_correlationId = correlationId;
}
public String getReplyToAsString()
{
- decodeIfNecessary();
-
return (_replyTo == null) ? null : _replyTo.toString();
}
public AMQShortString getReplyTo()
{
- decodeIfNecessary();
-
return _replyTo;
}
public void setReplyTo(String replyTo)
{
- _hasBeenUpdated = true;
setReplyTo((replyTo == null) ? null : new AMQShortString(replyTo));
}
public void setReplyTo(AMQShortString replyTo)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags |= REPLY_TO_MASK;
_replyTo = replyTo;
}
public long getExpiration()
{
- decodeIfNecessary();
return _expiration;
}
public void setExpiration(long expiration)
{
- clearEncodedForm();
_propertyFlags |= EXPIRATION_MASK;
_expiration = expiration;
}
public AMQShortString getMessageId()
{
- decodeIfNecessary();
-
return _messageId;
}
public String getMessageIdAsString()
{
- decodeIfNecessary();
-
return (_messageId == null) ? null : _messageId.toString();
}
public void setMessageId(String messageId)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags |= MESSAGE_ID_MASK;
_messageId = (messageId == null) ? null : new AMQShortString(messageId);
}
public void setMessageId(AMQShortString messageId)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags |= MESSAGE_ID_MASK;
_messageId = messageId;
}
public long getTimestamp()
{
- decodeIfNecessary();
return _timestamp;
}
public void setTimestamp(long timestamp)
{
- clearEncodedForm();
_propertyFlags |= TIMESTAMP_MASK;
_timestamp = timestamp;
}
public String getTypeAsString()
{
- decodeIfNecessary();
-
return (_type == null) ? null : _type.toString();
}
public AMQShortString getType()
{
- decodeIfNecessary();
-
return _type;
}
public void setType(String type)
{
- _hasBeenUpdated = true;
setType((type == null) ? null : new AMQShortString(type));
}
public void setType(AMQShortString type)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags |= TYPE_MASK;
_type = type;
}
public String getUserIdAsString()
{
- decodeIfNecessary();
-
return (_userId == null) ? null : _userId.toString();
}
public AMQShortString getUserId()
{
- decodeIfNecessary();
-
return _userId;
}
@@ -764,65 +555,48 @@ public class BasicContentHeaderPropertie
public void setUserId(AMQShortString userId)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags |= USER_ID_MASK;
_userId = userId;
}
public String getAppIdAsString()
{
- decodeIfNecessary();
-
return (_appId == null) ? null : _appId.toString();
}
public AMQShortString getAppId()
{
- decodeIfNecessary();
-
return _appId;
}
public void setAppId(String appId)
{
- _hasBeenUpdated = true;
setAppId((appId == null) ? null : new AMQShortString(appId));
}
public void setAppId(AMQShortString appId)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags |= APPLICATION_ID_MASK;
_appId = appId;
- _hasBeenUpdated = true;
}
public String getClusterIdAsString()
{
- _hasBeenUpdated = true;
- decodeIfNecessary();
return (_clusterId == null) ? null : _clusterId.toString();
}
public AMQShortString getClusterId()
{
- _hasBeenUpdated = true;
- decodeIfNecessary();
return _clusterId;
}
public void setClusterId(String clusterId)
{
- _hasBeenUpdated = true;
setClusterId((clusterId == null) ? null : new AMQShortString(clusterId));
}
public void setClusterId(AMQShortString clusterId)
{
- _hasBeenUpdated = true;
- clearEncodedForm();
_propertyFlags |= CLUSTER_ID_MASK;
_clusterId = clusterId;
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java Fri Oct 21 14:42:12 2011
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
/**
* Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface.
*/
public interface BodyFactory
{
- AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException;
+ AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException;
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -49,7 +50,7 @@ public class CompositeAMQDataBlock exten
return frameSize;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
for (int i = 0; i < _blocks.length; i++)
{
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.AMQException;
@@ -28,27 +31,22 @@ public class ContentBody implements AMQB
{
public static final byte TYPE = 3;
- public ByteBuffer payload;
+ public byte[] _payload;
public ContentBody()
{
}
- public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
{
- if (size > 0)
- {
- payload = buffer.slice();
- payload.limit((int) size);
- buffer.skip((int) size);
- }
-
+ _payload = new byte[(int)size];
+ buffer.read(_payload);
}
- public ContentBody(ByteBuffer payload)
+ public ContentBody(byte[] payload)
{
- this.payload = payload;
+ _payload = payload;
}
public byte getFrameType()
@@ -58,23 +56,12 @@ public class ContentBody implements AMQB
public int getSize()
{
- return (payload == null ? 0 : payload.limit());
+ return _payload == null ? 0 : _payload.length;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
- if (payload != null)
- {
- if(payload.isDirect() || payload.isReadOnly())
- {
- ByteBuffer copy = payload.duplicate();
- buffer.put(copy.rewind());
- }
- else
- {
- buffer.put(payload.array(),payload.arrayOffset(),payload.limit());
- }
- }
+ buffer.write(_payload);
}
public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
@@ -83,32 +70,18 @@ public class ContentBody implements AMQB
session.contentBodyReceived(channelId, this);
}
- protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
{
if (size > 0)
{
- payload = buffer.slice();
- payload.limit((int) size);
- buffer.skip((int) size);
+ _payload = new byte[(int)size];
+ buffer.read(_payload);
}
}
public void reduceBufferToFit()
{
- if (payload != null && (payload.remaining() < payload.capacity() / 2))
- {
- int size = payload.limit();
- ByteBuffer newPayload = ByteBuffer.allocate(size);
-
- newPayload.put(payload);
- newPayload.flip();
-
- //reduce reference count on payload
- payload.release();
-
- payload = newPayload;
- }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +42,7 @@ public class ContentBodyFactory implemen
_log.debug("Creating content body factory");
}
- public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
{
return new ContentBody(in, bodySize);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.AMQException;
@@ -36,18 +39,18 @@ public class ContentHeaderBody implement
public long bodySize;
/** must never be null */
- public ContentHeaderProperties properties;
+ private ContentHeaderProperties properties;
public ContentHeaderBody()
{
}
- public ContentHeaderBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ public ContentHeaderBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
{
- classId = buffer.getUnsignedShort();
- weight = buffer.getUnsignedShort();
- bodySize = buffer.getLong();
- int propertyFlags = buffer.getUnsignedShort();
+ classId = buffer.readUnsignedShort();
+ weight = buffer.readUnsignedShort();
+ bodySize = buffer.readLong();
+ int propertyFlags = buffer.readUnsignedShort();
ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance();
properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14);
@@ -72,13 +75,13 @@ public class ContentHeaderBody implement
return TYPE;
}
- protected void populateFromBuffer(ByteBuffer buffer, long size)
- throws AMQFrameDecodingException, AMQProtocolVersionException
+ protected void populateFromBuffer(DataInputStream buffer, long size)
+ throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
- classId = buffer.getUnsignedShort();
- weight = buffer.getUnsignedShort();
- bodySize = buffer.getLong();
- int propertyFlags = buffer.getUnsignedShort();
+ classId = buffer.readUnsignedShort();
+ weight = buffer.readUnsignedShort();
+ bodySize = buffer.readLong();
+ int propertyFlags = buffer.readUnsignedShort();
ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance();
properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14);
}
@@ -90,8 +93,8 @@ public class ContentHeaderBody implement
* @return
* @throws AMQFrameDecodingException
*/
- public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size)
- throws AMQFrameDecodingException, AMQProtocolVersionException
+ public static ContentHeaderBody createFromBuffer(DataInputStream buffer, long size)
+ throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
ContentHeaderBody body = new ContentHeaderBody(buffer, size);
@@ -103,11 +106,11 @@ public class ContentHeaderBody implement
return 2 + 2 + 8 + 2 + properties.getPropertyListSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, classId);
EncodingUtils.writeUnsignedShort(buffer, weight);
- buffer.putLong(bodySize);
+ buffer.writeLong(bodySize);
EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags());
properties.writePropertyListPayload(buffer);
}
@@ -128,4 +131,25 @@ public class ContentHeaderBody implement
{
return new AMQFrame(channelId, body);
}
+
+ public ContentHeaderProperties getProperties()
+ {
+ return properties;
+ }
+
+ public void setProperties(ContentHeaderProperties props)
+ {
+ properties = props;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ContentHeaderBody{" +
+ "classId=" + classId +
+ ", weight=" + weight +
+ ", bodySize=" + bodySize +
+ ", properties=" + properties +
+ '}';
+ }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +42,7 @@ public class ContentHeaderBodyFactory im
_log.debug("Creating content header body factory");
}
- public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
{
// all content headers are the same - it is only the properties that differ.
// the content header body further delegates construction of properties
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
/**
* There will be an implementation of this interface for each content type. All content types have associated
@@ -32,7 +35,7 @@ public interface ContentHeaderProperties
* Writes the property list to the buffer, in a suitably encoded form.
* @param buffer The buffer to write to
*/
- void writePropertyListPayload(ByteBuffer buffer);
+ void writePropertyListPayload(DataOutputStream buffer) throws IOException;
/**
* Populates the properties from buffer.
@@ -40,8 +43,8 @@ public interface ContentHeaderProperties
* @param propertyFlags he property flags.
* @throws AMQFrameDecodingException when the buffer does not contain valid data
*/
- void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size)
- throws AMQFrameDecodingException;
+ void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size)
+ throws AMQFrameDecodingException, IOException;
/**
* @return the size of the encoded property list in bytes.
@@ -56,5 +59,4 @@ public interface ContentHeaderProperties
*/
int getPropertyFlags();
- void updated();
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
@@ -38,8 +39,8 @@ public class ContentHeaderPropertiesFact
}
public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
- ByteBuffer buffer, int size)
- throws AMQFrameDecodingException
+ DataInputStream buffer, int size)
+ throws AMQFrameDecodingException, IOException
{
ContentHeaderProperties properties;
// AMQP version change: "Hardwired" version to major=8, minor=0
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Fri Oct 21 14:42:12 2011
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.nio.charset.Charset;
public class EncodingUtils
@@ -218,7 +219,7 @@ public class EncodingUtils
return 0;
}
- public static void writeShortStringBytes(ByteBuffer buffer, String s)
+ public static void writeShortStringBytes(DataOutputStream buffer, String s) throws IOException
{
if (s != null)
{
@@ -231,18 +232,18 @@ public class EncodingUtils
// TODO: check length fits in an unsigned byte
writeUnsignedByte(buffer, (short)encodedString.length);
- buffer.put(encodedString);
+ buffer.write(encodedString);
}
else
{
// really writing out unsigned byte
- buffer.put((byte) 0);
+ buffer.write((byte) 0);
}
}
- public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s)
+ public static void writeShortStringBytes(DataOutputStream buffer, AMQShortString s) throws IOException
{
if (s != null)
{
@@ -252,11 +253,11 @@ public class EncodingUtils
else
{
// really writing out unsigned byte
- buffer.put((byte) 0);
+ buffer.write((byte) 0);
}
}
- public static void writeLongStringBytes(ByteBuffer buffer, String s)
+ public static void writeLongStringBytes(DataOutputStream buffer, String s) throws IOException
{
assert (s == null) || (s.length() <= 0xFFFE);
if (s != null)
@@ -270,7 +271,7 @@ public class EncodingUtils
encodedString[i] = (byte) cha[i];
}
- buffer.put(encodedString);
+ buffer.write(encodedString);
}
else
{
@@ -278,7 +279,7 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(ByteBuffer buffer, char[] s)
+ public static void writeLongStringBytes(DataOutputStream buffer, char[] s) throws IOException
{
assert (s == null) || (s.length <= 0xFFFE);
if (s != null)
@@ -291,7 +292,7 @@ public class EncodingUtils
encodedString[i] = (byte) s[i];
}
- buffer.put(encodedString);
+ buffer.write(encodedString);
}
else
{
@@ -299,13 +300,13 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(ByteBuffer buffer, byte[] bytes)
+ public static void writeLongStringBytes(DataOutputStream buffer, byte[] bytes) throws IOException
{
assert (bytes == null) || (bytes.length <= 0xFFFE);
if (bytes != null)
{
writeUnsignedInteger(buffer, bytes.length);
- buffer.put(bytes);
+ buffer.write(bytes);
}
else
{
@@ -313,24 +314,24 @@ public class EncodingUtils
}
}
- public static void writeUnsignedByte(ByteBuffer buffer, short b)
+ public static void writeUnsignedByte(DataOutputStream buffer, short b) throws IOException
{
byte bv = (byte) b;
- buffer.put(bv);
+ buffer.write(bv);
}
- public static void writeUnsignedShort(ByteBuffer buffer, int s)
+ public static void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
{
// TODO: Is this comparison safe? Do I need to cast RHS to long?
if (s < Short.MAX_VALUE)
{
- buffer.putShort((short) s);
+ buffer.writeShort(s);
}
else
{
short sv = (short) s;
- buffer.put((byte) (0xFF & (sv >> 8)));
- buffer.put((byte) (0xFF & sv));
+ buffer.write((byte) (0xFF & (sv >> 8)));
+ buffer.write((byte) (0xFF & sv));
}
}
@@ -339,12 +340,12 @@ public class EncodingUtils
return 4;
}
- public static void writeUnsignedInteger(ByteBuffer buffer, long l)
+ public static void writeUnsignedInteger(DataOutputStream buffer, long l) throws IOException
{
// TODO: Is this comparison safe? Do I need to cast RHS to long?
if (l < Integer.MAX_VALUE)
{
- buffer.putInt((int) l);
+ buffer.writeInt((int) l);
}
else
{
@@ -352,14 +353,14 @@ public class EncodingUtils
// FIXME: This *may* go faster if we build this into a local 4-byte array and then
// put the array in a single call.
- buffer.put((byte) (0xFF & (iv >> 24)));
- buffer.put((byte) (0xFF & (iv >> 16)));
- buffer.put((byte) (0xFF & (iv >> 8)));
- buffer.put((byte) (0xFF & iv));
+ buffer.write((byte) (0xFF & (iv >> 24)));
+ buffer.write((byte) (0xFF & (iv >> 16)));
+ buffer.write((byte) (0xFF & (iv >> 8)));
+ buffer.write((byte) (0xFF & iv));
}
}
- public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table)
+ public static void writeFieldTableBytes(DataOutputStream buffer, FieldTable table) throws IOException
{
if (table != null)
{
@@ -371,12 +372,12 @@ public class EncodingUtils
}
}
- public static void writeContentBytes(ByteBuffer buffer, Content content)
+ public static void writeContentBytes(DataOutputStream buffer, Content content)
{
// TODO: New Content class required for AMQP 0-9.
}
- public static void writeBooleans(ByteBuffer buffer, boolean[] values)
+ public static void writeBooleans(DataOutputStream buffer, boolean[] values) throws IOException
{
byte packedValue = 0;
for (int i = 0; i < values.length; i++)
@@ -387,16 +388,16 @@ public class EncodingUtils
}
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value)
+ public static void writeBooleans(DataOutputStream buffer, boolean value) throws IOException
{
- buffer.put(value ? (byte) 1 : (byte) 0);
+ buffer.write(value ? (byte) 1 : (byte) 0);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -405,10 +406,10 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 1));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -422,10 +423,10 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 2));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -444,11 +445,11 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 3));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
- boolean value4)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -472,11 +473,11 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 4));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
- boolean value4, boolean value5)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4, boolean value5) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -505,11 +506,11 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 5));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
- boolean value4, boolean value5, boolean value6)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4, boolean value5, boolean value6) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -543,11 +544,11 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 6));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
- boolean value4, boolean value5, boolean value6, boolean value7)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4, boolean value5, boolean value6, boolean value7) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -586,7 +587,7 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 7));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
/**
@@ -595,12 +596,12 @@ public class EncodingUtils
* @param buffer
* @param data
*/
- public static void writeLongstr(ByteBuffer buffer, byte[] data)
+ public static void writeLongstr(DataOutputStream buffer, byte[] data) throws IOException
{
if (data != null)
{
writeUnsignedInteger(buffer, data.length);
- buffer.put(data);
+ buffer.write(data);
}
else
{
@@ -608,14 +609,14 @@ public class EncodingUtils
}
}
- public static void writeTimestamp(ByteBuffer buffer, long timestamp)
+ public static void writeTimestamp(DataOutputStream buffer, long timestamp) throws IOException
{
writeLong(buffer, timestamp);
}
- public static boolean[] readBooleans(ByteBuffer buffer)
+ public static boolean[] readBooleans(DataInputStream buffer) throws IOException
{
- final byte packedValue = buffer.get();
+ final byte packedValue = buffer.readByte();
if (packedValue == 0)
{
return ALL_FALSE_ARRAY;
@@ -640,9 +641,9 @@ public class EncodingUtils
return result;
}
- public static FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ public static FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
{
- long length = buffer.getUnsignedInt();
+ long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
{
return null;
@@ -653,21 +654,21 @@ public class EncodingUtils
}
}
- public static Content readContent(ByteBuffer buffer) throws AMQFrameDecodingException
+ public static Content readContent(DataInputStream buffer) throws AMQFrameDecodingException
{
// TODO: New Content class required for AMQP 0-9.
return null;
}
- public static AMQShortString readAMQShortString(ByteBuffer buffer)
+ public static AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
{
return AMQShortString.readFromBuffer(buffer);
}
- public static String readShortString(ByteBuffer buffer)
+ public static String readShortString(DataInputStream buffer) throws IOException
{
- short length = buffer.getUnsigned();
+ short length = (short) (((short)buffer.readByte()) & 0xFF);
if (length == 0)
{
return null;
@@ -680,7 +681,7 @@ public class EncodingUtils
// this approach here is valid since we know that all the chars are
// ASCII (0-127)
byte[] stringBytes = new byte[length];
- buffer.get(stringBytes, 0, length);
+ buffer.read(stringBytes, 0, length);
char[] stringChars = new char[length];
for (int i = 0; i < stringChars.length; i++)
{
@@ -691,9 +692,9 @@ public class EncodingUtils
}
}
- public static String readLongString(ByteBuffer buffer)
+ public static String readLongString(DataInputStream buffer) throws IOException
{
- long length = buffer.getUnsignedInt();
+ long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
{
return "";
@@ -706,7 +707,7 @@ public class EncodingUtils
// this approach here is valid since we know that all the chars are
// ASCII (0-127)
byte[] stringBytes = new byte[(int) length];
- buffer.get(stringBytes, 0, (int) length);
+ buffer.read(stringBytes, 0, (int) length);
char[] stringChars = new char[(int) length];
for (int i = 0; i < stringChars.length; i++)
{
@@ -717,9 +718,9 @@ public class EncodingUtils
}
}
- public static byte[] readLongstr(ByteBuffer buffer)
+ public static byte[] readLongstr(DataInputStream buffer) throws IOException
{
- long length = buffer.getUnsignedInt();
+ long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
{
return null;
@@ -727,17 +728,17 @@ public class EncodingUtils
else
{
byte[] result = new byte[(int) length];
- buffer.get(result);
+ buffer.read(result);
return result;
}
}
- public static long readTimestamp(ByteBuffer buffer)
+ public static long readTimestamp(DataInputStream buffer) throws IOException
{
// Discard msb from AMQ timestamp
// buffer.getUnsignedInt();
- return buffer.getLong();
+ return buffer.readLong();
}
static byte[] hexToByteArray(String id)
@@ -817,14 +818,14 @@ public class EncodingUtils
// AMQP_BOOLEAN_PROPERTY_PREFIX
- public static void writeBoolean(ByteBuffer buffer, Boolean aBoolean)
+ public static void writeBoolean(DataOutputStream buffer, Boolean aBoolean) throws IOException
{
- buffer.put((byte) (aBoolean ? 1 : 0));
+ buffer.write(aBoolean ? 1 : 0);
}
- public static boolean readBoolean(ByteBuffer buffer)
+ public static boolean readBoolean(DataInputStream buffer) throws IOException
{
- byte packedValue = buffer.get();
+ byte packedValue = buffer.readByte();
return (packedValue == 1);
}
@@ -835,14 +836,14 @@ public class EncodingUtils
}
// AMQP_BYTE_PROPERTY_PREFIX
- public static void writeByte(ByteBuffer buffer, Byte aByte)
+ public static void writeByte(DataOutputStream buffer, Byte aByte) throws IOException
{
- buffer.put(aByte);
+ buffer.writeByte(aByte);
}
- public static byte readByte(ByteBuffer buffer)
+ public static byte readByte(DataInputStream buffer) throws IOException
{
- return buffer.get();
+ return buffer.readByte();
}
public static int encodedByteLength()
@@ -851,14 +852,14 @@ public class EncodingUtils
}
// AMQP_SHORT_PROPERTY_PREFIX
- public static void writeShort(ByteBuffer buffer, Short aShort)
+ public static void writeShort(DataOutputStream buffer, Short aShort) throws IOException
{
- buffer.putShort(aShort);
+ buffer.writeShort(aShort);
}
- public static short readShort(ByteBuffer buffer)
+ public static short readShort(DataInputStream buffer) throws IOException
{
- return buffer.getShort();
+ return buffer.readShort();
}
public static int encodedShortLength()
@@ -867,14 +868,14 @@ public class EncodingUtils
}
// INTEGER_PROPERTY_PREFIX
- public static void writeInteger(ByteBuffer buffer, Integer aInteger)
+ public static void writeInteger(DataOutputStream buffer, Integer aInteger) throws IOException
{
- buffer.putInt(aInteger);
+ buffer.writeInt(aInteger);
}
- public static int readInteger(ByteBuffer buffer)
+ public static int readInteger(DataInputStream buffer) throws IOException
{
- return buffer.getInt();
+ return buffer.readInt();
}
public static int encodedIntegerLength()
@@ -883,14 +884,14 @@ public class EncodingUtils
}
// AMQP_LONG_PROPERTY_PREFIX
- public static void writeLong(ByteBuffer buffer, Long aLong)
+ public static void writeLong(DataOutputStream buffer, Long aLong) throws IOException
{
- buffer.putLong(aLong);
+ buffer.writeLong(aLong);
}
- public static long readLong(ByteBuffer buffer)
+ public static long readLong(DataInputStream buffer) throws IOException
{
- return buffer.getLong();
+ return buffer.readLong();
}
public static int encodedLongLength()
@@ -899,14 +900,14 @@ public class EncodingUtils
}
// Float_PROPERTY_PREFIX
- public static void writeFloat(ByteBuffer buffer, Float aFloat)
+ public static void writeFloat(DataOutputStream buffer, Float aFloat) throws IOException
{
- buffer.putFloat(aFloat);
+ buffer.writeFloat(aFloat);
}
- public static float readFloat(ByteBuffer buffer)
+ public static float readFloat(DataInputStream buffer) throws IOException
{
- return buffer.getFloat();
+ return buffer.readFloat();
}
public static int encodedFloatLength()
@@ -915,14 +916,14 @@ public class EncodingUtils
}
// Double_PROPERTY_PREFIX
- public static void writeDouble(ByteBuffer buffer, Double aDouble)
+ public static void writeDouble(DataOutputStream buffer, Double aDouble) throws IOException
{
- buffer.putDouble(aDouble);
+ buffer.writeDouble(aDouble);
}
- public static double readDouble(ByteBuffer buffer)
+ public static double readDouble(DataInputStream buffer) throws IOException
{
- return buffer.getDouble();
+ return buffer.readDouble();
}
public static int encodedDoubleLength()
@@ -930,9 +931,9 @@ public class EncodingUtils
return 8;
}
- public static byte[] readBytes(ByteBuffer buffer)
+ public static byte[] readBytes(DataInputStream buffer) throws IOException
{
- long length = buffer.getUnsignedInt();
+ long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
{
return null;
@@ -940,19 +941,19 @@ public class EncodingUtils
else
{
byte[] dataBytes = new byte[(int)length];
- buffer.get(dataBytes, 0, (int)length);
+ buffer.read(dataBytes, 0, (int) length);
return dataBytes;
}
}
- public static void writeBytes(ByteBuffer buffer, byte[] data)
+ public static void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
{
if (data != null)
{
// TODO: check length fits in an unsigned byte
writeUnsignedInteger(buffer, (long)data.length);
- buffer.put(data);
+ buffer.write(data);
}
else
{
@@ -968,35 +969,35 @@ public class EncodingUtils
return encodedByteLength();
}
- public static char readChar(ByteBuffer buffer)
+ public static char readChar(DataInputStream buffer) throws IOException
{
// This is valid as we know that the Character is ASCII 0..127
- return (char) buffer.get();
+ return (char) buffer.read();
}
- public static void writeChar(ByteBuffer buffer, char character)
+ public static void writeChar(DataOutputStream buffer, char character) throws IOException
{
// This is valid as we know that the Character is ASCII 0..127
writeByte(buffer, (byte) character);
}
- public static long readLongAsShortString(ByteBuffer buffer)
+ public static long readLongAsShortString(DataInputStream buffer) throws IOException
{
- short length = buffer.getUnsigned();
+ short length = (short) buffer.readUnsignedByte();
short pos = 0;
if (length == 0)
{
return 0L;
}
- byte digit = buffer.get();
+ byte digit = buffer.readByte();
boolean isNegative;
long result = 0;
if (digit == (byte) '-')
{
isNegative = true;
pos++;
- digit = buffer.get();
+ digit = buffer.readByte();
}
else
{
@@ -1009,7 +1010,7 @@ public class EncodingUtils
while (pos < length)
{
pos++;
- digit = buffer.get();
+ digit = buffer.readByte();
result = (result << 3) + (result << 1);
result += digit - (byte) '0';
}
@@ -1017,15 +1018,15 @@ public class EncodingUtils
return result;
}
- public static long readUnsignedInteger(ByteBuffer buffer)
+ public static long readUnsignedInteger(DataInputStream buffer) throws IOException
{
- long l = 0xFF & buffer.get();
+ long l = 0xFF & buffer.readByte();
l <<= 8;
- l = l | (0xFF & buffer.get());
+ l = l | (0xFF & buffer.readByte());
l <<= 8;
- l = l | (0xFF & buffer.get());
+ l = l | (0xFF & buffer.readByte());
l <<= 8;
- l = l | (0xFF & buffer.get());
+ l = l | (0xFF & buffer.readByte());
return l;
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Fri Oct 21 14:42:12 2011
@@ -20,12 +20,16 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQPInvalidClassException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Enumeration;
@@ -43,8 +47,8 @@ public class FieldTable
private static final String STRICT_AMQP = "STRICT_AMQP";
private final boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP, "false"));
- private ByteBuffer _encodedForm;
- private LinkedHashMap<AMQShortString, AMQTypedValue> _properties;
+ private byte[] _encodedForm;
+ private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null;
private long _encodedSize;
private static final int INITIAL_HASHMAP_CAPACITY = 16;
private static final int INITIAL_ENCODED_FORM_SIZE = 256;
@@ -52,9 +56,6 @@ public class FieldTable
public FieldTable()
{
super();
- // _encodedForm = ByteBuffer.allocate(INITIAL_ENCODED_FORM_SIZE);
- // _encodedForm.setAutoExpand(true);
- // _encodedForm.limit(0);
}
/**
@@ -63,16 +64,12 @@ public class FieldTable
* @param buffer the buffer from which to read data. The length byte must be read already
* @param length the length of the field table. Must be > 0.
*/
- public FieldTable(ByteBuffer buffer, long length)
+ public FieldTable(DataInputStream buffer, long length) throws IOException
{
this();
- ByteBuffer encodedForm = buffer.slice();
- encodedForm.limit((int) length);
- _encodedForm = ByteBuffer.allocate((int)length);
- _encodedForm.put(encodedForm);
- _encodedForm.flip();
+ _encodedForm = new byte[(int) length];
+ buffer.read(_encodedForm);
_encodedSize = length;
- buffer.skip((int) length);
}
public AMQTypedValue getProperty(AMQShortString string)
@@ -108,13 +105,19 @@ public class FieldTable
{
try
{
- setFromBuffer(_encodedForm, _encodedSize);
+ setFromBuffer();
}
catch (AMQFrameDecodingException e)
{
_logger.error("Error decoding FieldTable in deferred decoding mode ", e);
throw new IllegalArgumentException(e);
}
+ catch (IOException e)
+ {
+ _logger.error("Unexpected IO exception decoding field table");
+ throw new IllegalArgumentException(e);
+
+ }
}
private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val)
@@ -766,7 +769,7 @@ public class FieldTable
// ************************* Byte Buffer Processing
- public void writeToBuffer(ByteBuffer buffer)
+ public void writeToBuffer(DataOutputStream buffer) throws IOException
{
final boolean trace = _logger.isDebugEnabled();
@@ -786,17 +789,21 @@ public class FieldTable
public byte[] getDataAsBytes()
{
- final int encodedSize = (int) getEncodedSize();
- final ByteBuffer buffer = ByteBuffer.allocate(encodedSize); // FIXME XXX: Is cast a problem?
-
- putDataInBuffer(buffer);
-
- final byte[] result = new byte[encodedSize];
- buffer.flip();
- buffer.get(result);
- buffer.release();
+ if(_encodedForm == null)
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ putDataInBuffer(new DataOutputStream(baos));
+ return baos.toByteArray();
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("IO Exception should never be thrown here");
+ }
- return result;
+ }
+ return _encodedForm.clone();
}
public long getEncodedSize()
@@ -926,15 +933,8 @@ public class FieldTable
public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator()
{
- if(_encodedForm != null)
- {
- return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize);
- }
- else
- {
- initMapIfNecessary();
- return _properties.entrySet().iterator();
- }
+ initMapIfNecessary();
+ return _properties.entrySet().iterator();
}
public Object get(String key)
@@ -1002,26 +1002,12 @@ public class FieldTable
return _properties.keySet();
}
- private void putDataInBuffer(ByteBuffer buffer)
+ private void putDataInBuffer(DataOutputStream buffer) throws IOException
{
if (_encodedForm != null)
{
- if(buffer.isDirect() || buffer.isReadOnly())
- {
- ByteBuffer encodedForm = _encodedForm.duplicate();
-
- if (encodedForm.position() != 0)
- {
- encodedForm.flip();
- }
-
- buffer.put(encodedForm);
- }
- else
- {
- buffer.put(_encodedForm.array(),_encodedForm.arrayOffset(),(int)_encodedSize);
- }
+ buffer.write(_encodedForm);
}
else if (_properties != null)
{
@@ -1035,41 +1021,27 @@ public class FieldTable
final Map.Entry<AMQShortString, AMQTypedValue> me = it.next();
try
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
- + me.getValue().getValue());
- _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
- }
-
// Write the actual parameter name
EncodingUtils.writeShortStringBytes(buffer, me.getKey());
me.getValue().writeToBuffer(buffer);
}
catch (Exception e)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Exception thrown:" + e);
- _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
- + me.getValue().getValue());
- _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
- }
-
throw new RuntimeException(e);
}
}
}
}
- private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException
+ private void setFromBuffer() throws AMQFrameDecodingException, IOException
{
+ final ByteArrayInputStream in = new ByteArrayInputStream(_encodedForm);
+ DataInputStream buffer = new DataInputStream(in);
final boolean trace = _logger.isDebugEnabled();
- if (length > 0)
+ if (_encodedSize > 0)
{
- final int expectedRemaining = buffer.remaining() - (int) length;
_properties = new LinkedHashMap<AMQShortString, AMQTypedValue>(INITIAL_HASHMAP_CAPACITY);
@@ -1077,121 +1049,16 @@ public class FieldTable
{
final AMQShortString key = EncodingUtils.readAMQShortString(buffer);
-
- _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read key '" + key);
-
AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
-
- if (trace)
- {
- _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType()
- + "', key '" + key + "', value '" + value.getValue() + "'");
- }
-
_properties.put(key, value);
}
- while (buffer.remaining() > expectedRemaining);
-
- }
-
- _encodedSize = length;
-
- if (trace)
- {
- _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done.");
- }
- }
-
- private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue>
- {
- private final AMQTypedValue _value;
- private final AMQShortString _key;
-
- public FieldTableEntry(final AMQShortString key, final AMQTypedValue value)
- {
- _key = key;
- _value = value;
- }
-
- public AMQShortString getKey()
- {
- return _key;
- }
-
- public AMQTypedValue getValue()
- {
- return _value;
- }
-
- public AMQTypedValue setValue(final AMQTypedValue value)
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean equals(Object o)
- {
- if(o instanceof FieldTableEntry)
- {
- FieldTableEntry other = (FieldTableEntry) o;
- return (_key == null ? other._key == null : _key.equals(other._key))
- && (_value == null ? other._value == null : _value.equals(other._value));
- }
- else
- {
- return false;
- }
- }
-
- public int hashCode()
- {
- return (getKey()==null ? 0 : getKey().hashCode())
- ^ (getValue()==null ? 0 : getValue().hashCode());
- }
-
- }
-
-
- private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>>
- {
+ while (in.available() > 0);
- private final ByteBuffer _buffer;
- private int _expectedRemaining;
-
- public FieldTableIterator(ByteBuffer buffer, int length)
- {
- _buffer = buffer;
- _expectedRemaining = buffer.remaining() - length;
- }
-
- public boolean hasNext()
- {
- return (_buffer.remaining() > _expectedRemaining);
}
- public Map.Entry<AMQShortString, AMQTypedValue> next()
- {
- if(hasNext())
- {
- final AMQShortString key = EncodingUtils.readAMQShortString(_buffer);
- AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer);
- return new FieldTableEntry(key, value);
- }
- else
- {
- return null;
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
}
-
-
-
public int hashCode()
{
initMapIfNecessary();
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
public class FieldTableFactory
{
@@ -29,7 +30,7 @@ public class FieldTableFactory
return new FieldTable();
}
- public static FieldTable newFieldTable(ByteBuffer byteBuffer, long length) throws AMQFrameDecodingException
+ public static FieldTable newFieldTable(DataInputStream byteBuffer, long length) throws AMQFrameDecodingException, IOException
{
return new FieldTable(byteBuffer, length);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.AMQException;
@@ -34,12 +37,12 @@ public class HeartbeatBody implements AM
}
- public HeartbeatBody(ByteBuffer buffer, long size)
+ public HeartbeatBody(DataInputStream buffer, long size) throws IOException
{
if(size > 0)
{
//allow other implementations to have a payload, but ignore it:
- buffer.skip((int) size);
+ buffer.skip(size);
}
}
@@ -53,7 +56,7 @@ public class HeartbeatBody implements AM
return 0;//heartbeats we generate have no payload
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer)
{
}
@@ -63,12 +66,12 @@ public class HeartbeatBody implements AM
session.heartbeatBodyReceived(channelId, this);
}
- protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
{
if(size > 0)
{
//allow other implementations to have a payload, but ignore it:
- buffer.skip((int) size);
+ buffer.skip(size);
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org