You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC
svn commit: r1225178 [7/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/
bdbstore/src/test/ bdbstore/src/test/jav...
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java?rev=1225178&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java Wed Dec 28 13:02:41 2011
@@ -0,0 +1,174 @@
+package org.apache.qpid.framing;
+
+import org.apache.qpid.codec.MarkableDataInput;
+
+import java.io.IOException;
+
+/**
+ * Reads big-endian primitive values and AMQP short strings directly out of a
+ * window of a byte array, without copying the array.
+ * <p>
+ * The window begins at the offset passed to the constructor and is
+ * {@code length} bytes long. {@link #position()} and the mark are both
+ * expressed relative to the start of that window.
+ * <p>
+ * The read methods do not check the end of the window: a read that runs off the
+ * end of the backing array fails with an unchecked
+ * {@link IndexOutOfBoundsException} rather than an {@link java.io.EOFException},
+ * and a read that leaves the window but stays inside the array returns bytes
+ * from outside the window. No synchronization is performed.
+ */
+public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput
+{
+    /** Backing array; held by reference, never copied. */
+    private final byte[] _data;
+    /** Absolute index into {@code _data} of the next byte to read. */
+    private int _offset;
+    /** Size of the readable window in bytes. */
+    private final int _length;
+    /** Absolute index into {@code _data} at which the window starts. */
+    private final int _origin;
+    /** Marked position, relative to {@code _origin}; 0 until {@link #mark(int)} is called. */
+    private int _mark;
+
+    /**
+     * Creates a reader over the whole of {@code data}.
+     *
+     * @param data the array to read from
+     */
+    public ByteArrayDataInput(byte[] data)
+    {
+        this(data, 0, data.length);
+    }
+
+    /**
+     * Creates a reader over {@code length} bytes of {@code data} starting at
+     * {@code offset}. The arguments are stored as given and not validated.
+     *
+     * @param data   the array to read from
+     * @param offset index of the first byte of the window
+     * @param length number of bytes in the window
+     */
+    public ByteArrayDataInput(byte[] data, int offset, int length)
+    {
+        _data = data;
+        _offset = offset;
+        _length = length;
+        _origin = offset;
+        _mark = 0;
+    }
+
+    /** Fills {@code b} completely from the current position and advances past the bytes read. */
+    public void readFully(byte[] b)
+    {
+        readFully(b, 0, b.length);
+    }
+
+    /** Copies {@code len} bytes into {@code b} starting at {@code off} and advances by {@code len}. */
+    public void readFully(byte[] b, int off, int len)
+    {
+        System.arraycopy(_data, _offset, b, off, len);
+        _offset += len;
+    }
+
+    /**
+     * Skips up to {@code n} bytes, never moving beyond the end of the window.
+     *
+     * @param n the number of bytes to skip; zero or negative skips nothing
+     * @return the number of bytes actually skipped, as {@link java.io.DataInput#skipBytes(int)}
+     *         requires (this used to return the new absolute offset instead)
+     */
+    public int skipBytes(int n)
+    {
+        final int skipped = Math.max(0, Math.min(n, available()));
+        _offset += skipped;
+        return skipped;
+    }
+
+    /** Reads one byte and returns {@code true} if it is non-zero. */
+    public boolean readBoolean()
+    {
+        return _data[_offset++] != 0;
+    }
+
+    /** Reads one signed byte. */
+    public byte readByte()
+    {
+        return _data[_offset++];
+    }
+
+    /** Reads one byte as a value in the range 0..255. */
+    public int readUnsignedByte()
+    {
+        return _data[_offset++] & 0xFF;
+    }
+
+    /** Reads two bytes, high byte first, as a signed 16-bit value. */
+    public short readShort()
+    {
+        return (short) readUnsignedShort();
+    }
+
+    /** Reads two bytes, high byte first, as a value in the range 0..65535. */
+    public int readUnsignedShort()
+    {
+        final int high = _data[_offset++] & 0xFF;
+        final int low = _data[_offset++] & 0xFF;
+        return (high << 8) | low;
+    }
+
+    /** Reads two bytes, high byte first, as a {@code char}. */
+    public char readChar()
+    {
+        return (char) readUnsignedShort();
+    }
+
+    /** Reads four bytes, most significant first, as an {@code int}. */
+    public int readInt()
+    {
+        return ((_data[_offset++] & 0xFF) << 24)
+               | ((_data[_offset++] & 0xFF) << 16)
+               | ((_data[_offset++] & 0xFF) << 8)
+               | (_data[_offset++] & 0xFF);
+    }
+
+    /** Reads eight bytes, most significant first, as a {@code long}. */
+    public long readLong()
+    {
+        return ((_data[_offset++] & 0xFFL) << 56)
+               | ((_data[_offset++] & 0xFFL) << 48)
+               | ((_data[_offset++] & 0xFFL) << 40)
+               | ((_data[_offset++] & 0xFFL) << 32)
+               | ((_data[_offset++] & 0xFFL) << 24)
+               | ((_data[_offset++] & 0xFFL) << 16)
+               | ((_data[_offset++] & 0xFFL) << 8)
+               | (_data[_offset++] & 0xFFL);
+    }
+
+    /** Reads four bytes as an {@code int} and reinterprets the bits as a {@code float}. */
+    public float readFloat()
+    {
+        return Float.intBitsToFloat(readInt());
+    }
+
+    /** Reads eight bytes as a {@code long} and reinterprets the bits as a {@code double}. */
+    public double readDouble()
+    {
+        return Double.longBitsToDouble(readLong());
+    }
+
+    /**
+     * Reads a one-byte unsigned length followed by that many bytes as a short string.
+     *
+     * @return an {@link AMQShortString} built over the backing array range, or
+     *         {@code null} when the length byte is zero
+     */
+    public AMQShortString readAMQShortString()
+    {
+        final int length = _data[_offset++] & 0xFF;
+        if (length == 0)
+        {
+            return null;
+        }
+
+        final AMQShortString amqShortString = new AMQShortString(_data, _offset, length);
+        _offset += length;
+        return amqShortString;
+    }
+
+    /** Not supported. */
+    public String readLine()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported. */
+    public String readUTF()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Returns the number of bytes between the current position and the end of the window. */
+    public int available()
+    {
+        return (_origin + _length) - _offset;
+    }
+
+    /**
+     * Advances the position by {@code i} bytes and reports {@code i} as skipped.
+     * Unlike {@link #skipBytes(int)} the distance is not clamped to the window.
+     * NOTE(review): the contract of the interface method this implements is not
+     * visible here - confirm whether it should clamp as well.
+     */
+    public long skip(long i)
+    {
+        _offset += i;
+        return i;
+    }
+
+    /** Fills {@code b} completely, as {@link #readFully(byte[])} does, and returns {@code b.length}. */
+    public int read(byte[] b)
+    {
+        readFully(b);
+        return b.length;
+    }
+
+    /** Returns the current position relative to the start of the window. */
+    public int position()
+    {
+        return _offset - _origin;
+    }
+
+    /** Moves to {@code position}, measured from the start of the window; the value is not validated. */
+    public void position(int position)
+    {
+        _offset = position + _origin;
+    }
+
+    /** Returns the window size given at construction. */
+    public int length()
+    {
+        return _length;
+    }
+
+    /**
+     * Remembers the current position so that {@link #reset()} can return to it.
+     *
+     * @param readAhead ignored; the mark never expires
+     */
+    public void mark(int readAhead)
+    {
+        _mark = _offset - _origin;
+    }
+
+    /** Returns to the last marked position, or to the start of the window if none was marked. */
+    public void reset()
+    {
+        _offset = _origin + _mark;
+    }
+}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java Wed Dec 28 13:02:41 2011
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.framing;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
@@ -50,7 +50,7 @@ public class CompositeAMQDataBlock exten
return frameSize;
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
for (int i = 0; i < _blocks.length; i++)
{
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.AMQException;
@@ -37,10 +39,10 @@ public class ContentBody implements AMQB
{
}
- public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
+ public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
{
_payload = new byte[(int)size];
- buffer.read(_payload);
+ buffer.readFully(_payload);
}
@@ -59,7 +61,7 @@ public class ContentBody implements AMQB
return _payload == null ? 0 : _payload.length;
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
buffer.write(_payload);
}
@@ -84,11 +86,62 @@ public class ContentBody implements AMQB
{
}
+    /**
+     * Outgoing-only content body that writes a range of a {@link ByteBuffer}
+     * without first copying it into a {@code ContentBody}.
+     * The range is addressed by absolute index; the buffer's own position is
+     * never read or changed.
+     */
+    private static class BufferContentBody implements AMQBody
+    {
+        private final int _length;
+        private final int _offset;
+        private final ByteBuffer _buf;
+
+        /**
+         * @param buf    buffer holding the payload; kept by reference
+         * @param offset absolute index in {@code buf} of the first payload byte
+         * @param length number of payload bytes
+         */
+        private BufferContentBody(ByteBuffer buf, int offset, int length)
+        {
+            _length = length;
+            _offset = offset;
+            _buf = buf;
+        }
+
+        public byte getFrameType()
+        {
+            return TYPE;
+        }
+
+        /** Returns the payload length in bytes. */
+        public int getSize()
+        {
+            return _length;
+        }
+
+        /** Writes the {@code _length} payload bytes starting at {@code _offset} to {@code buffer}. */
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            if (_buf.hasArray())
+            {
+                // Array-backed: write straight from the backing array, no copy.
+                buffer.write(_buf.array(), _buf.arrayOffset() + _offset, _length);
+            }
+            else
+            {
+                byte[] data = new byte[_length];
+                // Work on a duplicate so the shared buffer's position and limit stay untouched.
+                ByteBuffer buf = _buf.duplicate();
+
+                // Set the limit before the position: position() rejects any value beyond the
+                // current limit, so the old order failed when _offset lay past the buffer's
+                // limit, although the array-backed branch above accepts such a range.
+                buf.limit(_offset + _length);
+                buf.position(_offset);
+                buf.get(data);
+                buffer.write(data);
+            }
+        }
+
+        /** Always throws: this body type is only ever written, never dispatched as received data. */
+        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+        {
+            throw new RuntimeException("Buffered Body only to be used for outgoing data");
+        }
+    }
+
+    /**
+     * Builds a frame on {@code channelId} whose body is {@code length} bytes of
+     * {@code buf} starting at absolute index {@code offset}. The buffer is
+     * referenced, not copied.
+     */
+    public static AMQFrame createAMQFrame(int channelId, ByteBuffer buf, int offset, int length)
+    {
+        final BufferContentBody body = new BufferContentBody(buf, offset, length);
+        return new AMQFrame(channelId, body);
+    }
public static AMQFrame createAMQFrame(int channelId, ContentBody body)
{
- final AMQFrame frame = new AMQFrame(channelId, body);
- return frame;
+ return new AMQFrame(channelId, body);
}
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.qpid.codec.MarkableDataInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ public class ContentBodyFactory implemen
_log.debug("Creating content body factory");
}
- public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+ public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
{
return new ContentBody(in, bodySize);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Wed Dec 28 13:02:41 2011
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -45,7 +46,7 @@ public class ContentHeaderBody implement
{
}
- public ContentHeaderBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
+ public ContentHeaderBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
{
classId = buffer.readUnsignedShort();
weight = buffer.readUnsignedShort();
@@ -106,7 +107,7 @@ public class ContentHeaderBody implement
return 2 + 2 + 8 + 2 + properties.getPropertyListSize();
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, classId);
EncodingUtils.writeUnsignedShort(buffer, weight);
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.qpid.codec.MarkableDataInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ public class ContentHeaderBodyFactory im
_log.debug("Creating content header body factory");
}
- public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+ public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
{
// all content headers are the same - it is only the properties that differ.
// the content header body further delegates construction of properties
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java Wed Dec 28 13:02:41 2011
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
@@ -35,7 +36,7 @@ public interface ContentHeaderProperties
* Writes the property list to the buffer, in a suitably encoded form.
* @param buffer The buffer to write to
*/
- void writePropertyListPayload(DataOutputStream buffer) throws IOException;
+ void writePropertyListPayload(DataOutput buffer) throws IOException;
/**
* Populates the properties from buffer.
@@ -43,7 +44,7 @@ public interface ContentHeaderProperties
* @param propertyFlags he property flags.
* @throws AMQFrameDecodingException when the buffer does not contain valid data
*/
- void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size)
+ void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size)
throws AMQFrameDecodingException, IOException;
/**
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
@@ -39,7 +40,7 @@ public class ContentHeaderPropertiesFact
}
public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
- DataInputStream buffer, int size)
+ DataInput buffer, int size)
throws AMQFrameDecodingException, IOException
{
ContentHeaderProperties properties;
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Wed Dec 28 13:02:41 2011
@@ -219,7 +219,7 @@ public class EncodingUtils
return 0;
}
- public static void writeShortStringBytes(DataOutputStream buffer, String s) throws IOException
+ public static void writeShortStringBytes(DataOutput buffer, String s) throws IOException
{
if (s != null)
{
@@ -243,7 +243,7 @@ public class EncodingUtils
}
}
- public static void writeShortStringBytes(DataOutputStream buffer, AMQShortString s) throws IOException
+ public static void writeShortStringBytes(DataOutput buffer, AMQShortString s) throws IOException
{
if (s != null)
{
@@ -257,7 +257,7 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(DataOutputStream buffer, String s) throws IOException
+ public static void writeLongStringBytes(DataOutput buffer, String s) throws IOException
{
assert (s == null) || (s.length() <= 0xFFFE);
if (s != null)
@@ -279,7 +279,7 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(DataOutputStream buffer, char[] s) throws IOException
+ public static void writeLongStringBytes(DataOutput buffer, char[] s) throws IOException
{
assert (s == null) || (s.length <= 0xFFFE);
if (s != null)
@@ -300,7 +300,7 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(DataOutputStream buffer, byte[] bytes) throws IOException
+ public static void writeLongStringBytes(DataOutput buffer, byte[] bytes) throws IOException
{
assert (bytes == null) || (bytes.length <= 0xFFFE);
if (bytes != null)
@@ -314,13 +314,13 @@ public class EncodingUtils
}
}
- public static void writeUnsignedByte(DataOutputStream buffer, short b) throws IOException
+ public static void writeUnsignedByte(DataOutput buffer, short b) throws IOException
{
byte bv = (byte) b;
buffer.write(bv);
}
- public static void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
+ public static void writeUnsignedShort(DataOutput buffer, int s) throws IOException
{
// TODO: Is this comparison safe? Do I need to cast RHS to long?
if (s < Short.MAX_VALUE)
@@ -340,7 +340,7 @@ public class EncodingUtils
return 4;
}
- public static void writeUnsignedInteger(DataOutputStream buffer, long l) throws IOException
+ public static void writeUnsignedInteger(DataOutput buffer, long l) throws IOException
{
// TODO: Is this comparison safe? Do I need to cast RHS to long?
if (l < Integer.MAX_VALUE)
@@ -360,7 +360,7 @@ public class EncodingUtils
}
}
- public static void writeFieldTableBytes(DataOutputStream buffer, FieldTable table) throws IOException
+ public static void writeFieldTableBytes(DataOutput buffer, FieldTable table) throws IOException
{
if (table != null)
{
@@ -372,12 +372,12 @@ public class EncodingUtils
}
}
- public static void writeContentBytes(DataOutputStream buffer, Content content)
+ public static void writeContentBytes(DataOutput buffer, Content content)
{
// TODO: New Content class required for AMQP 0-9.
}
- public static void writeBooleans(DataOutputStream buffer, boolean[] values) throws IOException
+ public static void writeBooleans(DataOutput buffer, boolean[] values) throws IOException
{
byte packedValue = 0;
for (int i = 0; i < values.length; i++)
@@ -391,13 +391,13 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value) throws IOException
+ public static void writeBooleans(DataOutput buffer, boolean value) throws IOException
{
buffer.write(value ? (byte) 1 : (byte) 0);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1) throws IOException
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -409,7 +409,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2) throws IOException
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -426,7 +426,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -448,7 +448,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
boolean value4) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -476,7 +476,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
boolean value4, boolean value5) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -509,7 +509,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
boolean value4, boolean value5, boolean value6) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -547,7 +547,7 @@ public class EncodingUtils
buffer.write(packedValue);
}
- public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
boolean value4, boolean value5, boolean value6, boolean value7) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -596,7 +596,7 @@ public class EncodingUtils
* @param buffer
* @param data
*/
- public static void writeLongstr(DataOutputStream buffer, byte[] data) throws IOException
+ public static void writeLongstr(DataOutput buffer, byte[] data) throws IOException
{
if (data != null)
{
@@ -609,12 +609,12 @@ public class EncodingUtils
}
}
- public static void writeTimestamp(DataOutputStream buffer, long timestamp) throws IOException
+ public static void writeTimestamp(DataOutput buffer, long timestamp) throws IOException
{
writeLong(buffer, timestamp);
}
- public static boolean[] readBooleans(DataInputStream buffer) throws IOException
+ public static boolean[] readBooleans(DataInput buffer) throws IOException
{
final byte packedValue = buffer.readByte();
if (packedValue == 0)
@@ -641,7 +641,7 @@ public class EncodingUtils
return result;
}
- public static FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
+ public static FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException
{
long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -654,19 +654,19 @@ public class EncodingUtils
}
}
- public static Content readContent(DataInputStream buffer) throws AMQFrameDecodingException
+ public static Content readContent(DataInput buffer) throws AMQFrameDecodingException
{
// TODO: New Content class required for AMQP 0-9.
return null;
}
- public static AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
+ public static AMQShortString readAMQShortString(DataInput buffer) throws IOException
{
return AMQShortString.readFromBuffer(buffer);
}
- public static String readShortString(DataInputStream buffer) throws IOException
+ public static String readShortString(DataInput buffer) throws IOException
{
short length = (short) (((short)buffer.readByte()) & 0xFF);
if (length == 0)
@@ -681,7 +681,7 @@ public class EncodingUtils
// this approach here is valid since we know that all the chars are
// ASCII (0-127)
byte[] stringBytes = new byte[length];
- buffer.read(stringBytes, 0, length);
+ buffer.readFully(stringBytes, 0, length);
char[] stringChars = new char[length];
for (int i = 0; i < stringChars.length; i++)
{
@@ -692,7 +692,7 @@ public class EncodingUtils
}
}
- public static String readLongString(DataInputStream buffer) throws IOException
+ public static String readLongString(DataInput buffer) throws IOException
{
long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -707,7 +707,7 @@ public class EncodingUtils
// this approach here is valid since we know that all the chars are
// ASCII (0-127)
byte[] stringBytes = new byte[(int) length];
- buffer.read(stringBytes, 0, (int) length);
+ buffer.readFully(stringBytes, 0, (int) length);
char[] stringChars = new char[(int) length];
for (int i = 0; i < stringChars.length; i++)
{
@@ -718,7 +718,7 @@ public class EncodingUtils
}
}
- public static byte[] readLongstr(DataInputStream buffer) throws IOException
+ public static byte[] readLongstr(DataInput buffer) throws IOException
{
long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -728,13 +728,13 @@ public class EncodingUtils
else
{
byte[] result = new byte[(int) length];
- buffer.read(result);
+ buffer.readFully(result);
return result;
}
}
- public static long readTimestamp(DataInputStream buffer) throws IOException
+ public static long readTimestamp(DataInput buffer) throws IOException
{
// Discard msb from AMQ timestamp
// buffer.getUnsignedInt();
@@ -818,12 +818,12 @@ public class EncodingUtils
// AMQP_BOOLEAN_PROPERTY_PREFIX
- public static void writeBoolean(DataOutputStream buffer, Boolean aBoolean) throws IOException
+ public static void writeBoolean(DataOutput buffer, boolean aBoolean) throws IOException
{
buffer.write(aBoolean ? 1 : 0);
}
- public static boolean readBoolean(DataInputStream buffer) throws IOException
+ public static boolean readBoolean(DataInput buffer) throws IOException
{
byte packedValue = buffer.readByte();
@@ -836,12 +836,12 @@ public class EncodingUtils
}
// AMQP_BYTE_PROPERTY_PREFIX
- public static void writeByte(DataOutputStream buffer, Byte aByte) throws IOException
+ public static void writeByte(DataOutput buffer, byte aByte) throws IOException
{
buffer.writeByte(aByte);
}
- public static byte readByte(DataInputStream buffer) throws IOException
+ public static byte readByte(DataInput buffer) throws IOException
{
return buffer.readByte();
}
@@ -852,12 +852,12 @@ public class EncodingUtils
}
// AMQP_SHORT_PROPERTY_PREFIX
- public static void writeShort(DataOutputStream buffer, Short aShort) throws IOException
+ public static void writeShort(DataOutput buffer, short aShort) throws IOException
{
buffer.writeShort(aShort);
}
- public static short readShort(DataInputStream buffer) throws IOException
+ public static short readShort(DataInput buffer) throws IOException
{
return buffer.readShort();
}
@@ -868,12 +868,12 @@ public class EncodingUtils
}
// INTEGER_PROPERTY_PREFIX
- public static void writeInteger(DataOutputStream buffer, Integer aInteger) throws IOException
+ public static void writeInteger(DataOutput buffer, int aInteger) throws IOException
{
buffer.writeInt(aInteger);
}
- public static int readInteger(DataInputStream buffer) throws IOException
+ public static int readInteger(DataInput buffer) throws IOException
{
return buffer.readInt();
}
@@ -884,12 +884,12 @@ public class EncodingUtils
}
// AMQP_LONG_PROPERTY_PREFIX
- public static void writeLong(DataOutputStream buffer, Long aLong) throws IOException
+ public static void writeLong(DataOutput buffer, long aLong) throws IOException
{
buffer.writeLong(aLong);
}
- public static long readLong(DataInputStream buffer) throws IOException
+ public static long readLong(DataInput buffer) throws IOException
{
return buffer.readLong();
}
@@ -900,12 +900,12 @@ public class EncodingUtils
}
// Float_PROPERTY_PREFIX
- public static void writeFloat(DataOutputStream buffer, Float aFloat) throws IOException
+ public static void writeFloat(DataOutput buffer, float aFloat) throws IOException
{
buffer.writeFloat(aFloat);
}
- public static float readFloat(DataInputStream buffer) throws IOException
+ public static float readFloat(DataInput buffer) throws IOException
{
return buffer.readFloat();
}
@@ -916,12 +916,12 @@ public class EncodingUtils
}
// Double_PROPERTY_PREFIX
- public static void writeDouble(DataOutputStream buffer, Double aDouble) throws IOException
+ /**
+ * Writes {@code aDouble} to {@code buffer} using {@link DataOutput#writeDouble(double)}.
+ * Takes a primitive, like writeFloat and writeLong, so callers do not box and cannot pass null.
+ */
+ public static void writeDouble(DataOutput buffer, double aDouble) throws IOException
{
buffer.writeDouble(aDouble);
}
- public static double readDouble(DataInputStream buffer) throws IOException
+ public static double readDouble(DataInput buffer) throws IOException
{
return buffer.readDouble();
}
@@ -931,7 +931,7 @@ public class EncodingUtils
return 8;
}
- public static byte[] readBytes(DataInputStream buffer) throws IOException
+ public static byte[] readBytes(DataInput buffer) throws IOException
{
long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -941,13 +941,13 @@ public class EncodingUtils
else
{
byte[] dataBytes = new byte[(int)length];
- buffer.read(dataBytes, 0, (int) length);
+ buffer.readFully(dataBytes, 0, (int) length);
return dataBytes;
}
}
- public static void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
+ public static void writeBytes(DataOutput buffer, byte[] data) throws IOException
{
if (data != null)
{
@@ -969,19 +969,19 @@ public class EncodingUtils
return encodedByteLength();
}
- public static char readChar(DataInputStream buffer) throws IOException
+ public static char readChar(DataInput buffer) throws IOException
{
// This is valid as we know that the Character is ASCII 0..127
- return (char) buffer.read();
+ // Read the byte unsigned: casting a signed byte >= 0x80 to char sign-extends it to
+ // 0xFF80..0xFFFF, whereas the read() call being replaced produced 0x80..0xFF.
+ return (char) buffer.readUnsignedByte();
}
- public static void writeChar(DataOutputStream buffer, char character) throws IOException
+ public static void writeChar(DataOutput buffer, char character) throws IOException
{
// This is valid as we know that the Character is ASCII 0..127
writeByte(buffer, (byte) character);
}
- public static long readLongAsShortString(DataInputStream buffer) throws IOException
+ public static long readLongAsShortString(DataInput buffer) throws IOException
{
short length = (short) buffer.readUnsignedByte();
short pos = 0;
@@ -1018,7 +1018,7 @@ public class EncodingUtils
return result;
}
- public static long readUnsignedInteger(DataInputStream buffer) throws IOException
+ public static long readUnsignedInteger(DataInput buffer) throws IOException
{
long l = 0xFF & buffer.readByte();
l <<= 8;
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java?rev=1225178&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java Wed Dec 28 13:02:41 2011
@@ -0,0 +1,14 @@
+package org.apache.qpid.framing;
+
+import java.io.DataInput;
+
+/**
+ * A {@link DataInput} that can also read AMQP short strings and exposes a
+ * readable-byte count and a settable position.
+ */
+public interface ExtendedDataInput extends DataInput
+{
+    /**
+     * Reads a short string from the current position.
+     * The {@code ByteArrayDataInput} implementation returns {@code null} when
+     * the encoded length byte is zero.
+     */
+    AMQShortString readAMQShortString();
+
+    /** Returns the number of bytes that can still be read. */
+    int available();
+
+    /** Returns the current read position. */
+    int position();
+
+    /** Moves the read position to {@code position}. */
+    void position(int position);
+}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Wed Dec 28 13:02:41 2011
@@ -25,11 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQPInvalidClassException;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Enumeration;
@@ -44,18 +40,27 @@ import java.util.Set;
public class FieldTable
{
private static final Logger _logger = LoggerFactory.getLogger(FieldTable.class);
- private static final String STRICT_AMQP = "STRICT_AMQP";
- private final boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP, "false"));
+ private static final String STRICT_AMQP_NAME = "STRICT_AMQP";
+ private static final boolean STRICT_AMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false"));
private byte[] _encodedForm;
+ private int _encodedFormOffset;
private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null;
private long _encodedSize;
private static final int INITIAL_HASHMAP_CAPACITY = 16;
private static final int INITIAL_ENCODED_FORM_SIZE = 256;
+ private final boolean _strictAMQP;
public FieldTable()
{
+ this(STRICT_AMQP);
+ }
+
+
+ public FieldTable(boolean strictAMQP)
+ {
super();
+ _strictAMQP = strictAMQP;
}
/**
@@ -64,14 +69,28 @@ public class FieldTable
* @param buffer the buffer from which to read data. The length byte must be read already
* @param length the length of the field table. Must be > 0.
*/
- public FieldTable(DataInputStream buffer, long length) throws IOException
+ public FieldTable(DataInput buffer, long length) throws IOException
{
this();
_encodedForm = new byte[(int) length];
- buffer.read(_encodedForm);
+ buffer.readFully(_encodedForm);
_encodedSize = length;
}
+ public FieldTable(byte[] encodedForm, int offset, int length) throws IOException // wraps an already-encoded table stored in a slice of encodedForm
+ {
+ this();
+ _encodedForm = encodedForm; // the caller's array is kept by reference, not copied
+ _encodedFormOffset = offset; // start of the encoded table inside encodedForm
+ _encodedSize = length; // number of encoded bytes; properties are not decoded in this constructor
+ }
+
+
+ public boolean isClean() // reports whether this table currently holds an encoded byte form
+ {
+ return _encodedForm != null; // NOTE(review): presumably the encoded form is dropped when the table is modified - confirm
+ }
+
public AMQTypedValue getProperty(AMQShortString string)
{
checkPropertyName(string);
@@ -181,7 +200,7 @@ public class FieldTable
public Boolean getBoolean(String string)
{
- return getBoolean(new AMQShortString(string));
+ return getBoolean(AMQShortString.valueOf(string));
}
public Boolean getBoolean(AMQShortString string)
@@ -199,7 +218,7 @@ public class FieldTable
public Byte getByte(String string)
{
- return getByte(new AMQShortString(string));
+ return getByte(AMQShortString.valueOf(string));
}
public Byte getByte(AMQShortString string)
@@ -217,7 +236,7 @@ public class FieldTable
public Short getShort(String string)
{
- return getShort(new AMQShortString(string));
+ return getShort(AMQShortString.valueOf(string));
}
public Short getShort(AMQShortString string)
@@ -235,7 +254,7 @@ public class FieldTable
public Integer getInteger(String string)
{
- return getInteger(new AMQShortString(string));
+ return getInteger(AMQShortString.valueOf(string));
}
public Integer getInteger(AMQShortString string)
@@ -253,7 +272,7 @@ public class FieldTable
public Long getLong(String string)
{
- return getLong(new AMQShortString(string));
+ return getLong(AMQShortString.valueOf(string));
}
public Long getLong(AMQShortString string)
@@ -271,7 +290,7 @@ public class FieldTable
public Float getFloat(String string)
{
- return getFloat(new AMQShortString(string));
+ return getFloat(AMQShortString.valueOf(string));
}
public Float getFloat(AMQShortString string)
@@ -289,7 +308,7 @@ public class FieldTable
public Double getDouble(String string)
{
- return getDouble(new AMQShortString(string));
+ return getDouble(AMQShortString.valueOf(string));
}
public Double getDouble(AMQShortString string)
@@ -307,7 +326,7 @@ public class FieldTable
public String getString(String string)
{
- return getString(new AMQShortString(string));
+ return getString(AMQShortString.valueOf(string));
}
public String getString(AMQShortString string)
@@ -330,7 +349,7 @@ public class FieldTable
public Character getCharacter(String string)
{
- return getCharacter(new AMQShortString(string));
+ return getCharacter(AMQShortString.valueOf(string));
}
public Character getCharacter(AMQShortString string)
@@ -348,7 +367,7 @@ public class FieldTable
public byte[] getBytes(String string)
{
- return getBytes(new AMQShortString(string));
+ return getBytes(AMQShortString.valueOf(string));
}
public byte[] getBytes(AMQShortString string)
@@ -374,7 +393,7 @@ public class FieldTable
*/
public FieldTable getFieldTable(String string)
{
- return getFieldTable(new AMQShortString(string));
+ return getFieldTable(AMQShortString.valueOf(string));
}
/**
@@ -401,7 +420,7 @@ public class FieldTable
public Object getObject(String string)
{
- return getObject(new AMQShortString(string));
+ return getObject(AMQShortString.valueOf(string));
}
public Object getObject(AMQShortString string)
@@ -447,7 +466,7 @@ public class FieldTable
// ************ Setters
public Object setBoolean(String string, Boolean b)
{
- return setBoolean(new AMQShortString(string), b);
+ return setBoolean(AMQShortString.valueOf(string), b);
}
public Object setBoolean(AMQShortString string, Boolean b)
@@ -457,7 +476,7 @@ public class FieldTable
public Object setByte(String string, Byte b)
{
- return setByte(new AMQShortString(string), b);
+ return setByte(AMQShortString.valueOf(string), b);
}
public Object setByte(AMQShortString string, Byte b)
@@ -467,7 +486,7 @@ public class FieldTable
public Object setShort(String string, Short i)
{
- return setShort(new AMQShortString(string), i);
+ return setShort(AMQShortString.valueOf(string), i);
}
public Object setShort(AMQShortString string, Short i)
@@ -475,29 +494,29 @@ public class FieldTable
return setProperty(string, AMQType.SHORT.asTypedValue(i));
}
- public Object setInteger(String string, Integer i)
+ public Object setInteger(String string, int i)
{
- return setInteger(new AMQShortString(string), i);
+ return setInteger(AMQShortString.valueOf(string), i);
}
- public Object setInteger(AMQShortString string, Integer i)
+ public Object setInteger(AMQShortString string, int i)
{
- return setProperty(string, AMQType.INT.asTypedValue(i));
+ return setProperty(string, AMQTypedValue.createAMQTypedValue(i));
}
- public Object setLong(String string, Long l)
+ public Object setLong(String string, long l)
{
- return setLong(new AMQShortString(string), l);
+ return setLong(AMQShortString.valueOf(string), l);
}
- public Object setLong(AMQShortString string, Long l)
+ public Object setLong(AMQShortString string, long l)
{
- return setProperty(string, AMQType.LONG.asTypedValue(l));
+ return setProperty(string, AMQTypedValue.createAMQTypedValue(l));
}
public Object setFloat(String string, Float f)
{
- return setFloat(new AMQShortString(string), f);
+ return setFloat(AMQShortString.valueOf(string), f);
}
public Object setFloat(AMQShortString string, Float v)
@@ -507,7 +526,7 @@ public class FieldTable
public Object setDouble(String string, Double d)
{
- return setDouble(new AMQShortString(string), d);
+ return setDouble(AMQShortString.valueOf(string), d);
}
public Object setDouble(AMQShortString string, Double v)
@@ -517,7 +536,7 @@ public class FieldTable
public Object setString(String string, String s)
{
- return setString(new AMQShortString(string), s);
+ return setString(AMQShortString.valueOf(string), s);
}
public Object setAsciiString(AMQShortString string, String value)
@@ -546,7 +565,7 @@ public class FieldTable
public Object setChar(String string, char c)
{
- return setChar(new AMQShortString(string), c);
+ return setChar(AMQShortString.valueOf(string), c);
}
public Object setChar(AMQShortString string, char c)
@@ -556,7 +575,7 @@ public class FieldTable
public Object setBytes(String string, byte[] b)
{
- return setBytes(new AMQShortString(string), b);
+ return setBytes(AMQShortString.valueOf(string), b);
}
public Object setBytes(AMQShortString string, byte[] bytes)
@@ -566,7 +585,7 @@ public class FieldTable
public Object setBytes(String string, byte[] bytes, int start, int length)
{
- return setBytes(new AMQShortString(string), bytes, start, length);
+ return setBytes(AMQShortString.valueOf(string), bytes, start, length);
}
public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
@@ -579,7 +598,7 @@ public class FieldTable
public Object setObject(String string, Object o)
{
- return setObject(new AMQShortString(string), o);
+ return setObject(AMQShortString.valueOf(string), o);
}
public Object setTimestamp(AMQShortString string, long datetime)
@@ -617,7 +636,7 @@ public class FieldTable
*/
public Object setFieldTable(String string, FieldTable ftValue)
{
- return setFieldTable(new AMQShortString(string), ftValue);
+ return setFieldTable(AMQShortString.valueOf(string), ftValue);
}
/**
@@ -681,7 +700,7 @@ public class FieldTable
public boolean isNullStringValue(String name)
{
- AMQTypedValue value = getProperty(new AMQShortString(name));
+ AMQTypedValue value = getProperty(AMQShortString.valueOf(name));
return (value != null) && (value.getType() == AMQType.VOID);
}
@@ -713,7 +732,7 @@ public class FieldTable
public boolean itemExists(String string)
{
- return itemExists(new AMQShortString(string));
+ return itemExists(AMQShortString.valueOf(string));
}
public String toString()
@@ -769,7 +788,7 @@ public class FieldTable
// ************************* Byte Buffer Processing
- public void writeToBuffer(DataOutputStream buffer) throws IOException
+ public void writeToBuffer(DataOutput buffer) throws IOException
{
final boolean trace = _logger.isDebugEnabled();
@@ -919,7 +938,7 @@ public class FieldTable
public boolean containsKey(String key)
{
- return containsKey(new AMQShortString(key));
+ return containsKey(AMQShortString.valueOf(key));
}
public Set<String> keys()
@@ -942,7 +961,7 @@ public class FieldTable
public Object get(String key)
{
- return get(new AMQShortString(key));
+ return get(AMQShortString.valueOf(key));
}
public Object get(AMQShortString key)
@@ -958,7 +977,7 @@ public class FieldTable
public Object remove(String key)
{
- return remove(new AMQShortString(key));
+ return remove(AMQShortString.valueOf(key));
}
@@ -1005,12 +1024,12 @@ public class FieldTable
return _properties.keySet();
}
- private void putDataInBuffer(DataOutputStream buffer) throws IOException
+ private void putDataInBuffer(DataOutput buffer) throws IOException
{
if (_encodedForm != null)
{
- buffer.write(_encodedForm);
+ buffer.write(_encodedForm,_encodedFormOffset,(int)_encodedSize);
}
else if (_properties != null)
{
@@ -1039,9 +1058,8 @@ public class FieldTable
private void setFromBuffer() throws AMQFrameDecodingException, IOException
{
- final ByteArrayInputStream in = new ByteArrayInputStream(_encodedForm);
- DataInputStream buffer = new DataInputStream(in);
- final boolean trace = _logger.isDebugEnabled();
+ ByteArrayDataInput baid = new ByteArrayDataInput(_encodedForm, _encodedFormOffset, (int)_encodedSize);
+
if (_encodedSize > 0)
{
@@ -1051,12 +1069,12 @@ public class FieldTable
do
{
- final AMQShortString key = EncodingUtils.readAMQShortString(buffer);
- AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
+ final AMQShortString key = baid.readAMQShortString();
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(baid);
_properties.put(key, value);
}
- while (in.available() > 0);
+ while (baid.available() > 0);
}
@@ -1101,7 +1119,7 @@ public class FieldTable
FieldTable table = new FieldTable();
for(Map.Entry<String,Object> entry : map.entrySet())
{
- table.put(new AMQShortString(entry.getKey()), entry.getValue());
+ table.put(AMQShortString.valueOf(entry.getKey()), entry.getValue());
}
return table;
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
@@ -30,7 +31,7 @@ public class FieldTableFactory
return new FieldTable();
}
- public static FieldTable newFieldTable(DataInputStream byteBuffer, long length) throws AMQFrameDecodingException, IOException
+ public static FieldTable newFieldTable(DataInput byteBuffer, long length) throws AMQFrameDecodingException, IOException
{
return new FieldTable(byteBuffer, length);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Wed Dec 28 13:02:41 2011
@@ -21,7 +21,7 @@
package org.apache.qpid.framing;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -56,7 +56,7 @@ public class HeartbeatBody implements AM
return 0;//heartbeats we generate have no payload
}
- public void writePayload(DataOutputStream buffer)
+ public void writePayload(DataOutput buffer)
{
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java Wed Dec 28 13:02:41 2011
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
+import org.apache.qpid.codec.MarkableDataInput;
public class HeartbeatBodyFactory implements BodyFactory
{
- public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException
+ public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException
{
return new HeartbeatBody();
}
+
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Wed Dec 28 13:02:41 2011
@@ -21,13 +21,10 @@
package org.apache.qpid.framing;
import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
import java.util.Arrays;
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
@@ -66,7 +63,7 @@ public class ProtocolInitiation extends
pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
}
- public ProtocolInitiation(DataInputStream in) throws IOException
+ public ProtocolInitiation(MarkableDataInput in) throws IOException
{
_protocolHeader = new byte[4];
in.read(_protocolHeader);
@@ -82,7 +79,7 @@ public class ProtocolInitiation extends
return 4 + 1 + 1 + 1 + 1;
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
buffer.write(_protocolHeader);
@@ -143,7 +140,7 @@ public class ProtocolInitiation extends
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
*/
- public boolean decodable(DataInputStream in) throws IOException
+ public boolean decodable(MarkableDataInput in) throws IOException
{
return (in.available() >= 8);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java Wed Dec 28 13:02:41 2011
@@ -21,7 +21,7 @@
package org.apache.qpid.framing;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
@@ -69,7 +69,7 @@ public class SmallCompositeAMQDataBlock
return frameSize;
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
if (_firstFrame != null)
{
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java Wed Dec 28 13:02:41 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.framing;
import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.slf4j.Logger;
@@ -145,7 +146,7 @@ public class VersionSpecificRegistry
}
- public AMQMethodBody get(short classID, short methodID, DataInputStream in, long size) throws AMQFrameDecodingException, IOException
+ public AMQMethodBody get(short classID, short methodID, MarkableDataInput in, long size) throws AMQFrameDecodingException, IOException
{
AMQMethodBodyInstanceFactory bodyFactory;
try
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Wed Dec 28 13:02:41 2011
@@ -383,13 +383,19 @@ public class Connection extends Connecti
public void received(ProtocolEvent event)
{
- log.debug("RECV: [%s] %s", this, event);
+ if(log.isDebugEnabled())
+ {
+ log.debug("RECV: [%s] %s", this, event);
+ }
event.delegate(this, delegate);
}
public void send(ProtocolEvent event)
{
- log.debug("SEND: [%s] %s", this, event);
+ if(log.isDebugEnabled())
+ {
+ log.debug("SEND: [%s] %s", this, event);
+ }
Sender s = sender;
if (s == null)
{
@@ -400,8 +406,15 @@ public class Connection extends Connecti
public void flush()
{
- log.debug("FLUSH: [%s]", this);
- sender.flush();
+ if(log.isDebugEnabled())
+ {
+ log.debug("FLUSH: [%s]", this);
+ }
+ final Sender<ProtocolEvent> theSender = sender;
+ if(theSender != null)
+ {
+ theSender.flush();
+ }
}
protected void invoke(Method method)
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java Wed Dec 28 13:02:41 2011
@@ -20,13 +20,7 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.transport.network.Frame;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.LinkedHashMap;
-import java.nio.ByteBuffer;
+import java.util.*;
/**
@@ -35,45 +29,87 @@ import java.nio.ByteBuffer;
* @author Rafael H. Schloming
*/
-public class Header {
+public class Header
+{
- private final Struct[] structs;
+ private final DeliveryProperties _deliveryProps;
+ private final MessageProperties _messageProps;
+ private final List<Struct> _nonStandardProps;
- public Header(List<Struct> structs)
+ public Header(DeliveryProperties deliveryProps, MessageProperties messageProps)
{
- this(structs.toArray(new Struct[structs.size()]));
+ this(deliveryProps, messageProps, null);
}
- public Header(Struct ... structs)
+ public Header(DeliveryProperties deliveryProps, MessageProperties messageProps, List<Struct> nonStandardProps)
{
- this.structs = structs;
+ _deliveryProps = deliveryProps;
+ _messageProps = messageProps;
+ _nonStandardProps = nonStandardProps;
}
public Struct[] getStructs()
{
+ int size = 0;
+ if(_deliveryProps != null)
+ {
+ size++;
+ }
+ if(_messageProps != null)
+ {
+ size++;
+ }
+ if(_nonStandardProps != null)
+ {
+ size+=_nonStandardProps.size();
+ }
+ Struct[] structs = new Struct[size];
+ int index = 0;
+ if(_deliveryProps != null)
+ {
+ structs[index++] = _deliveryProps;
+ }
+ if(_messageProps != null)
+ {
+ structs[index++] = _messageProps;
+ }
+ if(_nonStandardProps != null)
+ {
+ for(Struct struct : _nonStandardProps)
+ {
+ structs[index++] = struct;
+ }
+ }
+
return structs;
}
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _deliveryProps;
+ }
- public <T> T get(Class<T> klass)
+ public MessageProperties getMessageProperties()
{
- for (Struct st : structs)
- {
- if (klass.isInstance(st))
- {
- return (T) st;
- }
- }
+ return _messageProps;
+ }
- return null;
+ public List<Struct> getNonStandardProperties()
+ {
+ return _nonStandardProps;
}
public String toString()
{
- StringBuffer str = new StringBuffer();
+ StringBuilder str = new StringBuilder();
str.append(" Header(");
boolean first = true;
- for (Struct s : structs)
+ if(_deliveryProps !=null)
+ {
+ first=false;
+ str.append(_deliveryProps);
+ }
+ if(_messageProps != null)
{
if (first)
{
@@ -83,9 +119,24 @@ public class Header {
{
str.append(", ");
}
- str.append(s);
+ str.append(_messageProps);
+ }
+ if(_nonStandardProps != null)
+ {
+ for (Struct s : _nonStandardProps)
+ {
+ if (first)
+ {
+ first = false;
+ }
+ else
+ {
+ str.append(", ");
+ }
+ str.append(s);
+ }
}
- str.append(")");
+ str.append(')');
return str.toString();
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java Wed Dec 28 13:02:41 2011
@@ -21,6 +21,8 @@
package org.apache.qpid.transport;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import static org.apache.qpid.util.Serial.*;
@@ -32,94 +34,259 @@ import static org.apache.qpid.util.Seria
* @author Rafael H. Schloming
*/
-public final class Range
+public abstract class Range implements RangeSet
{
- private final int lower;
- private final int upper;
+ public static Range newInstance(int point) // creates a single-value range, backed by PointImpl
+ {
+ return new PointImpl(point);
+ }
+ // Two-bound factory: equal bounds collapse to a PointImpl, otherwise a RangeImpl holding both bounds is created.
+ public static Range newInstance(int lower, int upper)
+ {
+ return lower == upper ? new PointImpl(lower) : new RangeImpl(lower, upper);
+ }
+
+ public abstract int getLower();
+
+ public abstract int getUpper();
+
+ public abstract boolean includes(int value);
+
+ public abstract boolean includes(Range range);
+
+ public abstract boolean intersects(Range range);
+
+ public abstract boolean touches(Range range);
+
+ public abstract Range span(Range range);
- public Range(int lower, int upper)
+ public abstract List<Range> subtract(Range range);
+
+
+ public Range intersect(Range range)
{
- this.lower = lower;
- this.upper = upper;
+ int l = max(getLower(), range.getLower());
+ int r = min(getUpper(), range.getUpper());
+ if (gt(l, r))
+ {
+ return null;
+ }
+ else
+ {
+ return newInstance(l, r);
+ }
}
- public int getLower()
+
+
+ public int size()
{
- return lower;
+ return 1;
}
- public int getUpper()
+ public Iterator<Range> iterator()
{
- return upper;
+ return new RangeIterator();
}
- public boolean includes(int value)
+ public Range getFirst()
{
- return le(lower, value) && le(value, upper);
+ return this;
}
- public boolean includes(Range range)
+ public Range getLast()
{
- return includes(range.lower) && includes(range.upper);
+ return this;
}
- public boolean intersects(Range range)
+ public void add(Range range)
{
- return (includes(range.lower) || includes(range.upper) ||
- range.includes(lower) || range.includes(upper));
+ throw new UnsupportedOperationException();
}
- public boolean touches(Range range)
+ public void add(int lower, int upper)
{
- return (intersects(range) ||
- includes(range.upper + 1) || includes(range.lower - 1) ||
- range.includes(upper + 1) || range.includes(lower - 1));
+ throw new UnsupportedOperationException();
}
- public Range span(Range range)
+ public void add(int value)
{
- return new Range(min(lower, range.lower), max(upper, range.upper));
+ throw new UnsupportedOperationException();
}
- public List<Range> subtract(Range range)
+ public void clear()
{
- List<Range> result = new ArrayList<Range>();
+ throw new UnsupportedOperationException();
+ }
+
+ public RangeSet copy() // returns a new RangeSet obtained from RangeSetFactory that contains this range
+ {
+ RangeSet rangeSet = RangeSetFactory.createRangeSet();
+ rangeSet.add(this); // the copy is not another Range: Range's own add() methods throw UnsupportedOperationException
+ return rangeSet;
+ }
+
+ private static class PointImpl extends Range
+ {
+ private final int point;
+
+ private PointImpl(int point)
+ {
+ this.point = point;
+ }
+
+ public int getLower()
+ {
+ return point;
+ }
+
+ public int getUpper()
+ {
+ return point;
+ }
+
+ public boolean includes(int value)
+ {
+ return value == point;
+ }
+
+
+ public boolean includes(Range range)
+ {
+ return range.getLower() == point && range.getUpper() == point;
+ }
+
+ public boolean intersects(Range range)
+ {
+ return range.includes(point);
+ }
- if (includes(range.lower) && le(lower, range.lower - 1))
+ public boolean touches(Range range)
{
- result.add(new Range(lower, range.lower - 1));
+ return intersects(range) ||
+ includes(range.getUpper() + 1) || includes(range.getLower() - 1) ||
+ range.includes(point + 1) || range.includes(point - 1);
}
- if (includes(range.upper) && le(range.upper + 1, upper))
+ public Range span(Range range)
{
- result.add(new Range(range.upper + 1, upper));
+ return newInstance(min(point, range.getLower()), max(point, range.getUpper()));
}
- if (result.isEmpty() && !range.includes(this))
+ public List<Range> subtract(Range range)
{
- result.add(this);
+ if(range.includes(point))
+ {
+ return Collections.emptyList();
+ }
+ else
+ {
+ return Collections.singletonList((Range) this);
+ }
}
- return result;
}
- public Range intersect(Range range)
+ private static class RangeImpl extends Range
{
- int l = max(lower, range.lower);
- int r = min(upper, range.upper);
- if (gt(l, r))
+ private final int lower;
+ private final int upper;
+
+ private RangeImpl(int lower, int upper)
{
- return null;
+ this.lower = lower;
+ this.upper = upper;
}
- else
+
+ public int getLower()
+ {
+ return lower;
+ }
+
+ public int getUpper()
+ {
+ return upper;
+ }
+
+ public boolean includes(int value)
+ {
+ return le(lower, value) && le(value, upper);
+ }
+
+ public boolean includes(Range range)
+ {
+ return includes(range.getLower()) && includes(range.getUpper());
+ }
+
+ public boolean intersects(Range range)
+ {
+ return (includes(range.getLower()) || includes(range.getUpper()) ||
+ range.includes(lower) || range.includes(upper));
+ }
+
+ public boolean touches(Range range)
{
- return new Range(l, r);
+ return (intersects(range) ||
+ includes(range.getUpper() + 1) || includes(range.getLower() - 1) ||
+ range.includes(upper + 1) || range.includes(lower - 1));
+ }
+
+ public Range span(Range range)
+ {
+ return newInstance(min(lower, range.getLower()), max(upper, range.getUpper()));
+ }
+
+ public List<Range> subtract(Range range) // returns the pieces of this range that remain after removing 'range'
+ {
+ List<Range> result = new ArrayList<Range>();
+ // Piece below 'range': kept when range's lower bound lies inside this range and something remains beneath it.
+ if (includes(range.getLower()) && le(lower, range.getLower() - 1))
+ {
+ result.add(newInstance(lower, range.getLower() - 1));
+ }
+ // Piece above 'range': kept when range's upper bound lies inside this range and something remains above it.
+ if (includes(range.getUpper()) && le(range.getUpper() + 1, upper))
+ {
+ result.add(newInstance(range.getUpper() + 1, upper));
+ }
+ // Nothing was cut off: unless 'range' includes this whole range, this range is returned unchanged.
+ if (result.isEmpty() && !range.includes(this))
+ {
+ result.add(this);
+ }
+
+ return result; // le() is the comparison statically imported from org.apache.qpid.util.Serial
+ }
+
+
+ public String toString()
+ {
+ return "[" + lower + ", " + upper + "]";
}
}
- public String toString()
+
+ private class RangeIterator implements Iterator<Range>
{
- return "[" + lower + ", " + upper + "]";
- }
+ private boolean atFirst = true;
+
+ public boolean hasNext()
+ {
+ return atFirst;
+ }
+
+ public Range next() // yields the enclosing Range exactly once; throws NoSuchElementException afterwards
+ {
+ if (!atFirst) { throw new java.util.NoSuchElementException(); } // Iterator contract: never return null once exhausted
+ atFirst = false;
+ return Range.this;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.transport;
-import java.util.Iterator;
-import java.util.ListIterator;
-import java.util.LinkedList;
+import java.util.*;
import static org.apache.qpid.util.Serial.*;
@@ -32,121 +30,29 @@ import static org.apache.qpid.util.Seria
* @author Rafael H. Schloming
*/
-public final class RangeSet implements Iterable<Range>
+public interface RangeSet extends Iterable<Range>
{
- private LinkedList<Range> ranges = new LinkedList<Range>();
+ int size();
- public int size()
- {
- return ranges.size();
- }
-
- public Iterator<Range> iterator()
- {
- return ranges.iterator();
- }
-
- public Range getFirst()
- {
- return ranges.getFirst();
- }
-
- public Range getLast()
- {
- return ranges.getLast();
- }
-
- public boolean includes(Range range)
- {
- for (Range r : this)
- {
- if (r.includes(range))
- {
- return true;
- }
- }
-
- return false;
- }
-
- public boolean includes(int n)
- {
- for (Range r : this)
- {
- if (r.includes(n))
- {
- return true;
- }
- }
-
- return false;
- }
-
- public void add(Range range)
- {
- ListIterator<Range> it = ranges.listIterator();
-
- while (it.hasNext())
- {
- Range next = it.next();
- if (range.touches(next))
- {
- it.remove();
- range = range.span(next);
- }
- else if (lt(range.getUpper(), next.getLower()))
- {
- it.previous();
- it.add(range);
- return;
- }
- }
-
- it.add(range);
- }
-
- public void add(int lower, int upper)
- {
- add(new Range(lower, upper));
- }
-
- public void add(int value)
- {
- add(value, value);
- }
-
- public void clear()
- {
- ranges.clear();
- }
-
- public RangeSet copy()
- {
- RangeSet copy = new RangeSet();
- copy.ranges.addAll(ranges);
- return copy;
- }
-
- public String toString()
- {
- StringBuffer str = new StringBuffer();
- str.append("{");
- boolean first = true;
- for (Range range : ranges)
- {
- if (first)
- {
- first = false;
- }
- else
- {
- str.append(", ");
- }
- str.append(range);
- }
- str.append("}");
- return str.toString();
- }
+ Iterator<Range> iterator();
+
+ Range getFirst();
+
+ Range getLast();
+
+ boolean includes(Range range);
+
+ boolean includes(int n);
+
+ void add(Range range);
+
+ void add(int lower, int upper);
+
+ void add(int value);
+
+ void clear();
+
+ RangeSet copy();
}
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java?rev=1225178&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java Wed Dec 28 13:02:41 2011
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+/**
+ * Static entry point for obtaining {@link RangeSet} instances without naming
+ * the concrete implementation class at the call site.
+ */
+public class RangeSetFactory
+{
+    /**
+     * @return a new, empty {@link RangeSetImpl}
+     */
+    public static RangeSet createRangeSet()
+    {
+        final RangeSet created = new RangeSetImpl();
+        return created;
+    }
+
+    /**
+     * @param size forwarded unchanged to {@link RangeSetImpl#RangeSetImpl(int)}
+     * @return a new, empty {@link RangeSetImpl}
+     */
+    public static RangeSet createRangeSet(int size)
+    {
+        final RangeSet created = new RangeSetImpl(size);
+        return created;
+    }
+}
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java?rev=1225178&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java Wed Dec 28 13:02:41 2011
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import static org.apache.qpid.util.Serial.lt;
+
+/**
+ * List-backed implementation of {@link RangeSet}.
+ *
+ * <p>Ranges are kept in an {@link ArrayList}. {@link #add(Range)} folds an
+ * incoming range into every stored range it touches and inserts the result in
+ * front of the first stored range whose lower bound lies beyond the incoming
+ * upper bound, as decided by {@code Serial.lt}.
+ *
+ * <p>The class contains no synchronisation of its own.
+ */
+public class RangeSetImpl implements RangeSet
+{
+
+    /** Backing storage for the ranges held by this set. */
+    private final List<Range> ranges;
+
+    /** Creates an empty set backed by a default-capacity list. */
+    public RangeSetImpl()
+    {
+        ranges = new ArrayList<Range>();
+    }
+
+    /**
+     * Creates an empty set.
+     *
+     * @param size initial capacity handed to the backing {@link ArrayList}
+     */
+    public RangeSetImpl(int size)
+    {
+        ranges = new ArrayList<Range>(size);
+    }
+
+    /**
+     * Copy constructor: the new set gets its own list holding the same
+     * {@link Range} objects as {@code copy}.
+     *
+     * @param copy the set whose ranges are copied
+     */
+    public RangeSetImpl(RangeSetImpl copy)
+    {
+        ranges = new ArrayList<Range>(copy.ranges);
+    }
+
+    /**
+     * @return the number of ranges currently stored
+     */
+    public int size()
+    {
+        return ranges.size();
+    }
+
+    /**
+     * @return the backing list's own iterator over the stored ranges
+     */
+    public Iterator<Range> iterator()
+    {
+        return ranges.iterator();
+    }
+
+    /**
+     * @return the range at index 0 of the backing list
+     * @throws IndexOutOfBoundsException if the set is empty
+     */
+    public Range getFirst()
+    {
+        return ranges.get(0);
+    }
+
+    /**
+     * @return the range at the last index of the backing list
+     * @throws IndexOutOfBoundsException if the set is empty
+     */
+    public Range getLast()
+    {
+        return ranges.get(ranges.size() - 1);
+    }
+
+    /**
+     * Tests whether a single stored range includes the whole of {@code range}.
+     *
+     * @param range the range to look for
+     * @return {@code true} if {@code includes(range)} holds for some stored range
+     */
+    public boolean includes(Range range)
+    {
+        for (Range r : this)
+        {
+            if (r.includes(range))
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Tests whether some stored range includes the value {@code n}.
+     *
+     * @param n the value to look for
+     * @return {@code true} if {@code includes(n)} holds for some stored range
+     */
+    public boolean includes(int n)
+    {
+        for (Range r : this)
+        {
+            if (r.includes(n))
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Adds {@code range}, merging it with every stored range it touches.
+     *
+     * <p>The list is walked from the front. Each stored range for which
+     * {@code range.touches(...)} is true is removed and folded into the
+     * incoming range via {@code span}. The walk stops at the first stored
+     * range whose lower bound is beyond the (possibly widened) upper bound,
+     * and the accumulated range is inserted just before it. If the walk
+     * reaches the end, the accumulated range is appended.
+     *
+     * @param range the range to add
+     */
+    public void add(Range range)
+    {
+        ListIterator<Range> it = ranges.listIterator();
+
+        while (it.hasNext())
+        {
+            Range next = it.next();
+            if (range.touches(next))
+            {
+                // absorb the stored range and keep scanning: the widened
+                // range may now touch later entries as well
+                it.remove();
+                range = range.span(next);
+            }
+            else if (lt(range.getUpper(), next.getLower()))
+            {
+                // step back so the insertion lands in front of 'next'
+                it.previous();
+                it.add(range);
+                return;
+            }
+        }
+
+        it.add(range);
+    }
+
+    /**
+     * Adds the range {@code [lower, upper]}.
+     *
+     * <p>Two shortcuts avoid the general merge: an empty set simply stores
+     * the new range, and a set holding one range is extended in place when
+     * the new range starts inside it (or directly after it) and ends at or
+     * beyond its upper bound. Every other case goes through
+     * {@link #add(Range)}.
+     *
+     * <p>The in-place shortcut requires {@code lower} to be no smaller than
+     * the existing lower bound. Without that check a range starting below the
+     * stored one would have its leading values silently dropped.
+     *
+     * @param lower lower bound of the range to add
+     * @param upper upper bound of the range to add
+     */
+    public void add(int lower, int upper)
+    {
+        switch(ranges.size())
+        {
+            case 0:
+                ranges.add(Range.newInstance(lower, upper));
+                break;
+
+            case 1:
+                Range first = ranges.get(0);
+                // NOTE(review): plain int comparisons here, whereas add(Range)
+                // orders with Serial.lt; presumably equivalent away from
+                // wrap-around - confirm.
+                if(lower >= first.getLower()
+                   && first.getUpper() + 1 >= lower
+                   && upper >= first.getUpper())
+                {
+                    ranges.set(0, Range.newInstance(first.getLower(), upper));
+                    break;
+                }
+                // deliberate fall-through: the shortcut does not apply, so
+                // use the general merge below
+
+            default:
+                add(Range.newInstance(lower, upper));
+        }
+    }
+
+    /**
+     * Adds the single value {@code value} as the range {@code [value, value]}.
+     *
+     * @param value the value to add
+     */
+    public void add(int value)
+    {
+        add(value, value);
+    }
+
+    /** Removes every stored range. */
+    public void clear()
+    {
+        ranges.clear();
+    }
+
+    /**
+     * @return a new {@code RangeSetImpl} with its own list holding the same
+     *         {@link Range} objects as this set
+     */
+    public RangeSet copy()
+    {
+        return new RangeSetImpl(this);
+    }
+
+    /**
+     * @return the stored ranges separated by {@code ", "} and wrapped in
+     *         braces, e.g. <code>{a, b}</code>; an empty set gives <code>{}</code>
+     */
+    public String toString()
+    {
+        // local buffer, so the unsynchronised StringBuilder is sufficient
+        StringBuilder str = new StringBuilder();
+        str.append("{");
+        boolean first = true;
+        for (Range range : ranges)
+        {
+            if (first)
+            {
+                first = false;
+            }
+            else
+            {
+                str.append(", ");
+            }
+            str.append(range);
+        }
+        str.append("}");
+        return str.toString();
+    }
+}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Wed Dec 28 13:02:41 2011
@@ -247,7 +247,7 @@ public class Session extends SessionInvo
synchronized (processedLock)
{
incomingInit = false;
- processed = new RangeSet();
+ processed = RangeSetFactory.createRangeSet();
}
}
@@ -276,22 +276,22 @@ public class Session extends SessionInvo
else if (m instanceof MessageTransfer)
{
MessageTransfer xfr = (MessageTransfer)m;
-
- if (xfr.getHeader() != null)
+
+ Header header = xfr.getHeader();
+
+ if (header != null)
{
- if (xfr.getHeader().get(DeliveryProperties.class) != null)
+ if (header.getDeliveryProperties() != null)
{
- xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true);
+ header.getDeliveryProperties().setRedelivered(true);
}
else
{
- Struct[] structs = xfr.getHeader().getStructs();
DeliveryProperties deliveryProps = new DeliveryProperties();
deliveryProps.setRedelivered(true);
-
- List<Struct> list = Arrays.asList(structs);
- list.add(deliveryProps);
- xfr.setHeader(new Header(list));
+
+ xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(),
+ header.getNonStandardProperties()));
}
}
@@ -299,7 +299,7 @@ public class Session extends SessionInvo
{
DeliveryProperties deliveryProps = new DeliveryProperties();
deliveryProps.setRedelivered(true);
- xfr.setHeader(new Header(deliveryProps));
+ xfr.setHeader(new Header(deliveryProps, null, null));
}
}
sessionCommandPoint(m.getId(), 0);
@@ -394,38 +394,46 @@ public class Session extends SessionInvo
public void processed(int command)
{
- processed(new Range(command, command));
+ processed(command, command);
}
- public void processed(int lower, int upper)
+ public void processed(Range range)
{
- processed(new Range(lower, upper));
+ processed(range.getLower(), range.getUpper());
}
- public void processed(Range range)
+ public void processed(int lower, int upper)
{
- log.debug("%s processed(%s) %s %s", this, range, syncPoint, maxProcessed);
+ if(log.isDebugEnabled())
+ {
+ log.debug("%s processed([%d,%d]) %s %s", this, lower, upper, syncPoint, maxProcessed);
+ }
boolean flush;
synchronized (processedLock)
{
- log.debug("%s", processed);
+ if(log.isDebugEnabled())
+ {
+ log.debug("%s", processed);
+ }
- if (ge(range.getUpper(), commandsIn))
+ if (ge(upper, commandsIn))
{
throw new IllegalArgumentException
- ("range exceeds max received command-id: " + range);
+ ("range exceeds max received command-id: " + Range.newInstance(lower, upper));
}
- processed.add(range);
+ processed.add(lower, upper);
+
Range first = processed.getFirst();
- int lower = first.getLower();
- int upper = first.getUpper();
+
+ int flower = first.getLower();
+ int fupper = first.getUpper();
int old = maxProcessed;
- if (le(lower, maxProcessed + 1))
+ if (le(flower, maxProcessed + 1))
{
- maxProcessed = max(maxProcessed, upper);
+ maxProcessed = max(maxProcessed, fupper);
}
boolean synced = ge(maxProcessed, syncPoint);
flush = lt(old, syncPoint) && synced;
@@ -442,7 +450,7 @@ public class Session extends SessionInvo
void flushExpected()
{
- RangeSet rs = new RangeSet();
+ RangeSet rs = RangeSetFactory.createRangeSet();
synchronized (processedLock)
{
if (incomingInit)
@@ -478,7 +486,7 @@ public class Session extends SessionInvo
{
synchronized (processedLock)
{
- RangeSet newProcessed = new RangeSet();
+ RangeSet newProcessed = RangeSetFactory.createRangeSet();
for (Range pr : processed)
{
for (Range kr : kc)
@@ -534,7 +542,12 @@ public class Session extends SessionInvo
{
maxComplete = max(maxComplete, upper);
}
- log.debug("%s commands remaining: %s", this, commandsOut - maxComplete);
+
+ if(log.isDebugEnabled())
+ {
+ log.debug("%s commands remaining: %s", this, commandsOut - maxComplete);
+ }
+
commands.notifyAll();
return gt(maxComplete, old);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Wed Dec 28 13:02:41 2011
@@ -91,21 +91,38 @@ public class SessionDelegate
{
RangeSet ranges = cmp.getCommands();
RangeSet known = null;
- if (cmp.getTimelyReply())
- {
- known = new RangeSet();
- }
if (ranges != null)
{
- for (Range range : ranges)
+ if(ranges.size() == 1)
{
+ Range range = ranges.getFirst();
boolean advanced = ssn.complete(range.getLower(), range.getUpper());
- if (advanced && known != null)
+
+ if(advanced && cmp.getTimelyReply())
{
- known.add(range);
+ known = range;
}
}
+ else
+ {
+ if (cmp.getTimelyReply())
+ {
+ known = RangeSetFactory.createRangeSet();
+ }
+ for (Range range : ranges)
+ {
+ boolean advanced = ssn.complete(range.getLower(), range.getUpper());
+ if (advanced && known != null)
+ {
+ known.add(range);
+ }
+ }
+ }
+ }
+ else if (cmp.getTimelyReply())
+ {
+ known = RangeSetFactory.createRangeSet();
}
if (known != null)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org