You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC
svn commit: r507672 [14/16] - in /incubator/qpid/branches/qpid.0-9:
gentools/src/org/apache/qpid/gentools/ gentools/templ.java/
gentools/templ.net/ java/ java/broker/ java/broker/bin/
java/broker/distribution/ java/broker/distribution/src/ java/broker/...
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Wed Feb 14 12:02:03 2007
@@ -35,6 +35,7 @@
public static final int SIZEOF_UNSIGNED_SHORT = 2;
public static final int SIZEOF_UNSIGNED_INT = 4;
+ private static final boolean[] ALL_FALSE_ARRAY = new boolean[8];
public static int encodedShortStringLength(String s)
{
@@ -48,6 +49,120 @@
}
}
+
+ public static int encodedShortStringLength(short s)
+ {
+ if( s == 0 )
+ {
+ return 1 + 1;
+ }
+
+ int len = 0;
+ if(s < 0)
+ {
+ len=1;
+ // sloppy - doesn't work for Short.MIN_VALUE
+ s=(short)-s;
+ }
+
+ if(s>9999)
+ {
+ return 1+5;
+ }
+ else if(s>999)
+ {
+ return 1+4;
+ }
+ else if(s>99)
+ {
+ return 1+3;
+ }
+ else if(s>9)
+ {
+ return 1+2;
+ }
+ else
+ {
+ return 1+1;
+ }
+
+ }
+
+
+ public static int encodedShortStringLength(int i)
+ {
+ if( i == 0 )
+ {
+ return 1 + 1;
+ }
+
+ int len = 0;
+ if(i < 0)
+ {
+ len=1;
+ // sloppy - doesn't work for Integer.MIN_VALUE
+ i=-i;
+ }
+
+ // range is now 1 - 2147483647
+ if(i < Short.MAX_VALUE)
+ {
+ return len + encodedShortStringLength((short)i);
+ }
+ else if (i > 999999)
+ {
+ return len + 6 + encodedShortStringLength((short)(i/1000000));
+ }
+ else // if (i > 99999)
+ {
+ return len + 5 + encodedShortStringLength((short)(i/100000));
+ }
+
+ }
+
+ public static int encodedShortStringLength(long l)
+ {
+ if(l == 0)
+ {
+ return 1 + 1;
+ }
+
+ int len = 0;
+ if(l < 0)
+ {
+ len=1;
+ // sloppy - doesn't work for Long.MIN_VALUE
+ l=-l;
+ }
+ if(l < Integer.MAX_VALUE)
+ {
+ return len + encodedShortStringLength((int) l);
+ }
+ else if(l > 9999999999L)
+ {
+ return len + 10 + encodedShortStringLength((int) (l / 10000000000L));
+ }
+ else
+ {
+ return len + 1 + encodedShortStringLength((int) (l / 10L));
+ }
+
+ }
+
+
+ public static int encodedShortStringLength(AMQShortString s)
+ {
+ if (s == null)
+ {
+ return 1;
+ }
+ else
+ {
+ return (short) (1 + s.length());
+ }
+ }
+
+
public static int encodedLongStringLength(String s)
{
if (s == null)
@@ -122,6 +237,21 @@
}
}
+
+ public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s)
+ {
+ if (s != null)
+ {
+
+ s.writeToBuffer(buffer);
+ }
+ else
+ {
+ // really writing out unsigned byte
+ buffer.put((byte) 0);
+ }
+ }
+
public static void writeLongStringBytes(ByteBuffer buffer, String s)
{
assert s == null || s.length() <= 0xFFFE;
@@ -224,6 +354,7 @@
}
}
+
public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table)
{
if (table != null)
@@ -255,6 +386,238 @@
buffer.put(packedValue);
}
+ public static void writeBooleans(ByteBuffer buffer, boolean value)
+ {
+
+ buffer.put(value ? (byte) 1 : (byte) 0);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+
+ buffer.put(packedValue);
+ }
+
+
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4,
+ boolean value5)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 5));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4,
+ boolean value5,
+ boolean value6)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 5));
+ }
+
+ if (value6)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 6));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4,
+ boolean value5,
+ boolean value6,
+ boolean value7)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 5));
+ }
+
+ if (value6)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 6));
+ }
+
+ if (value7)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 7));
+ }
+
+ buffer.put(packedValue);
+ }
+
+
+
+
/**
* This is used for writing longstrs.
*
@@ -282,13 +645,27 @@
public static boolean[] readBooleans(ByteBuffer buffer)
{
- byte packedValue = buffer.get();
- boolean[] result = new boolean[8];
+ final byte packedValue = buffer.get();
+ if(packedValue == 0)
+ {
+ return ALL_FALSE_ARRAY;
+ }
+ final boolean[] result = new boolean[8];
- for (int i = 0; i < 8; i++)
+ result[0] = ((packedValue & 1) != 0);
+ result[1] = ((packedValue & (1 << 1)) != 0);
+ result[2] = ((packedValue & (1 << 2)) != 0);
+ result[3] = ((packedValue & (1 << 3)) != 0);
+ if((packedValue & 0xF0) == 0)
{
- result[i] = ((packedValue & (1 << i)) != 0);
+ result[0] = ((packedValue & 1) != 0);
}
+ result[4] = ((packedValue & (1 << 4)) != 0);
+ result[5] = ((packedValue & (1 << 5)) != 0);
+ result[6] = ((packedValue & (1 << 6)) != 0);
+ result[7] = ((packedValue & (1 << 7)) != 0);
+
+
return result;
}
@@ -312,6 +689,12 @@
return content;
}
+ public static AMQShortString readAMQShortString(ByteBuffer buffer)
+ {
+ return AMQShortString.readFromBuffer(buffer);
+
+ }
+
public static String readShortString(ByteBuffer buffer)
{
short length = buffer.getUnsigned();
@@ -363,7 +746,7 @@
}
}
- public static byte[] readLongstr(ByteBuffer buffer) throws AMQFrameDecodingException
+ public static byte[] readLongstr(ByteBuffer buffer)
{
long length = buffer.getUnsignedInt();
if (length == 0)
@@ -468,7 +851,7 @@
buffer.put((byte) (aBoolean ? 1 : 0));
}
- public static Boolean readBoolean(ByteBuffer buffer)
+ public static boolean readBoolean(ByteBuffer buffer)
{
byte packedValue = buffer.get();
return (packedValue == 1);
@@ -485,7 +868,7 @@
buffer.put(aByte);
}
- public static Byte readByte(ByteBuffer buffer)
+ public static byte readByte(ByteBuffer buffer)
{
return buffer.get();
}
@@ -502,7 +885,7 @@
buffer.putShort(aShort);
}
- public static Short readShort(ByteBuffer buffer)
+ public static short readShort(ByteBuffer buffer)
{
return buffer.getShort();
}
@@ -518,7 +901,7 @@
buffer.putInt(aInteger);
}
- public static Integer readInteger(ByteBuffer buffer)
+ public static int readInteger(ByteBuffer buffer)
{
return buffer.getInt();
}
@@ -534,7 +917,7 @@
buffer.putLong(aLong);
}
- public static Long readLong(ByteBuffer buffer)
+ public static long readLong(ByteBuffer buffer)
{
return buffer.getLong();
}
@@ -550,7 +933,7 @@
buffer.putFloat(aFloat);
}
- public static Float readFloat(ByteBuffer buffer)
+ public static float readFloat(ByteBuffer buffer)
{
return buffer.getFloat();
}
@@ -567,7 +950,7 @@
buffer.putDouble(aDouble);
}
- public static Double readDouble(ByteBuffer buffer)
+ public static double readDouble(ByteBuffer buffer)
{
return buffer.getDouble();
}
@@ -627,6 +1010,41 @@
writeByte(buffer, (byte) character);
}
+ public static long readLongAsShortString(ByteBuffer buffer)
+ {
+ short length = buffer.getUnsigned();
+ short pos = 0;
+ if(length == 0)
+ {
+ return 0L;
+ }
+ byte digit = buffer.get();
+ boolean isNegative;
+ long result = 0;
+ if(digit == (byte)'-')
+ {
+ isNegative = true;
+ pos++;
+ digit = buffer.get();
+ }
+ else
+ {
+ isNegative = false;
+ }
+ result = digit - (byte)'0';
+ pos++;
+
+ while(pos < length)
+ {
+ pos++;
+ digit = buffer.get();
+ result = (result << 3) + (result << 1);
+ result += digit - (byte)'0';
+ }
+
+ return result;
+ }
+
public static long readUnsignedInteger(ByteBuffer buffer)
{
long l = 0xFF & buffer.get();
@@ -638,5 +1056,24 @@
l = l | (0xFF & buffer.get());
return l;
+ }
+
+
+ public static void main(String[] args)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(8);
+ buf.setAutoExpand(true);
+
+ long l = (long) Integer.MAX_VALUE;
+ l += 1024L;
+
+ writeUnsignedInteger(buf, l);
+
+ buf.flip();
+
+ long l2 = readUnsignedInteger(buf);
+
+ System.out.println("before: " + l);
+ System.out.println("after: " + l2);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Wed Feb 14 12:02:03 2007
@@ -31,17 +31,16 @@
{
private static final Logger _logger = Logger.getLogger(FieldTable.class);
- private LinkedHashMap<String, AMQTypedValue> _properties;
+ private ByteBuffer _encodedForm;
+ private LinkedHashMap<AMQShortString, AMQTypedValue> _properties;
+ private long _encodedSize;
+ private static final int INITIAL_HASHMAP_CAPACITY = 16;
public FieldTable()
{
super();
- _properties = new LinkedHashMap<String, AMQTypedValue>();
-
}
-
-
/**
* Construct a new field table.
*
@@ -52,14 +51,105 @@
public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException
{
this();
- setFromBuffer(buffer, length);
+ _encodedForm = buffer.slice();
+ _encodedForm.limit((int)length);
+ _encodedSize = length;
+ buffer.skip((int)length);
+ }
+
+
+
+ private AMQTypedValue getProperty(AMQShortString string)
+ {
+ synchronized(this)
+ {
+ if(_properties == null)
+ {
+ if(_encodedForm == null)
+ {
+ return null;
+ }
+ else
+ {
+ populateFromBuffer();
+ }
+ }
+ }
+
+ if(_properties == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.get(string);
+ }
+ }
+
+ private void populateFromBuffer()
+ {
+ try
+ {
+ setFromBuffer(_encodedForm, _encodedSize);
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ _logger.error("Error decoding FieldTable in deferred decoding mode ", e);
+ throw new IllegalArgumentException(e);
+ }
}
+ private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val)
+ {
+ initMapIfNecessary();
+ _encodedForm = null;
+ if(val == null)
+ {
+ return removeKey(key);
+ }
+ AMQTypedValue oldVal = _properties.put(key,val);
+ if(oldVal != null)
+ {
+ _encodedSize -= oldVal.getEncodingSize();
+ }
+ else
+ {
+ _encodedSize += EncodingUtils.encodedShortStringLength(key) + 1;
+ }
+ _encodedSize += val.getEncodingSize();
+
+ return oldVal;
+ }
+
+ private void initMapIfNecessary()
+ {
+ synchronized(this)
+ {
+ if(_properties == null)
+ {
+ if(_encodedForm == null)
+ {
+ _properties = new LinkedHashMap<AMQShortString,AMQTypedValue>();
+ }
+ else
+ {
+ populateFromBuffer();
+ }
+ }
+
+ }
+ }
+
public Boolean getBoolean(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getBoolean(new AMQShortString(string));
+ }
+
+ public Boolean getBoolean(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.BOOLEAN))
{
return (Boolean) value.getValue();
@@ -70,9 +160,15 @@
}
}
+
public Byte getByte(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getByte(new AMQShortString(string));
+ }
+
+ public Byte getByte(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.BYTE))
{
return (Byte) value.getValue();
@@ -85,7 +181,12 @@
public Short getShort(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getShort(new AMQShortString(string));
+ }
+
+ public Short getShort(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.SHORT))
{
return (Short) value.getValue();
@@ -98,7 +199,12 @@
public Integer getInteger(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getInteger(new AMQShortString(string));
+ }
+
+ public Integer getInteger(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.INT))
{
return (Integer) value.getValue();
@@ -111,7 +217,12 @@
public Long getLong(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getLong(new AMQShortString(string));
+ }
+
+ public Long getLong(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.LONG))
{
return (Long) value.getValue();
@@ -124,7 +235,12 @@
public Float getFloat(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getFloat(new AMQShortString(string));
+ }
+
+ public Float getFloat(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.FLOAT))
{
return (Float) value.getValue();
@@ -137,7 +253,12 @@
public Double getDouble(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getDouble(new AMQShortString(string));
+ }
+
+ public Double getDouble(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.DOUBLE))
{
return (Double) value.getValue();
@@ -150,7 +271,12 @@
public String getString(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getString(new AMQShortString(string));
+ }
+
+ public String getString(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if ((value != null) && ((value.getType() == AMQType.WIDE_STRING) ||
(value.getType() == AMQType.ASCII_STRING)))
{
@@ -170,7 +296,12 @@
public Character getCharacter(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getCharacter(new AMQShortString(string));
+ }
+
+ public Character getCharacter(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.ASCII_CHARACTER))
{
return (Character) value.getValue();
@@ -183,7 +314,12 @@
public byte[] getBytes(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getBytes(new AMQShortString(string));
+ }
+
+ public byte[] getBytes(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.BINARY))
{
return (byte[]) value.getValue();
@@ -196,7 +332,12 @@
public Object getObject(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getObject(new AMQShortString(string));
+ }
+
+ public Object getObject(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if(value != null)
{
return value.getValue();
@@ -209,92 +350,163 @@
}
// ************ Setters
-
public Object setBoolean(String string, boolean b)
{
+ return setBoolean(new AMQShortString(string), b);
+ }
+
+ public Object setBoolean(AMQShortString string, boolean b)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.BOOLEAN.asTypedValue(b));
+ return setProperty(string, AMQType.BOOLEAN.asTypedValue(b));
}
public Object setByte(String string, byte b)
{
+ return setByte(new AMQShortString(string), b);
+ }
+
+ public Object setByte(AMQShortString string, byte b)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.BYTE.asTypedValue(b));
+ return setProperty(string, AMQType.BYTE.asTypedValue(b));
}
public Object setShort(String string, short i)
{
+ return setShort(new AMQShortString(string), i);
+ }
+
+ public Object setShort(AMQShortString string, short i)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.SHORT.asTypedValue(i));
+ return setProperty(string, AMQType.SHORT.asTypedValue(i));
}
+
public Object setInteger(String string, int i)
{
+ return setInteger(new AMQShortString(string), i);
+ }
+
+ public Object setInteger(AMQShortString string, int i)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.INT.asTypedValue(i));
+ return setProperty(string, AMQType.INT.asTypedValue(i));
}
+
public Object setLong(String string, long l)
{
+ return setLong(new AMQShortString(string), l);
+ }
+
+ public Object setLong(AMQShortString string, long l)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.LONG.asTypedValue(l));
+ return setProperty(string, AMQType.LONG.asTypedValue(l));
}
- public Object setFloat(String string, float v)
+
+ public Object setFloat(String string, float f)
+ {
+ return setFloat(new AMQShortString(string), f);
+ }
+
+ public Object setFloat(AMQShortString string, float v)
{
checkPropertyName(string);
- return _properties.put(string, AMQType.FLOAT.asTypedValue(v));
+ return setProperty(string, AMQType.FLOAT.asTypedValue(v));
}
- public Object setDouble(String string, double v)
+ public Object setDouble(String string, double d)
+ {
+ return setDouble(new AMQShortString(string), d);
+ }
+
+
+ public Object setDouble(AMQShortString string, double v)
{
checkPropertyName(string);
- return _properties.put(string, AMQType.DOUBLE.asTypedValue(v));
+ return setProperty(string, AMQType.DOUBLE.asTypedValue(v));
+ }
+
+
+ public Object setString(String string, String s)
+ {
+ return setString(new AMQShortString(string), s);
}
- public Object setString(String string, String value)
+ public Object setAsciiString(AMQShortString string, String value)
{
checkPropertyName(string);
if (value == null)
{
- return _properties.put(string, AMQType.VOID.asTypedValue(null));
+ return setProperty(string, AMQType.VOID.asTypedValue(null));
}
else
{
- //FIXME: determine string encoding and set either WIDE or ASCII string
-// if ()
- {
- return _properties.put(string, AMQType.WIDE_STRING.asTypedValue(value));
- }
-// else
-// {
-// return _properties.put(string, AMQType.ASCII_STRING.asTypedValue(value));
-// }
+ return setProperty(string, AMQType.ASCII_STRING.asTypedValue(value));
}
}
+ public Object setString(AMQShortString string, String value)
+ {
+ checkPropertyName(string);
+ if (value == null)
+ {
+ return setProperty(string, AMQType.VOID.asTypedValue(null));
+ }
+ else
+ {
+ return setProperty(string, AMQType.LONG_STRING.asTypedValue(value));
+ }
+ }
+
+
public Object setChar(String string, char c)
{
+ return setChar(new AMQShortString(string), c);
+ }
+
+
+ public Object setChar(AMQShortString string, char c)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.ASCII_CHARACTER.asTypedValue(c));
+ return setProperty(string, AMQType.ASCII_CHARACTER.asTypedValue(c));
}
- public Object setBytes(String string, byte[] bytes)
+
+ public Object setBytes(String string, byte[] b)
+ {
+ return setBytes(new AMQShortString(string), b);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes)
{
checkPropertyName(string);
- return _properties.put(string, AMQType.BINARY.asTypedValue(bytes));
+ return setProperty(string, AMQType.BINARY.asTypedValue(bytes));
}
public Object setBytes(String string, byte[] bytes, int start, int length)
{
+ return setBytes(new AMQShortString(string), bytes,start,length);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
+ {
checkPropertyName(string);
byte[] newBytes = new byte[length];
System.arraycopy(bytes,start,newBytes,0,length);
return setBytes(string, bytes);
}
+ public Object setObject(String string, Object o)
+ {
+ return setObject(new AMQShortString(string), o);
+ }
- public Object setObject(String string, Object object)
+ public Object setObject(AMQShortString string, Object object)
{
if (object instanceof Boolean)
{
@@ -343,7 +555,7 @@
public boolean isNullStringValue(String name)
{
- AMQTypedValue value = _properties.get(name);
+ AMQTypedValue value = getProperty(new AMQShortString(name));
return (value != null) && (value.getType() == AMQType.VOID);
}
@@ -351,7 +563,12 @@
public Enumeration getPropertyNames()
{
- return Collections.enumeration(_properties.keySet());
+ return Collections.enumeration(keys());
+ }
+
+ public boolean propertyExists(AMQShortString propertyName)
+ {
+ return itemExists(propertyName);
}
public boolean propertyExists(String propertyName)
@@ -359,25 +576,34 @@
return itemExists(propertyName);
}
- public boolean itemExists(String string)
+ public boolean itemExists(AMQShortString string)
{
+ initMapIfNecessary();
return _properties.containsKey(string);
}
+ public boolean itemExists(String string)
+ {
+ return itemExists(new AMQShortString(string));
+ }
+
public String toString()
{
+ initMapIfNecessary();
+// if (_encodedForm != null)
+// _encodedForm.rewind();
return _properties.toString();
}
- private void checkPropertyName(String propertyName)
+ private void checkPropertyName(AMQShortString propertyName)
{
if (propertyName == null)
{
throw new IllegalArgumentException("Property name must not be null");
}
- else if ("".equals(propertyName))
+ else if (propertyName.length()==0)
{
throw new IllegalArgumentException("Property name must not be the empty string");
}
@@ -386,7 +612,7 @@
}
- protected static void checkIdentiferFormat(String propertyName)
+ protected static void checkIdentiferFormat(AMQShortString propertyName)
{
// AMQP Spec: 4.2.5.5 Field Tables
// Guidelines for implementers:
@@ -448,20 +674,31 @@
public long getEncodedSize()
{
+ return _encodedSize;
+ }
+
+ private void recalculateEncodedSize()
+ {
+
int encodedSize = 0;
- for(Map.Entry<String,AMQTypedValue> e : _properties.entrySet())
+ if(_properties != null)
{
- encodedSize += EncodingUtils.encodedShortStringLength(e.getKey());
- encodedSize++; // the byte for the encoding Type
- encodedSize += e.getValue().getEncodingSize();
+ for(Map.Entry<AMQShortString,AMQTypedValue> e : _properties.entrySet())
+ {
+ encodedSize += EncodingUtils.encodedShortStringLength(e.getKey());
+ encodedSize++; // the byte for the encoding Type
+ encodedSize += e.getValue().getEncodingSize();
+ }
}
- return encodedSize;
+ _encodedSize = encodedSize;
}
public void addAll(FieldTable fieldTable)
{
+ initMapIfNecessary();
_properties.putAll(fieldTable._properties);
+ recalculateEncodedSize();
}
@@ -473,135 +710,213 @@
public Object processOverElements(FieldTableElementProcessor processor)
{
- for(Map.Entry<String,AMQTypedValue> e : _properties.entrySet())
+ initMapIfNecessary();
+ if(_properties != null)
{
- boolean result = processor.processElement(e.getKey(), e.getValue());
- if(!result)
+ for(Map.Entry<AMQShortString,AMQTypedValue> e : _properties.entrySet())
{
- break;
+ boolean result = processor.processElement(e.getKey().toString(), e.getValue());
+ if(!result)
+ {
+ break;
+ }
}
}
return processor.getResult();
+
+
}
public int size()
{
+ initMapIfNecessary();
return _properties.size();
+
}
public boolean isEmpty()
{
- return _properties.isEmpty();
+ return size() ==0;
}
- public boolean containsKey(String key)
+ public boolean containsKey(AMQShortString key)
{
+ initMapIfNecessary();
return _properties.containsKey(key);
}
+ public boolean containsKey(String key)
+ {
+ return containsKey(new AMQShortString(key));
+ }
+
public Set<String> keys()
{
- return _properties.keySet();
+ initMapIfNecessary();
+ Set<String> keys = new LinkedHashSet<String>();
+ for(AMQShortString key : _properties.keySet())
+ {
+ keys.add(key.toString());
+ }
+ return keys;
}
- public Object get(Object key)
+ public Object get(AMQShortString key)
{
- return getObject((String)key);
+ return getObject(key);
}
- public Object put(Object key, Object value)
+
+ public Object put(AMQShortString key, Object value)
{
- return setObject(key.toString(), value);
+ return setObject(key, value);
}
-
+
public Object remove(String key)
{
+
+ return remove(new AMQShortString(key));
+
+ }
+
+ public Object remove(AMQShortString key)
+ {
+ AMQTypedValue val = removeKey(key);
+ return val == null ? null : val.getValue();
+
+ }
+
+
+ public AMQTypedValue removeKey(AMQShortString key)
+ {
+ initMapIfNecessary();
+ _encodedForm = null;
AMQTypedValue value = _properties.remove(key);
- return value == null ? null : value.getValue();
+ if(value == null)
+ {
+ return null;
+ }
+ else
+ {
+ _encodedSize -= EncodingUtils.encodedShortStringLength(key);
+ _encodedSize--;
+ _encodedSize -= value.getEncodingSize();
+ return value;
+ }
+
}
public void clear()
{
+ initMapIfNecessary();
+ _encodedForm = null;
_properties.clear();
+ _encodedSize = 0;
}
- public Set keySet()
+ public Set<AMQShortString> keySet()
{
+ initMapIfNecessary();
return _properties.keySet();
}
private void putDataInBuffer(ByteBuffer buffer)
{
- final Iterator<Map.Entry<String,AMQTypedValue>> it = _properties.entrySet().iterator();
+ if(_encodedForm != null)
+ {
+ if (_encodedForm.remaining() == 0)
+ {
+ _encodedForm.rewind();
+ }
+ buffer.put(_encodedForm);
+ }
+ else if(_properties != null)
+ {
+ final Iterator<Map.Entry<AMQShortString,AMQTypedValue>> it = _properties.entrySet().iterator();
- //If there are values then write out the encoded Size... could check _encodedSize != 0
- // write out the total length, which we have kept up to date as data is added
+ //If there are values then write out the encoded Size... could check _encodedSize != 0
+ // write out the total length, which we have kept up to date as data is added
- while (it.hasNext())
- {
- final Map.Entry<String,AMQTypedValue> me = it.next();
- try
+ while (it.hasNext())
{
- if (_logger.isTraceEnabled())
+ final Map.Entry<AMQShortString,AMQTypedValue> me = it.next();
+ try
{
- _logger.trace("Writing Property:" + me.getKey() +
- " Type:" + me.getValue().getType() +
- " Value:" + me.getValue().getValue());
- _logger.trace("Buffer Position:" + buffer.position() +
- " Remaining:" + buffer.remaining());
- }
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Writing Property:" + me.getKey() +
+ " Type:" + me.getValue().getType() +
+ " Value:" + me.getValue().getValue());
+ _logger.trace("Buffer Position:" + buffer.position() +
+ " Remaining:" + buffer.remaining());
+ }
- //Write the actual parameter name
- EncodingUtils.writeShortStringBytes(buffer, me.getKey());
- me.getValue().writeToBuffer(buffer);
- }
- catch (Exception e)
- {
- if (_logger.isTraceEnabled())
+ //Write the actual parameter name
+ EncodingUtils.writeShortStringBytes(buffer, me.getKey());
+ me.getValue().writeToBuffer(buffer);
+ }
+ catch (Exception e)
{
- _logger.trace("Exception thrown:" + e);
- _logger.trace("Writing Property:" + me.getKey() +
- " Type:" + me.getValue().getType() +
- " Value:" + me.getValue().getValue());
- _logger.trace("Buffer Position:" + buffer.position() +
- " Remaining:" + buffer.remaining());
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Exception thrown:" + e);
+ _logger.trace("Writing Property:" + me.getKey() +
+ " Type:" + me.getValue().getType() +
+ " Value:" + me.getValue().getValue());
+ _logger.trace("Buffer Position:" + buffer.position() +
+ " Remaining:" + buffer.remaining());
+ }
+ throw new RuntimeException(e);
}
- throw new RuntimeException(e);
}
}
}
- public void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException
+ private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException
{
- final boolean trace = _logger.isTraceEnabled();
- int sizeRead = 0;
- while (sizeRead < length)
+ final boolean trace = _logger.isTraceEnabled();
+ if(length > 0)
{
- int sizeRemaining = buffer.remaining();
- final String key = EncodingUtils.readShortString(buffer);
- AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
- sizeRead += (sizeRemaining - buffer.remaining());
- if (trace)
+ final int expectedRemaining = buffer.remaining()-(int)length;
+
+ _properties = new LinkedHashMap<AMQShortString,AMQTypedValue>(INITIAL_HASHMAP_CAPACITY);
+
+ do
{
- _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "' (now read " + sizeRead + " of " + length + " encoded bytes)...");
+
+ final AMQShortString key = EncodingUtils.readAMQShortString(buffer);
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
+
+ if (trace)
+ {
+ _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "'");
+ }
+
+
+
+ _properties.put(key,value);
+
+
+
}
+ while (buffer.remaining() > expectedRemaining);
- _properties.put(key,value);
}
+ _encodedSize = length;
if (trace)
{
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Wed Feb 14 12:02:03 2007
@@ -27,7 +27,21 @@
public static final byte TYPE = 8;
public static AMQFrame FRAME = new HeartbeatBody().toFrame();
- protected byte getFrameType()
+ public HeartbeatBody()
+ {
+
+ }
+
+ public HeartbeatBody(ByteBuffer buffer, long size)
+ {
+ if(size > 0)
+ {
+ //allow other implementations to have a payload, but ignore it:
+ buffer.skip((int) size);
+ }
+ }
+
+ public byte getFrameType()
{
return TYPE;
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java Wed Feb 14 12:02:03 2007
@@ -24,7 +24,7 @@
public class HeartbeatBodyFactory implements BodyFactory
{
- public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
return new HeartbeatBody();
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Wed Feb 14 12:02:03 2007
@@ -164,4 +164,14 @@
protocolMajor + "." + protocolMinor + " not found in protocol version list.");
}
}
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer(new String(header));
+ buffer.append(Integer.toHexString(protocolClass));
+ buffer.append(Integer.toHexString(protocolInstance));
+ buffer.append(Integer.toHexString(protocolMajor));
+ buffer.append(Integer.toHexString(protocolMinor));
+ return buffer.toString();
+ }
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Wed Feb 14 12:02:03 2007
@@ -40,7 +40,7 @@
* to be known.
*/
private boolean serverFlag;
- private int connectionId;
+ private long connectionId;
/**
* Request and response frames must have a requestID and responseID which
@@ -56,7 +56,7 @@
private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap;
- public RequestManager(int connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
+ public RequestManager(long connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
this.protocolWriter = protocolWriter;
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Wed Feb 14 12:02:03 2007
@@ -43,7 +43,7 @@
* to be known.
*/
private boolean serverFlag;
- private int connectionId;
+ private long connectionId;
private int maxAccumulatedResponses = 20; // Default
// private Class currentResponseMethodBodyClass;
@@ -80,11 +80,18 @@
{
return (int)(requestId - o.requestId);
}
+
+ public String toString()
+ {
+ return requestId + ":" + (responseMethodBody == null ?
+ "null" :
+ "C" + responseMethodBody.getClazz() + " M" + responseMethodBody.getMethod());
+ }
}
private ConcurrentHashMap<Long, ResponseStatus> responseMap;
- public ResponseManager(int connectionId, int channel, AMQMethodListener methodListener,
+ public ResponseManager(long connectionId, int channel, AMQMethodListener methodListener,
AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
Copied: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java (from r501854, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java?view=diff&rev=507672&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java&r1=501854&p2=incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java&r2=507672
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java Wed Feb 14 12:02:03 2007
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/Event.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/Event.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/Event.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/Event.java Wed Feb 14 12:02:03 2007
@@ -25,90 +25,66 @@
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IdleStatus;
-/**
- * Represents an operation on IoFilter.
- */
-enum EventType
-{
- OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION
-}
-class Event
+abstract public class Event
{
- private static final Logger _log = Logger.getLogger(Event.class);
-
- private final EventType type;
- private final IoFilter.NextFilter nextFilter;
- private final Object data;
- public Event(IoFilter.NextFilter nextFilter, EventType type, Object data)
+ public Event()
{
- this.type = type;
- this.nextFilter = nextFilter;
- this.data = data;
- if (type == EventType.EXCEPTION)
- {
- _log.error("Exception event constructed: " + data, (Throwable) data);
- }
}
- public Object getData()
- {
- return data;
- }
+ abstract public void process(IoSession session);
- public IoFilter.NextFilter getNextFilter()
- {
- return nextFilter;
- }
-
- public EventType getType()
+ public static final class ReceivedEvent extends Event
{
- return type;
- }
+ private final Object _data;
- void process(IoSession session)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Processing " + this);
- }
- if (type == EventType.RECEIVED)
- {
- nextFilter.messageReceived(session, data);
- //ByteBufferUtil.releaseIfPossible( data );
- }
- else if (type == EventType.SENT)
+ private final IoFilter.NextFilter _nextFilter;
+
+ public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data)
{
- nextFilter.messageSent(session, data);
- //ByteBufferUtil.releaseIfPossible( data );
+ super();
+ _nextFilter = nextFilter;
+ _data = data;
}
- else if (type == EventType.EXCEPTION)
+
+ public void process(IoSession session)
{
- nextFilter.exceptionCaught(session, (Throwable) data);
+ _nextFilter.messageReceived(session, _data);
}
- else if (type == EventType.IDLE)
+
+ public IoFilter.NextFilter getNextFilter()
{
- nextFilter.sessionIdle(session, (IdleStatus) data);
+ return _nextFilter;
}
- else if (type == EventType.OPENED)
+ }
+
+
+ public static final class WriteEvent extends Event
+ {
+ private final IoFilter.WriteRequest _data;
+ private final IoFilter.NextFilter _nextFilter;
+
+ public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data)
{
- nextFilter.sessionOpened(session);
+ super();
+ _nextFilter = nextFilter;
+ _data = data;
}
- else if (type == EventType.WRITE)
+
+
+ public void process(IoSession session)
{
- nextFilter.filterWrite(session, (IoFilter.WriteRequest) data);
+ _nextFilter.filterWrite(session, _data);
}
- else if (type == EventType.CLOSED)
+
+ public IoFilter.NextFilter getNextFilter()
{
- nextFilter.sessionClosed(session);
+ return _nextFilter;
}
}
- public String toString()
- {
- return "Event: type " + type + ", data: " + data;
- }
+
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/Job.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/Job.java Wed Feb 14 12:02:03 2007
@@ -30,13 +30,13 @@
* Holds events for a session that will be processed asynchronously by
* the thread pool in PoolingFilter.
*/
-class Job implements Runnable
+public class Job implements Runnable
{
private final int _maxEvents;
private final IoSession _session;
private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
private final AtomicBoolean _active = new AtomicBoolean();
- private final AtomicInteger _refCount = new AtomicInteger();
+ //private final AtomicInteger _refCount = new AtomicInteger();
private final JobCompletionHandler _completionHandler;
Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
@@ -45,21 +45,21 @@
_completionHandler = completionHandler;
_maxEvents = maxEvents;
}
-
- void acquire()
- {
- _refCount.incrementAndGet();
- }
-
- void release()
- {
- _refCount.decrementAndGet();
- }
-
- boolean isReferenced()
- {
- return _refCount.get() > 0;
- }
+//
+// void acquire()
+// {
+// _refCount.incrementAndGet();
+// }
+//
+// void release()
+// {
+// _refCount.decrementAndGet();
+// }
+//
+// boolean isReferenced()
+// {
+// return _refCount.get() > 0;
+// }
void add(Event evt)
{
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Wed Feb 14 12:02:03 2007
@@ -25,57 +25,60 @@
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.EnumSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
{
private static final Logger _logger = Logger.getLogger(PoolingFilter.class);
- public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED));
- public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE));
private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
private final ReferenceCountingExecutorService _poolReference;
- private final Set<EventType> _asyncTypes;
private final String _name;
private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
_poolReference = refCountingPool;
- _asyncTypes = asyncTypes;
_name = name;
}
- private void fireEvent(IoSession session, Event event)
+ void fireAsynchEvent(IoSession session, Event event)
{
- if (_asyncTypes.contains(event.getType()))
+ Job job = getJobForSession(session);
+ // job.acquire(); //prevents this job being removed from _jobs
+ job.add(event);
+
+ //Additional checks on pool to check that it hasn't shutdown.
+ // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown())
{
- Job job = getJobForSession(session);
- job.acquire(); //prevents this job being removed from _jobs
- job.add(event);
-
- //Additional checks on pool to check that it hasn't shutdown.
- // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown())
- {
- _poolReference.getPool().execute(job);
- }
- }
- else
- {
- event.process(session);
+ _poolReference.getPool().execute(job);
}
+
+ }
+
+ public void createNewJobForSession(IoSession session)
+ {
+ Job job = new Job(session, this, _maxEvents);
+ session.setAttribute(_name, job);
}
private Job getJobForSession(IoSession session)
{
- Job job = _jobs.get(session);
- return job == null ? createJobForSession(session) : job;
+ return (Job) session.getAttribute(_name);
+
+/* if(job == null)
+ {
+ System.err.println("Error in " + _name);
+ Thread.dumpStack();
+ }
+
+
+ job = _jobs.get(session);
+ return job == null ? createJobForSession(session) : job;*/
}
private Job createJobForSession(IoSession session)
@@ -93,15 +96,16 @@
//Job.JobCompletionHandler
public void completed(IoSession session, Job job)
{
- if (job.isComplete())
- {
- job.release();
- if (!job.isReferenced())
- {
- _jobs.remove(session);
- }
- }
- else
+// if (job.isComplete())
+// {
+// job.release();
+// if (!job.isReferenced())
+// {
+// _jobs.remove(session);
+// }
+// }
+// else
+ if(!job.isComplete())
{
// ritchiem : 2006-12-13 Do we need to perform the additional checks here?
// Can the pool be shutdown at this point?
@@ -114,45 +118,44 @@
//IoFilter methods that are processed by threads on the pool
- public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception
+ public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.OPENED, null));
+ nextFilter.sessionOpened(session);
}
- public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception
+ public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.CLOSED, null));
+ nextFilter.sessionClosed(session);
}
- public void sessionIdle(NextFilter nextFilter, IoSession session,
- IdleStatus status) throws Exception
+ public void sessionIdle(final NextFilter nextFilter, final IoSession session,
+ final IdleStatus status) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.IDLE, status));
+ nextFilter.sessionIdle(session, status);
}
- public void exceptionCaught(NextFilter nextFilter, IoSession session,
- Throwable cause) throws Exception
+ public void exceptionCaught(final NextFilter nextFilter, final IoSession session,
+ final Throwable cause) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause));
+ nextFilter.exceptionCaught(session,cause);
}
- public void messageReceived(NextFilter nextFilter, IoSession session,
- Object message) throws Exception
+ public void messageReceived(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
{
- //ByteBufferUtil.acquireIfPossible( message );
- fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message));
+ nextFilter.messageReceived(session,message);
}
- public void messageSent(NextFilter nextFilter, IoSession session,
- Object message) throws Exception
+ public void messageSent(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
{
- //ByteBufferUtil.acquireIfPossible( message );
- fireEvent(session, new Event(nextFilter, EventType.SENT, message));
+ nextFilter.messageSent(session, message);
}
- public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+ public void filterWrite(final NextFilter nextFilter, final IoSession session,
+ final WriteRequest writeRequest) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest));
+ nextFilter.filterWrite(session, writeRequest);
}
//IoFilter methods that are processed on current thread (NOT on pooled thread)
@@ -188,5 +191,51 @@
// when the reference count gets to zero we release the executor service
_poolReference.releaseExecutorService();
}
+
+ public static class AsynchReadPoolingFilter extends PoolingFilter
+ {
+
+ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ {
+ super(refCountingPool, name);
+ }
+
+ public void messageReceived(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
+ {
+ fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
+ }
+
+
+ }
+
+ public static class AsynchWritePoolingFilter extends PoolingFilter
+ {
+
+ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ {
+ super(refCountingPool, name);
+ }
+
+
+ public void filterWrite(final NextFilter nextFilter, final IoSession session,
+ final WriteRequest writeRequest) throws Exception
+ {
+ fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
+ }
+
+ }
+
+ public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)
+ {
+ return new AsynchReadPoolingFilter(refCountingPool,name);
+ }
+
+
+ public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)
+ {
+ return new AsynchWritePoolingFilter(refCountingPool,name);
+ }
+
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java Wed Feb 14 12:02:03 2007
@@ -26,15 +26,38 @@
public class ReadWriteThreadModel implements ThreadModel
{
+
+ private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel();
+
+ private final PoolingFilter _asynchronousReadFilter;
+ private final PoolingFilter _asynchronousWriteFilter;
+
+ private ReadWriteThreadModel()
+ {
+ final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
+ _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
+ _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
+ }
+
+ public PoolingFilter getAsynchronousReadFilter()
+ {
+ return _asynchronousReadFilter;
+ }
+
+ public PoolingFilter getAsynchronousWriteFilter()
+ {
+ return _asynchronousWriteFilter;
+ }
+
public void buildFilterChain(IoFilterChain chain) throws Exception
{
- ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS,
- "AsynchronousReadFilter");
- PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS,
- "AsynchronousWriteFilter");
- chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead));
- chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite));
+ chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter));
+ chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter));
+ }
+
+ public static ReadWriteThreadModel getInstance()
+ {
+ return _instance;
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.protocol;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.Map;
import java.util.HashMap;
@@ -27,14 +29,14 @@
{
private int _code;
- private String _name;
+ private AMQShortString _name;
private static Map _codeMap = new HashMap();
private AMQConstant(int code, String name, boolean map)
{
_code = code;
- _name = name;
+ _name = new AMQShortString(name);
if (map)
{
_codeMap.put(new Integer(code), this);
@@ -51,7 +53,7 @@
return _code;
}
- public String getName()
+ public AMQShortString getName()
{
return _name;
}
Copied: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java (from r501854, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java?view=diff&rev=507672&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java&r1=501854&p2=incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java&r2=507672
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java Wed Feb 14 12:02:03 2007
@@ -25,4 +25,6 @@
public byte getProtocolMinorVersion();
public byte getProtocolMajorVersion();
+
+ public boolean isProtocolVersionEqual(byte majorVersion, byte minorVersion);
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLHelper;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import java.util.HashMap;
import java.net.URI;
@@ -31,10 +32,10 @@
public class AMQBindingURL implements BindingURL
{
String _url;
- String _exchangeClass;
- String _exchangeName;
- String _destinationName;
- String _queueName;
+ AMQShortString _exchangeClass;
+ AMQShortString _exchangeName;
+ AMQShortString _destinationName;
+ AMQShortString _queueName;
private HashMap<String, String> _options;
@@ -84,7 +85,7 @@
if (connection.getPath() == null ||
connection.getPath().equals(""))
{
- URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(),
+ URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
"Destination or Queue requried", _url);
}
else
@@ -92,7 +93,7 @@
int slash = connection.getPath().indexOf("/", 1);
if (slash == -1)
{
- URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(),
+ URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
"Destination requried", _url);
}
else
@@ -121,6 +122,26 @@
}
}
+ private void setExchangeClass(String exchangeClass)
+ {
+ setExchangeClass(new AMQShortString(exchangeClass));
+ }
+
+ private void setQueueName(String name)
+ {
+ setQueueName(new AMQShortString(name));
+ }
+
+ private void setDestinationName(String name)
+ {
+ setDestinationName(new AMQShortString(name));
+ }
+
+ private void setExchangeName(String exchangeName)
+ {
+ setExchangeName(new AMQShortString(exchangeName));
+ }
+
private void processOptions()
{
//this is where we would parse any options that needed more than just storage.
@@ -131,22 +152,22 @@
return _url;
}
- public String getExchangeClass()
+ public AMQShortString getExchangeClass()
{
return _exchangeClass;
}
- public void setExchangeClass(String exchangeClass)
+ public void setExchangeClass(AMQShortString exchangeClass)
{
_exchangeClass = exchangeClass;
}
- public String getExchangeName()
+ public AMQShortString getExchangeName()
{
return _exchangeName;
}
- public void setExchangeName(String name)
+ public void setExchangeName(AMQShortString name)
{
_exchangeName = name;
@@ -156,17 +177,17 @@
}
}
- public String getDestinationName()
+ public AMQShortString getDestinationName()
{
return _destinationName;
}
- public void setDestinationName(String name)
+ public void setDestinationName(AMQShortString name)
{
_destinationName = name;
}
- public String getQueueName()
+ public AMQShortString getQueueName()
{
if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
@@ -174,7 +195,7 @@
{
if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
{
- return getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION);
+ return new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
}
else
{
@@ -192,7 +213,7 @@
}
}
- public void setQueueName(String name)
+ public void setQueueName(AMQShortString name)
{
_queueName = name;
}
@@ -212,7 +233,7 @@
return _options.containsKey(key);
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
@@ -221,15 +242,15 @@
if (containsOption(BindingURL.OPTION_ROUTING_KEY))
{
- return getOption(OPTION_ROUTING_KEY);
+ return new AMQShortString(getOption(OPTION_ROUTING_KEY));
}
return getDestinationName();
}
- public void setRoutingKey(String key)
+ public void setRoutingKey(AMQShortString key)
{
- setOption(OPTION_ROUTING_KEY, key);
+ setOption(OPTION_ROUTING_KEY, key.toString());
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/url/BindingURL.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/url/BindingURL.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.url;
+import org.apache.qpid.framing.AMQShortString;
+
/*
Binding URL format:
<exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
@@ -36,21 +38,21 @@
String getURL();
- String getExchangeClass();
+ AMQShortString getExchangeClass();
- void setExchangeClass(String exchangeClass);
+ void setExchangeClass(AMQShortString name);
- String getExchangeName();
+ AMQShortString getExchangeName();
- void setExchangeName(String name);
+ void setExchangeName(AMQShortString name);
- String getDestinationName();
+ AMQShortString getDestinationName();
- void setDestinationName(String name);
+ void setDestinationName(AMQShortString name);
- String getQueueName();
+ AMQShortString getQueueName();
- void setQueueName(String name);
+ void setQueueName(AMQShortString name);
String getOption(String key);
@@ -58,9 +60,9 @@
boolean containsOption(String key);
- String getRoutingKey();
+ AMQShortString getRoutingKey();
- void setRoutingKey(String key);
+ void setRoutingKey(AMQShortString key);
String toString();
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java Wed Feb 14 12:02:03 2007
@@ -25,8 +25,6 @@
import java.util.Enumeration;
import java.util.Iterator;
-import java.util.Map;
-import java.util.HashMap;
import org.apache.mina.common.ByteBuffer;
import org.apache.log4j.Logger;
@@ -227,7 +225,7 @@
//... and a the string value of it.
Assert.assertEquals("" + Double.MAX_VALUE, table1.getString("value"));
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
// Table should now have zero length for encoding
@@ -265,7 +263,7 @@
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
// Table should now have zero length for encoding
@@ -303,7 +301,7 @@
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
// Table should now have zero length for encoding
@@ -341,7 +339,7 @@
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
// Table should now have zero length for encoding
@@ -380,7 +378,7 @@
Assert.assertEquals(null, table1.getString("value"));
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
// Table should now have zero length for encoding
@@ -440,7 +438,7 @@
Assert.assertTrue(table1.containsKey("value"));
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
checkEmpty(table1);
@@ -457,23 +455,7 @@
- public void testKeyEnumeration()
- {
- FieldTable table = new FieldTable();
- table.setLong("one", 1L);
- table.setLong("two", 2L);
- table.setLong("three", 3L);
- table.setLong("four", 4L);
- table.setLong("five", 5L);
-
- Enumeration e = table.getPropertyNames();
-
- Assert.assertTrue("one".equals(e.nextElement()));
- Assert.assertTrue("two".equals(e.nextElement()));
- Assert.assertTrue("three".equals(e.nextElement()));
- Assert.assertTrue("four".equals(e.nextElement()));
- Assert.assertTrue("five".equals(e.nextElement()));
- }
+
public void testValues()
{
@@ -546,8 +528,7 @@
table.setString("string", "hello");
table.setString("null-string", null);
-
- final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize()); // FIXME XXX: Is cast a problem?
+ final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize() + 4); // FIXME XXX: Is cast a problem?
table.writeToBuffer(buffer);
@@ -597,7 +578,7 @@
byte[] _bytes = {99, 98, 97, 96, 95};
result.setBytes("bytes", _bytes);
- size += 1 + EncodingUtils.encodedShortStringLength("bytes") + 1 + EncodingUtils.encodedByteLength() * _bytes.length;
+ size += 1 + EncodingUtils.encodedShortStringLength("bytes") + 4 + _bytes.length;
Assert.assertEquals(size, result.getEncodedSize());
result.setChar("char", (char) 'c');
@@ -639,7 +620,7 @@
Assert.assertEquals(size, result.getEncodedSize());
result.setObject("object-bytes", _bytes);
- size += 1 + EncodingUtils.encodedShortStringLength("object-bytes") + 1 + EncodingUtils.encodedByteLength() * _bytes.length;
+ size += 1 + EncodingUtils.encodedShortStringLength("object-bytes") + 4 + _bytes.length;
Assert.assertEquals(size, result.getEncodedSize());
result.setObject("object-char", 'c');
@@ -758,7 +739,7 @@
try
{
- table.setObject(null, "String");
+ table.setObject((String)null, "String");
fail("Null property name is not allowed");
}
catch (IllegalArgumentException iae)
@@ -868,9 +849,9 @@
{
FieldTable table = new FieldTable();
- table.put("StringProperty", "String");
+ table.setObject("StringProperty", "String");
- Assert.assertEquals("String", table.get("StringProperty"));
+ Assert.assertEquals("String", table.getString("StringProperty"));
//Test Clear
@@ -887,15 +868,15 @@
FieldTable table = new FieldTable();
- table.put("n1", "1");
- table.put("n2", "2");
- table.put("n3", "3");
-
- Iterator iterator = table.keySet().iterator();
- Assert.assertEquals("n1", iterator.next());
- Assert.assertEquals("n2", iterator.next());
- Assert.assertEquals("n3", iterator.next());
- Assert.assertFalse(iterator.hasNext());
+ table.setObject("n1", "1");
+ table.setObject("n2", "2");
+ table.setObject("n3", "3");
+
+
+ Assert.assertEquals("1", table.getObject("n1"));
+ Assert.assertEquals("2", table.getObject("n2"));
+ Assert.assertEquals("3", table.getObject("n3"));
+
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java Wed Feb 14 12:02:03 2007
@@ -36,25 +36,32 @@
public void setUp()
{
+
//Create Pool
_executorService = ReferenceCountingExecutorService.getInstance();
_executorService.acquireExecutorService();
- _pool = new PoolingFilter(_executorService, PoolingFilter.WRITE_EVENTS,
+ _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
"AsynchronousWriteFilter");
}
public void testRejectedExecution() throws Exception
{
- _pool.filterWrite(new NoOpFilter(), new TestSession(), new IoFilter.WriteRequest("Message"));
+
+ TestSession testSession = new TestSession();
+ _pool.createNewJobForSession(testSession);
+ _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message"));
//Shutdown the pool
_executorService.getPool().shutdownNow();
try
{
+
+ testSession = new TestSession();
+ _pool.createNewJobForSession(testSession);
//prior to fix for QPID-172 this would throw RejectedExecutionException
- _pool.filterWrite(null, new TestSession(), null);
+ _pool.filterWrite(null, testSession, null);
}
catch (RejectedExecutionException rje)
{
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/session/TestSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/session/TestSession.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/session/TestSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/test/java/org/apache/qpid/session/TestSession.java Wed Feb 14 12:02:03 2007
@@ -24,9 +24,13 @@
import java.net.SocketAddress;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
public class TestSession implements IoSession
{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+
public TestSession()
{
}
@@ -68,42 +72,42 @@
public Object getAttachment()
{
- return null; //TODO
+ return getAttribute("");
}
public Object setAttachment(Object attachment)
{
- return null; //TODO
+ return setAttribute("",attachment);
}
public Object getAttribute(String key)
{
- return null; //TODO
+ return attributes.get(key);
}
public Object setAttribute(String key, Object value)
{
- return null; //TODO
+ return attributes.put(key,value);
}
public Object setAttribute(String key)
{
- return null; //TODO
+ return attributes.put(key, Boolean.TRUE);
}
public Object removeAttribute(String key)
{
- return null; //TODO
+ return attributes.remove(key);
}
public boolean containsAttribute(String key)
{
- return false; //TODO
+ return attributes.containsKey(key);
}
public Set getAttributeKeys()
{
- return null; //TODO
+ return attributes.keySet();
}
public TransportType getTransportType()