You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC

svn commit: r1225178 [7/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/ bdbstore/src/test/ bdbstore/src/test/jav...

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java?rev=1225178&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java Wed Dec 28 13:02:41 2011
@@ -0,0 +1,174 @@
+package org.apache.qpid.framing;
+
+import org.apache.qpid.codec.MarkableDataInput;
+
+import java.io.IOException;
+
+public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput // Sequential reader over a window of a byte array; multi-byte values are decoded high byte first.
+{
+    private byte[] _data;   // backing array, stored by reference (not copied)
+    private int _offset;    // absolute index in _data of the next byte to read
+    private int _length;    // window length as passed to the constructor
+    private int _origin;    // absolute index in _data where the window starts
+    private int _mark;      // marked position, relative to _origin
+
+    public ByteArrayDataInput(byte[] data) // Reads the whole array: window is [0, data.length).
+    {
+        this(data,0, data.length);
+    }
+
+    public ByteArrayDataInput(byte[] data, int offset, int length) // Reads the window starting at data[offset] with the given length.
+    {
+        _data = data;
+        _offset = offset;
+        _length = length;
+        _origin = offset;
+        _mark = 0; // so reset() without a prior mark() rewinds to the window start
+    }
+
+    public void readFully(byte[] b) // Fills all of b from the current position and advances by b.length.
+    {
+        System.arraycopy(_data,_offset,b,0,b.length); // not checked against the window end; arraycopy only fails when an array bound is exceeded
+        _offset+=b.length;
+    }
+
+    public void readFully(byte[] b, int off, int len) // Copies len bytes into b starting at b[off] and advances by len.
+    {
+        System.arraycopy(_data,_offset,b,off,len);
+        _offset+=len;
+    }
+
+    public int skipBytes(int n) // Advances the read position by n bytes.
+    {
+        return _offset+=n; // NOTE(review): returns the updated absolute offset, not n; java.io.DataInput.skipBytes is specified to return the number of bytes skipped - confirm intent
+    }
+
+    public boolean readBoolean() // Consumes one byte; any non-zero value is true.
+    {
+        return _data[_offset++] != 0;
+    }
+
+    public byte readByte() // Consumes one byte and returns it signed.
+    {
+        return _data[_offset++];
+    }
+
+    public int readUnsignedByte() // Consumes one byte and returns it in the range 0..255.
+    {
+        return ((int)_data[_offset++]) & 0xFF; // mask removes the sign extension
+    }
+
+    public short readShort() // Consumes two bytes, high byte first.
+    {
+        return (short) (((((int)_data[_offset++]) << 8) & 0xFF00) | (((int)_data[_offset++]) & 0xFF));
+    }
+
+    public int readUnsignedShort() // Consumes two bytes, high byte first; result is in the range 0..65535.
+    {
+        return (((((int)_data[_offset++]) << 8) & 0xFF00) | (((int)_data[_offset++]) & 0xFF));
+    }
+
+    public char readChar() // Consumes two bytes, high byte first, and returns them as a char.
+    {
+        return (char) (((((int)_data[_offset++]) << 8) & 0xFF00) | (((int)_data[_offset++]) & 0xFF));
+    }
+
+    public int readInt() // Consumes four bytes, most significant byte first.
+    {
+        return  ((((int)_data[_offset++]) << 24) & 0xFF000000)
+                | ((((int)_data[_offset++]) << 16) & 0xFF0000)
+                | ((((int)_data[_offset++]) << 8) & 0xFF00)
+                | (((int)_data[_offset++]) & 0xFF);
+    }
+
+    public long readLong() // Consumes eight bytes, most significant byte first.
+    {
+        return    ((((long)_data[_offset++]) << 56) & 0xFF00000000000000L) // each byte is widened to long before shifting so the upper bits survive
+                | ((((long)_data[_offset++]) << 48) & 0xFF000000000000L)
+                | ((((long)_data[_offset++]) << 40) & 0xFF0000000000L)
+                | ((((long)_data[_offset++]) << 32) & 0xFF00000000L)
+                | ((((long)_data[_offset++]) << 24) & 0xFF000000L)
+                | ((((long)_data[_offset++]) << 16) & 0xFF0000L)
+                | ((((long)_data[_offset++]) << 8)  & 0xFF00L)
+                | (((long)_data[_offset++]) & 0xFFL);
+    }
+
+    public float readFloat() // Consumes four bytes via readInt() and reinterprets the bits as a float.
+    {
+        return Float.intBitsToFloat(readInt());
+    }
+
+    public double readDouble() // Consumes eight bytes via readLong() and reinterprets the bits as a double.
+    {
+        return Double.longBitsToDouble(readLong());
+    }
+
+    public AMQShortString readAMQShortString() // Reads a string prefixed by a one-byte unsigned length.
+    {
+        int length = _data[_offset++] & 0xff; // length prefix, 0..255
+        if(length == 0) 
+        {
+            return null; // a zero length prefix yields null rather than an empty string
+        }
+        else
+        {
+            final AMQShortString amqShortString = new AMQShortString(_data, _offset, length); // built over _data at the current offset
+            _offset+=length; // step past the string bytes
+            return amqShortString;
+        }
+    }
+
+    public String readLine() // Not supported by this implementation.
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public String readUTF() // Not supported by this implementation.
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int available() // Number of bytes between the read position and the end of the window.
+    {
+        return (_origin+_length)-_offset;
+    }
+
+
+    public long skip(long i) // Advances the read position by i and returns i unchanged.
+    {
+        _offset+=i; // compound assignment narrows the long to int implicitly
+        return i;
+    }
+
+    public int read(byte[] b) // Fills all of b via readFully(b).
+    {
+        readFully(b);
+        return b.length; // always reports the full length; readFully either copies everything or throws
+    }
+
+    public int position() // Current read position relative to the window start.
+    {
+        return _offset - _origin;
+    }
+
+    public void position(int position) // Moves the read position to the given index relative to the window start.
+    {
+        _offset = position + _origin;
+    }
+
+    public int length() // Window length as given at construction.
+    {
+        return _length;
+    }
+
+
+    public void mark(int readAhead) // Remembers the current position; readAhead is not used.
+    {
+        _mark = _offset-_origin;
+    }
+
+    public void reset() // Returns to the last marked position (window start if mark() was never called).
+    {
+        _offset = _origin + _mark;
+    }
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java Wed Dec 28 13:02:41 2011
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataOutputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 
 public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
@@ -50,7 +50,7 @@ public class CompositeAMQDataBlock exten
         return frameSize;
     }
 
-    public void writePayload(DataOutputStream buffer) throws IOException
+    public void writePayload(DataOutput buffer) throws IOException
     {
         for (int i = 0; i < _blocks.length; i++)
         {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,11 @@
  */
 package org.apache.qpid.framing;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.AMQException;
@@ -37,10 +39,10 @@ public class ContentBody implements AMQB
     {
     }
 
-    public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
+    public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
     {
         _payload = new byte[(int)size];
-        buffer.read(_payload);
+        buffer.readFully(_payload);
     }
 
 
@@ -59,7 +61,7 @@ public class ContentBody implements AMQB
         return _payload == null ? 0 : _payload.length;
     }
 
-    public void writePayload(DataOutputStream buffer) throws IOException
+    public void writePayload(DataOutput buffer) throws IOException
     {
         buffer.write(_payload);
     }
@@ -84,11 +86,62 @@ public class ContentBody implements AMQB
     {
     }
 
+    private static class BufferContentBody implements AMQBody // Outgoing-only body that writes _length bytes of a ByteBuffer starting at buffer index _offset.
+    {
+        private final int _length;      // number of bytes to write
+        private final int _offset;      // start index within the buffer (added to arrayOffset() for heap buffers)
+        private final ByteBuffer _buf;  // source buffer, held by reference
+
+        private BufferContentBody( ByteBuffer buf, int offset, int length)
+        {
+            _length = length;
+            _offset = offset;
+            _buf = buf;
+        }
+
+        public byte getFrameType()
+        {
+            return TYPE; // not declared in this class; presumably the enclosing ContentBody's TYPE - confirm
+        }
+
+
+        public int getSize() // Payload size is the region length.
+        {
+            return _length;
+        }
 
+        public void writePayload(DataOutput buffer) throws IOException // Writes the configured region of _buf to buffer.
+        {
+            if(_buf.hasArray())
+            {
+                buffer.write(_buf.array(), _buf.arrayOffset() +  _offset, _length); // array-backed: write straight from the backing array
+            }
+            else
+            {
+                byte[] data = new byte[_length]; // no accessible array: stage the region in a temporary copy
+                ByteBuffer buf = _buf.duplicate(); // duplicate has its own position/limit, so _buf's are left untouched
+
+                buf.position(_offset); // NOTE(review): set before limit; position(int) rejects values above the current limit - confirm _offset never exceeds _buf's limit
+                buf.limit(_offset+_length);
+                buf.get(data);
+                buffer.write(data);
+            }
+        }
+
+
+        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException // Always throws: this body is write-only.
+        {
+            throw new RuntimeException("Buffered Body only to be used for outgoing data");
+        }
+    }
+
+    public static AMQFrame createAMQFrame(int channelId, ByteBuffer buf, int offset, int length) // Builds a frame for channelId whose body is length bytes of buf starting at offset.
+    {
+        return new AMQFrame(channelId, new BufferContentBody(buf, offset, length)); // buf is passed by reference to the body, not copied here
+    }
 
     public static AMQFrame createAMQFrame(int channelId, ContentBody body)
     {
-        final AMQFrame frame = new AMQFrame(channelId, body);
-        return frame;
+        return new AMQFrame(channelId, body);
     }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,9 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.qpid.codec.MarkableDataInput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +42,7 @@ public class ContentBodyFactory implemen
         _log.debug("Creating content body factory");
     }
 
-    public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+    public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
     {
         return new ContentBody(in, bodySize);
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Wed Dec 28 13:02:41 2011
@@ -20,8 +20,9 @@
  */
 package org.apache.qpid.framing;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -45,7 +46,7 @@ public class ContentHeaderBody implement
     {
     }
 
-    public ContentHeaderBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
+    public ContentHeaderBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
     {
         classId = buffer.readUnsignedShort();
         weight = buffer.readUnsignedShort();
@@ -106,7 +107,7 @@ public class ContentHeaderBody implement
         return 2 + 2 + 8 + 2 + properties.getPropertyListSize();
     }
 
-    public void writePayload(DataOutputStream buffer) throws IOException
+    public void writePayload(DataOutput buffer) throws IOException
     {
         EncodingUtils.writeUnsignedShort(buffer, classId);
         EncodingUtils.writeUnsignedShort(buffer, weight);

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,9 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.qpid.codec.MarkableDataInput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +42,7 @@ public class ContentHeaderBodyFactory im
         _log.debug("Creating content header body factory");
     }
 
-    public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+    public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
     {
         // all content headers are the same - it is only the properties that differ.
         // the content header body further delegates construction of properties

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java Wed Dec 28 13:02:41 2011
@@ -20,8 +20,9 @@
  */
 package org.apache.qpid.framing;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 
 
@@ -35,7 +36,7 @@ public interface ContentHeaderProperties
      * Writes the property list to the buffer, in a suitably encoded form.
      * @param buffer The buffer to write to
      */
-    void writePropertyListPayload(DataOutputStream buffer) throws IOException;
+    void writePropertyListPayload(DataOutput buffer) throws IOException;
 
     /**
      * Populates the properties from buffer.
@@ -43,7 +44,7 @@ public interface ContentHeaderProperties
      * @param propertyFlags he property flags.
      * @throws AMQFrameDecodingException when the buffer does not contain valid data
      */
-    void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size)
+    void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size)
         throws AMQFrameDecodingException, IOException;
 
     /**

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.framing;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 
@@ -39,7 +40,7 @@ public class ContentHeaderPropertiesFact
     }
 
     public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
-                                                                 DataInputStream buffer, int size)
+                                                                 DataInput buffer, int size)
              throws AMQFrameDecodingException, IOException
     {
         ContentHeaderProperties properties;

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Wed Dec 28 13:02:41 2011
@@ -219,7 +219,7 @@ public class EncodingUtils
         return 0;
     }
 
-    public static void writeShortStringBytes(DataOutputStream buffer, String s) throws IOException
+    public static void writeShortStringBytes(DataOutput buffer, String s) throws IOException
     {
         if (s != null)
         {
@@ -243,7 +243,7 @@ public class EncodingUtils
         }
     }
 
-    public static void writeShortStringBytes(DataOutputStream buffer, AMQShortString s) throws IOException
+    public static void writeShortStringBytes(DataOutput buffer, AMQShortString s) throws IOException
     {
         if (s != null)
         {
@@ -257,7 +257,7 @@ public class EncodingUtils
         }
     }
 
-    public static void writeLongStringBytes(DataOutputStream buffer, String s) throws IOException
+    public static void writeLongStringBytes(DataOutput buffer, String s) throws IOException
     {
         assert (s == null) || (s.length() <= 0xFFFE);
         if (s != null)
@@ -279,7 +279,7 @@ public class EncodingUtils
         }
     }
 
-    public static void writeLongStringBytes(DataOutputStream buffer, char[] s) throws IOException
+    public static void writeLongStringBytes(DataOutput buffer, char[] s) throws IOException
     {
         assert (s == null) || (s.length <= 0xFFFE);
         if (s != null)
@@ -300,7 +300,7 @@ public class EncodingUtils
         }
     }
 
-    public static void writeLongStringBytes(DataOutputStream buffer, byte[] bytes) throws IOException
+    public static void writeLongStringBytes(DataOutput buffer, byte[] bytes) throws IOException
     {
         assert (bytes == null) || (bytes.length <= 0xFFFE);
         if (bytes != null)
@@ -314,13 +314,13 @@ public class EncodingUtils
         }
     }
 
-    public static void writeUnsignedByte(DataOutputStream buffer, short b) throws IOException
+    public static void writeUnsignedByte(DataOutput buffer, short b) throws IOException
     {
         byte bv = (byte) b;
         buffer.write(bv);
     }
 
-    public static void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
+    public static void writeUnsignedShort(DataOutput buffer, int s) throws IOException
     {
         // TODO: Is this comparison safe? Do I need to cast RHS to long?
         if (s < Short.MAX_VALUE)
@@ -340,7 +340,7 @@ public class EncodingUtils
         return 4;
     }
 
-    public static void writeUnsignedInteger(DataOutputStream buffer, long l) throws IOException
+    public static void writeUnsignedInteger(DataOutput buffer, long l) throws IOException
     {
         // TODO: Is this comparison safe? Do I need to cast RHS to long?
         if (l < Integer.MAX_VALUE)
@@ -360,7 +360,7 @@ public class EncodingUtils
         }
     }
 
-    public static void writeFieldTableBytes(DataOutputStream buffer, FieldTable table) throws IOException
+    public static void writeFieldTableBytes(DataOutput buffer, FieldTable table) throws IOException
     {
         if (table != null)
         {
@@ -372,12 +372,12 @@ public class EncodingUtils
         }
     }
 
-    public static void writeContentBytes(DataOutputStream buffer, Content content)
+    public static void writeContentBytes(DataOutput buffer, Content content)
     {
         // TODO: New Content class required for AMQP 0-9.
     }
 
-    public static void writeBooleans(DataOutputStream buffer, boolean[] values) throws IOException
+    public static void writeBooleans(DataOutput buffer, boolean[] values) throws IOException
     {
         byte packedValue = 0;
         for (int i = 0; i < values.length; i++)
@@ -391,13 +391,13 @@ public class EncodingUtils
         buffer.write(packedValue);
     }
 
-    public static void writeBooleans(DataOutputStream buffer, boolean value) throws IOException
+    public static void writeBooleans(DataOutput buffer, boolean value) throws IOException
     {
 
         buffer.write(value ? (byte) 1 : (byte) 0);
     }
 
-    public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1) throws IOException
+    public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1) throws IOException
     {
         byte packedValue = value0 ? (byte) 1 : (byte) 0;
 
@@ -409,7 +409,7 @@ public class EncodingUtils
         buffer.write(packedValue);
     }
 
-    public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2) throws IOException
+    public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2) throws IOException
     {
         byte packedValue = value0 ? (byte) 1 : (byte) 0;
 
@@ -426,7 +426,7 @@ public class EncodingUtils
         buffer.write(packedValue);
     }
 
-    public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException
+    public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException
     {
         byte packedValue = value0 ? (byte) 1 : (byte) 0;
 
@@ -448,7 +448,7 @@ public class EncodingUtils
         buffer.write(packedValue);
     }
 
-    public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+    public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
         boolean value4) throws IOException
     {
         byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -476,7 +476,7 @@ public class EncodingUtils
         buffer.write(packedValue);
     }
 
-    public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+    public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
         boolean value4, boolean value5) throws IOException
     {
         byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -509,7 +509,7 @@ public class EncodingUtils
         buffer.write(packedValue);
     }
 
-    public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+    public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
         boolean value4, boolean value5, boolean value6) throws IOException
     {
         byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -547,7 +547,7 @@ public class EncodingUtils
         buffer.write(packedValue);
     }
 
-    public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+    public static void writeBooleans(DataOutput buffer, boolean value0, boolean value1, boolean value2, boolean value3,
         boolean value4, boolean value5, boolean value6, boolean value7) throws IOException
     {
         byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -596,7 +596,7 @@ public class EncodingUtils
      * @param buffer
      * @param data
      */
-    public static void writeLongstr(DataOutputStream buffer, byte[] data) throws IOException
+    public static void writeLongstr(DataOutput buffer, byte[] data) throws IOException
     {
         if (data != null)
         {
@@ -609,12 +609,12 @@ public class EncodingUtils
         }
     }
 
-    public static void writeTimestamp(DataOutputStream buffer, long timestamp) throws IOException
+    public static void writeTimestamp(DataOutput buffer, long timestamp) throws IOException
     {
         writeLong(buffer, timestamp);
     }
 
-    public static boolean[] readBooleans(DataInputStream buffer) throws IOException
+    public static boolean[] readBooleans(DataInput buffer) throws IOException
     {
         final byte packedValue = buffer.readByte();
         if (packedValue == 0)
@@ -641,7 +641,7 @@ public class EncodingUtils
         return result;
     }
 
-    public static FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
+    public static FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException
     {
         long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
         if (length == 0)
@@ -654,19 +654,19 @@ public class EncodingUtils
         }
     }
 
-    public static Content readContent(DataInputStream buffer) throws AMQFrameDecodingException
+    public static Content readContent(DataInput buffer) throws AMQFrameDecodingException
     {
         // TODO: New Content class required for AMQP 0-9.
         return null;
     }
 
-    public static AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
+    public static AMQShortString readAMQShortString(DataInput buffer) throws IOException
     {
         return AMQShortString.readFromBuffer(buffer);
 
     }
 
-    public static String readShortString(DataInputStream buffer) throws IOException
+    public static String readShortString(DataInput buffer) throws IOException
     {
         short length = (short) (((short)buffer.readByte()) & 0xFF);
         if (length == 0)
@@ -681,7 +681,7 @@ public class EncodingUtils
             // this approach here is valid since we know that all the chars are
             // ASCII (0-127)
             byte[] stringBytes = new byte[length];
-            buffer.read(stringBytes, 0, length);
+            buffer.readFully(stringBytes, 0, length);
             char[] stringChars = new char[length];
             for (int i = 0; i < stringChars.length; i++)
             {
@@ -692,7 +692,7 @@ public class EncodingUtils
         }
     }
 
-    public static String readLongString(DataInputStream buffer) throws IOException
+    public static String readLongString(DataInput buffer) throws IOException
     {
         long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
         if (length == 0)
@@ -707,7 +707,7 @@ public class EncodingUtils
             // this approach here is valid since we know that all the chars are
             // ASCII (0-127)
             byte[] stringBytes = new byte[(int) length];
-            buffer.read(stringBytes, 0, (int) length);
+            buffer.readFully(stringBytes, 0, (int) length);
             char[] stringChars = new char[(int) length];
             for (int i = 0; i < stringChars.length; i++)
             {
@@ -718,7 +718,7 @@ public class EncodingUtils
         }
     }
 
-    public static byte[] readLongstr(DataInputStream buffer) throws IOException
+    public static byte[] readLongstr(DataInput buffer) throws IOException
     {
         long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
         if (length == 0)
@@ -728,13 +728,13 @@ public class EncodingUtils
         else
         {
             byte[] result = new byte[(int) length];
-            buffer.read(result);
+            buffer.readFully(result);
 
             return result;
         }
     }
 
-    public static long readTimestamp(DataInputStream buffer) throws IOException
+    public static long readTimestamp(DataInput buffer) throws IOException
     {
         // Discard msb from AMQ timestamp
         // buffer.getUnsignedInt();
@@ -818,12 +818,12 @@ public class EncodingUtils
 
     // AMQP_BOOLEAN_PROPERTY_PREFIX
 
-    public static void writeBoolean(DataOutputStream buffer, Boolean aBoolean) throws IOException
+    public static void writeBoolean(DataOutput buffer, boolean aBoolean) throws IOException
     {
         buffer.write(aBoolean ? 1 : 0);
     }
 
-    public static boolean readBoolean(DataInputStream buffer) throws IOException
+    public static boolean readBoolean(DataInput buffer) throws IOException
     {
         byte packedValue = buffer.readByte();
 
@@ -836,12 +836,12 @@ public class EncodingUtils
     }
 
     // AMQP_BYTE_PROPERTY_PREFIX
-    public static void writeByte(DataOutputStream buffer, Byte aByte) throws IOException
+    public static void writeByte(DataOutput buffer, byte aByte) throws IOException
     {
         buffer.writeByte(aByte);
     }
 
-    public static byte readByte(DataInputStream buffer) throws IOException
+    public static byte readByte(DataInput buffer) throws IOException
     {
         return buffer.readByte();
     }
@@ -852,12 +852,12 @@ public class EncodingUtils
     }
 
     // AMQP_SHORT_PROPERTY_PREFIX
-    public static void writeShort(DataOutputStream buffer, Short aShort) throws IOException
+    public static void writeShort(DataOutput buffer, short aShort) throws IOException
     {
         buffer.writeShort(aShort);
     }
 
-    public static short readShort(DataInputStream buffer) throws IOException
+    public static short readShort(DataInput buffer) throws IOException
     {
         return buffer.readShort();
     }
@@ -868,12 +868,12 @@ public class EncodingUtils
     }
 
     // INTEGER_PROPERTY_PREFIX
-    public static void writeInteger(DataOutputStream buffer, Integer aInteger) throws IOException
+    public static void writeInteger(DataOutput buffer, int aInteger) throws IOException
     {
         buffer.writeInt(aInteger);
     }
 
-    public static int readInteger(DataInputStream buffer) throws IOException
+    public static int readInteger(DataInput buffer) throws IOException
     {
         return buffer.readInt();
     }
@@ -884,12 +884,12 @@ public class EncodingUtils
     }
 
     // AMQP_LONG_PROPERTY_PREFIX
-    public static void writeLong(DataOutputStream buffer, Long aLong) throws IOException
+    public static void writeLong(DataOutput buffer, long aLong) throws IOException
     {
         buffer.writeLong(aLong);
     }
 
-    public static long readLong(DataInputStream buffer) throws IOException
+    public static long readLong(DataInput buffer) throws IOException
     {
         return buffer.readLong();
     }
@@ -900,12 +900,12 @@ public class EncodingUtils
     }
 
     // Float_PROPERTY_PREFIX
-    public static void writeFloat(DataOutputStream buffer, Float aFloat) throws IOException
+    public static void writeFloat(DataOutput buffer, float aFloat) throws IOException
     {
         buffer.writeFloat(aFloat);
     }
 
-    public static float readFloat(DataInputStream buffer) throws IOException
+    public static float readFloat(DataInput buffer) throws IOException
     {
         return buffer.readFloat();
     }
@@ -916,12 +916,12 @@ public class EncodingUtils
     }
 
     // Double_PROPERTY_PREFIX
-    public static void writeDouble(DataOutputStream buffer, Double aDouble) throws IOException
+    public static void writeDouble(DataOutput buffer, Double aDouble) throws IOException
     {
         buffer.writeDouble(aDouble);
     }
 
-    public static double readDouble(DataInputStream buffer) throws IOException
+    public static double readDouble(DataInput buffer) throws IOException
     {
         return buffer.readDouble();
     }
@@ -931,7 +931,7 @@ public class EncodingUtils
         return 8;
     }
 
-    public static byte[] readBytes(DataInputStream buffer) throws IOException
+    public static byte[] readBytes(DataInput buffer) throws IOException
     {
         long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
         if (length == 0)
@@ -941,13 +941,13 @@ public class EncodingUtils
         else
         {
             byte[] dataBytes = new byte[(int)length];
-            buffer.read(dataBytes, 0, (int) length);
+            buffer.readFully(dataBytes, 0, (int) length);
 
             return dataBytes;
         }
     }
 
-    public static void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
+    public static void writeBytes(DataOutput buffer, byte[] data) throws IOException
     {
         if (data != null)
         {
@@ -969,19 +969,19 @@ public class EncodingUtils
         return encodedByteLength();
     }
 
-    public static char readChar(DataInputStream buffer) throws IOException
+    public static char readChar(DataInput buffer) throws IOException
     {
         // This is valid as we know that the Character is ASCII 0..127
-        return (char) buffer.read();
+        return (char) buffer.readByte();
     }
 
-    public static void writeChar(DataOutputStream buffer, char character) throws IOException
+    public static void writeChar(DataOutput buffer, char character) throws IOException
     {
         // This is valid as we know that the Character is ASCII 0..127
         writeByte(buffer, (byte) character);
     }
 
-    public static long readLongAsShortString(DataInputStream buffer) throws IOException
+    public static long readLongAsShortString(DataInput buffer) throws IOException
     {
         short length = (short) buffer.readUnsignedByte();
         short pos = 0;
@@ -1018,7 +1018,7 @@ public class EncodingUtils
         return result;
     }
 
-    public static long readUnsignedInteger(DataInputStream buffer) throws IOException
+    public static long readUnsignedInteger(DataInput buffer) throws IOException
     {
         long l = 0xFF & buffer.readByte();
         l <<= 8;

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java?rev=1225178&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java Wed Dec 28 13:02:41 2011
@@ -0,0 +1,14 @@
+package org.apache.qpid.framing;
+
+import java.io.DataInput;
+
+public interface ExtendedDataInput extends DataInput
+{
+    AMQShortString readAMQShortString();
+
+    int available();
+
+    int position();
+
+    void position(int position);
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Wed Dec 28 13:02:41 2011
@@ -25,11 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQPInvalidClassException;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -44,18 +40,27 @@ import java.util.Set;
 public class FieldTable
 {
     private static final Logger _logger = LoggerFactory.getLogger(FieldTable.class);
-    private static final String STRICT_AMQP = "STRICT_AMQP";
-    private final boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP, "false"));
+    private static final String STRICT_AMQP_NAME = "STRICT_AMQP";
+    private static final boolean STRICT_AMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false"));
 
     private byte[] _encodedForm;
+    private int _encodedFormOffset;
     private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null;
     private long _encodedSize;
     private static final int INITIAL_HASHMAP_CAPACITY = 16;
     private static final int INITIAL_ENCODED_FORM_SIZE = 256;
+    private final boolean _strictAMQP;
 
     public FieldTable()
     {
+        this(STRICT_AMQP);
+    }
+
+
+    public FieldTable(boolean strictAMQP)
+    {
         super();
+        _strictAMQP = strictAMQP;
     }
 
     /**
@@ -64,14 +69,28 @@ public class FieldTable
      * @param buffer the buffer from which to read data. The length byte must be read already
      * @param length the length of the field table. Must be > 0.
      */
-    public FieldTable(DataInputStream buffer, long length) throws IOException
+    public FieldTable(DataInput buffer, long length) throws IOException
     {
         this();
         _encodedForm = new byte[(int) length];
-        buffer.read(_encodedForm);
+        buffer.readFully(_encodedForm);
         _encodedSize = length;
     }
 
+    public FieldTable(byte[] encodedForm, int offset, int length) throws IOException
+    {
+        this();
+        _encodedForm = encodedForm;
+        _encodedFormOffset = offset;
+        _encodedSize = length;
+    }
+
+
+    public boolean isClean()
+    {
+        return _encodedForm != null;
+    }
+
     public AMQTypedValue getProperty(AMQShortString string)
     {
         checkPropertyName(string);
@@ -181,7 +200,7 @@ public class FieldTable
 
     public Boolean getBoolean(String string)
     {
-        return getBoolean(new AMQShortString(string));
+        return getBoolean(AMQShortString.valueOf(string));
     }
 
     public Boolean getBoolean(AMQShortString string)
@@ -199,7 +218,7 @@ public class FieldTable
 
     public Byte getByte(String string)
     {
-        return getByte(new AMQShortString(string));
+        return getByte(AMQShortString.valueOf(string));
     }
 
     public Byte getByte(AMQShortString string)
@@ -217,7 +236,7 @@ public class FieldTable
 
     public Short getShort(String string)
     {
-        return getShort(new AMQShortString(string));
+        return getShort(AMQShortString.valueOf(string));
     }
 
     public Short getShort(AMQShortString string)
@@ -235,7 +254,7 @@ public class FieldTable
 
     public Integer getInteger(String string)
     {
-        return getInteger(new AMQShortString(string));
+        return getInteger(AMQShortString.valueOf(string));
     }
 
     public Integer getInteger(AMQShortString string)
@@ -253,7 +272,7 @@ public class FieldTable
 
     public Long getLong(String string)
     {
-        return getLong(new AMQShortString(string));
+        return getLong(AMQShortString.valueOf(string));
     }
 
     public Long getLong(AMQShortString string)
@@ -271,7 +290,7 @@ public class FieldTable
 
     public Float getFloat(String string)
     {
-        return getFloat(new AMQShortString(string));
+        return getFloat(AMQShortString.valueOf(string));
     }
 
     public Float getFloat(AMQShortString string)
@@ -289,7 +308,7 @@ public class FieldTable
 
     public Double getDouble(String string)
     {
-        return getDouble(new AMQShortString(string));
+        return getDouble(AMQShortString.valueOf(string));
     }
 
     public Double getDouble(AMQShortString string)
@@ -307,7 +326,7 @@ public class FieldTable
 
     public String getString(String string)
     {
-        return getString(new AMQShortString(string));
+        return getString(AMQShortString.valueOf(string));
     }
 
     public String getString(AMQShortString string)
@@ -330,7 +349,7 @@ public class FieldTable
 
     public Character getCharacter(String string)
     {
-        return getCharacter(new AMQShortString(string));
+        return getCharacter(AMQShortString.valueOf(string));
     }
 
     public Character getCharacter(AMQShortString string)
@@ -348,7 +367,7 @@ public class FieldTable
 
     public byte[] getBytes(String string)
     {
-        return getBytes(new AMQShortString(string));
+        return getBytes(AMQShortString.valueOf(string));
     }
 
     public byte[] getBytes(AMQShortString string)
@@ -374,7 +393,7 @@ public class FieldTable
      */
     public FieldTable getFieldTable(String string)
     {
-        return getFieldTable(new AMQShortString(string));
+        return getFieldTable(AMQShortString.valueOf(string));
     }
 
     /**
@@ -401,7 +420,7 @@ public class FieldTable
 
     public Object getObject(String string)
     {
-        return getObject(new AMQShortString(string));
+        return getObject(AMQShortString.valueOf(string));
     }
 
     public Object getObject(AMQShortString string)
@@ -447,7 +466,7 @@ public class FieldTable
     // ************  Setters
     public Object setBoolean(String string, Boolean b)
     {
-        return setBoolean(new AMQShortString(string), b);
+        return setBoolean(AMQShortString.valueOf(string), b);
     }
 
     public Object setBoolean(AMQShortString string, Boolean b)
@@ -457,7 +476,7 @@ public class FieldTable
 
     public Object setByte(String string, Byte b)
     {
-        return setByte(new AMQShortString(string), b);
+        return setByte(AMQShortString.valueOf(string), b);
     }
 
     public Object setByte(AMQShortString string, Byte b)
@@ -467,7 +486,7 @@ public class FieldTable
 
     public Object setShort(String string, Short i)
     {
-        return setShort(new AMQShortString(string), i);
+        return setShort(AMQShortString.valueOf(string), i);
     }
 
     public Object setShort(AMQShortString string, Short i)
@@ -475,29 +494,29 @@ public class FieldTable
         return setProperty(string, AMQType.SHORT.asTypedValue(i));
     }
 
-    public Object setInteger(String string, Integer i)
+    public Object setInteger(String string, int i)
     {
-        return setInteger(new AMQShortString(string), i);
+        return setInteger(AMQShortString.valueOf(string), i);
     }
 
-    public Object setInteger(AMQShortString string, Integer i)
+    public Object setInteger(AMQShortString string, int i)
     {
-        return setProperty(string, AMQType.INT.asTypedValue(i));
+        return setProperty(string, AMQTypedValue.createAMQTypedValue(i));
     }
 
-    public Object setLong(String string, Long l)
+    public Object setLong(String string, long l)
     {
-        return setLong(new AMQShortString(string), l);
+        return setLong(AMQShortString.valueOf(string), l);
     }
 
-    public Object setLong(AMQShortString string, Long l)
+    public Object setLong(AMQShortString string, long l)
     {
-        return setProperty(string, AMQType.LONG.asTypedValue(l));
+        return setProperty(string, AMQTypedValue.createAMQTypedValue(l));
     }
 
     public Object setFloat(String string, Float f)
     {
-        return setFloat(new AMQShortString(string), f);
+        return setFloat(AMQShortString.valueOf(string), f);
     }
 
     public Object setFloat(AMQShortString string, Float v)
@@ -507,7 +526,7 @@ public class FieldTable
 
     public Object setDouble(String string, Double d)
     {
-        return setDouble(new AMQShortString(string), d);
+        return setDouble(AMQShortString.valueOf(string), d);
     }
 
     public Object setDouble(AMQShortString string, Double v)
@@ -517,7 +536,7 @@ public class FieldTable
 
     public Object setString(String string, String s)
     {
-        return setString(new AMQShortString(string), s);
+        return setString(AMQShortString.valueOf(string), s);
     }
 
     public Object setAsciiString(AMQShortString string, String value)
@@ -546,7 +565,7 @@ public class FieldTable
 
     public Object setChar(String string, char c)
     {
-        return setChar(new AMQShortString(string), c);
+        return setChar(AMQShortString.valueOf(string), c);
     }
 
     public Object setChar(AMQShortString string, char c)
@@ -556,7 +575,7 @@ public class FieldTable
 
     public Object setBytes(String string, byte[] b)
     {
-        return setBytes(new AMQShortString(string), b);
+        return setBytes(AMQShortString.valueOf(string), b);
     }
 
     public Object setBytes(AMQShortString string, byte[] bytes)
@@ -566,7 +585,7 @@ public class FieldTable
 
     public Object setBytes(String string, byte[] bytes, int start, int length)
     {
-        return setBytes(new AMQShortString(string), bytes, start, length);
+        return setBytes(AMQShortString.valueOf(string), bytes, start, length);
     }
 
     public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
@@ -579,7 +598,7 @@ public class FieldTable
 
     public Object setObject(String string, Object o)
     {
-        return setObject(new AMQShortString(string), o);
+        return setObject(AMQShortString.valueOf(string), o);
     }
 
     public Object setTimestamp(AMQShortString string, long datetime)
@@ -617,7 +636,7 @@ public class FieldTable
      */
     public Object setFieldTable(String string, FieldTable ftValue)
     {
-        return setFieldTable(new AMQShortString(string), ftValue);
+        return setFieldTable(AMQShortString.valueOf(string), ftValue);
     }
 
     /**
@@ -681,7 +700,7 @@ public class FieldTable
 
     public boolean isNullStringValue(String name)
     {
-        AMQTypedValue value = getProperty(new AMQShortString(name));
+        AMQTypedValue value = getProperty(AMQShortString.valueOf(name));
 
         return (value != null) && (value.getType() == AMQType.VOID);
     }
@@ -713,7 +732,7 @@ public class FieldTable
 
     public boolean itemExists(String string)
     {
-        return itemExists(new AMQShortString(string));
+        return itemExists(AMQShortString.valueOf(string));
     }
 
     public String toString()
@@ -769,7 +788,7 @@ public class FieldTable
 
     // *************************  Byte Buffer Processing
 
-    public void writeToBuffer(DataOutputStream buffer) throws IOException
+    public void writeToBuffer(DataOutput buffer) throws IOException
     {
         final boolean trace = _logger.isDebugEnabled();
 
@@ -919,7 +938,7 @@ public class FieldTable
 
     public boolean containsKey(String key)
     {
-        return containsKey(new AMQShortString(key));
+        return containsKey(AMQShortString.valueOf(key));
     }
 
     public Set<String> keys()
@@ -942,7 +961,7 @@ public class FieldTable
 
     public Object get(String key)
     {
-        return get(new AMQShortString(key));
+        return get(AMQShortString.valueOf(key));
     }
 
     public Object get(AMQShortString key)
@@ -958,7 +977,7 @@ public class FieldTable
     public Object remove(String key)
     {
 
-        return remove(new AMQShortString(key));
+        return remove(AMQShortString.valueOf(key));
 
     }
 
@@ -1005,12 +1024,12 @@ public class FieldTable
         return _properties.keySet();
     }
 
-    private void putDataInBuffer(DataOutputStream buffer) throws IOException
+    private void putDataInBuffer(DataOutput buffer) throws IOException
     {
 
         if (_encodedForm != null)
         {
-            buffer.write(_encodedForm);
+            buffer.write(_encodedForm,_encodedFormOffset,(int)_encodedSize);
         }
         else if (_properties != null)
         {
@@ -1039,9 +1058,8 @@ public class FieldTable
     private void setFromBuffer() throws AMQFrameDecodingException, IOException
     {
 
-        final ByteArrayInputStream in = new ByteArrayInputStream(_encodedForm);
-        DataInputStream buffer = new DataInputStream(in);
-        final boolean trace = _logger.isDebugEnabled();
+        ByteArrayDataInput baid = new ByteArrayDataInput(_encodedForm, _encodedFormOffset, (int)_encodedSize);
+
         if (_encodedSize > 0)
         {
 
@@ -1051,12 +1069,12 @@ public class FieldTable
             do
             {
 
-                final AMQShortString key = EncodingUtils.readAMQShortString(buffer);
-                AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
+                final AMQShortString key = baid.readAMQShortString();
+                AMQTypedValue value = AMQTypedValue.readFromBuffer(baid);
                 _properties.put(key, value);
 
             }
-            while (in.available() > 0);
+            while (baid.available() > 0);
 
         }
 
@@ -1101,7 +1119,7 @@ public class FieldTable
             FieldTable table = new FieldTable();
             for(Map.Entry<String,Object> entry : map.entrySet())
             {
-                table.put(new AMQShortString(entry.getKey()), entry.getValue());
+                table.put(AMQShortString.valueOf(entry.getKey()), entry.getValue());
             }
 
             return table;

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.framing;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 
@@ -30,7 +31,7 @@ public class FieldTableFactory
         return new FieldTable();
     }
 
-    public static FieldTable newFieldTable(DataInputStream byteBuffer, long length) throws AMQFrameDecodingException, IOException
+    public static FieldTable newFieldTable(DataInput byteBuffer, long length) throws AMQFrameDecodingException, IOException
     {
         return new FieldTable(byteBuffer, length);
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Wed Dec 28 13:02:41 2011
@@ -21,7 +21,7 @@
 package org.apache.qpid.framing;
 
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -56,7 +56,7 @@ public class HeartbeatBody implements AM
         return 0;//heartbeats we generate have no payload
     }
 
-    public void writePayload(DataOutputStream buffer)
+    public void writePayload(DataOutput buffer)
     {
     }
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java Wed Dec 28 13:02:41 2011
@@ -20,12 +20,13 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
+import org.apache.qpid.codec.MarkableDataInput;
 
 public class HeartbeatBodyFactory implements BodyFactory
 {
-    public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException
+    public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException
     {
         return new HeartbeatBody();
     }
+
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Wed Dec 28 13:02:41 2011
@@ -21,13 +21,10 @@
 package org.apache.qpid.framing;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
 
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
@@ -66,7 +63,7 @@ public class ProtocolInitiation extends 
              pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
     }
 
-    public ProtocolInitiation(DataInputStream in) throws IOException
+    public ProtocolInitiation(MarkableDataInput in) throws IOException
     {
         _protocolHeader = new byte[4];
         in.read(_protocolHeader);
@@ -82,7 +79,7 @@ public class ProtocolInitiation extends 
         return 4 + 1 + 1 + 1 + 1;
     }
 
-    public void writePayload(DataOutputStream buffer) throws IOException
+    public void writePayload(DataOutput buffer) throws IOException
     {
 
         buffer.write(_protocolHeader);
@@ -143,7 +140,7 @@ public class ProtocolInitiation extends 
          * @return true if we have enough data to decode the PI frame fully, false if more
          * data is required
          */
-        public boolean decodable(DataInputStream in) throws IOException
+        public boolean decodable(MarkableDataInput in) throws IOException
         {
             return (in.available() >= 8);
         }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java Wed Dec 28 13:02:41 2011
@@ -21,7 +21,7 @@
 
 package org.apache.qpid.framing;
 
-import java.io.DataOutputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 
 public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
@@ -69,7 +69,7 @@ public class SmallCompositeAMQDataBlock 
         return frameSize;
     }
 
-    public void writePayload(DataOutputStream buffer) throws IOException
+    public void writePayload(DataOutput buffer) throws IOException
     {
         if (_firstFrame != null)
         {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java Wed Dec 28 13:02:41 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.framing;
 import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 
 import org.slf4j.Logger;
@@ -145,7 +146,7 @@ public class VersionSpecificRegistry
 
     }
 
-    public AMQMethodBody get(short classID, short methodID, DataInputStream in, long size) throws AMQFrameDecodingException, IOException
+    public AMQMethodBody get(short classID, short methodID, MarkableDataInput in, long size) throws AMQFrameDecodingException, IOException
     {
         AMQMethodBodyInstanceFactory bodyFactory;
         try

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Wed Dec 28 13:02:41 2011
@@ -383,13 +383,19 @@ public class Connection extends Connecti
 
     public void received(ProtocolEvent event)
     {
-        log.debug("RECV: [%s] %s", this, event);
+        if(log.isDebugEnabled())
+        {
+            log.debug("RECV: [%s] %s", this, event);
+        }
         event.delegate(this, delegate);
     }
 
     public void send(ProtocolEvent event)
     {
-        log.debug("SEND: [%s] %s", this, event);
+        if(log.isDebugEnabled())
+        {
+            log.debug("SEND: [%s] %s", this, event);
+        }
         Sender s = sender;
         if (s == null)
         {
@@ -400,8 +406,15 @@ public class Connection extends Connecti
 
     public void flush()
     {
-        log.debug("FLUSH: [%s]", this);
-        sender.flush();
+        if(log.isDebugEnabled())
+        {
+            log.debug("FLUSH: [%s]", this);
+        }
+        final Sender<ProtocolEvent> theSender = sender;
+        if(theSender != null)
+        {
+            theSender.flush();
+        }
     }
 
     protected void invoke(Method method)

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java Wed Dec 28 13:02:41 2011
@@ -20,13 +20,7 @@
  */
 package org.apache.qpid.transport;
 
-import org.apache.qpid.transport.network.Frame;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.LinkedHashMap;
-import java.nio.ByteBuffer;
+import java.util.*;
 
 
 /**
@@ -35,45 +29,87 @@ import java.nio.ByteBuffer;
  * @author Rafael H. Schloming
  */
 
-public class Header {
+public class Header
+{
 
-    private final Struct[] structs;
+    private final DeliveryProperties _deliveryProps;
+    private final MessageProperties _messageProps;
+    private final List<Struct> _nonStandardProps;
 
-    public Header(List<Struct> structs)
+    public Header(DeliveryProperties deliveryProps, MessageProperties messageProps)
     {
-        this(structs.toArray(new Struct[structs.size()]));
+        this(deliveryProps, messageProps, null);
     }
 
-    public Header(Struct ... structs)
+    public Header(DeliveryProperties deliveryProps, MessageProperties messageProps, List<Struct> nonStandardProps)
     {
-        this.structs = structs;
+        _deliveryProps = deliveryProps;
+        _messageProps = messageProps;
+        _nonStandardProps = nonStandardProps;
     }
 
     public Struct[] getStructs()
     {
+        int size = 0;
+        if(_deliveryProps != null)
+        {
+            size++;
+        }
+        if(_messageProps != null)
+        {
+            size++;
+        }
+        if(_nonStandardProps != null)
+        {
+            size+=_nonStandardProps.size();
+        }
+        Struct[] structs = new Struct[size];
+        int index = 0;
+        if(_deliveryProps != null)
+        {
+            structs[index++] = _deliveryProps;
+        }
+        if(_messageProps != null)
+        {
+            structs[index++] = _messageProps;
+        }
+        if(_nonStandardProps != null)
+        {
+            for(Struct struct : _nonStandardProps)
+            {
+                structs[index++] = struct;
+            }
+        }
+
         return structs;
     }
 
+    public DeliveryProperties getDeliveryProperties()
+    {
+        return _deliveryProps;
+    }
 
-    public <T> T get(Class<T> klass)
+    public MessageProperties getMessageProperties()
     {
-        for (Struct st : structs)
-        {
-            if (klass.isInstance(st))
-            {
-                return (T) st;
-            }
-        }
+        return _messageProps;
+    }
 
-        return null;
+    public List<Struct> getNonStandardProperties()
+    {
+        return _nonStandardProps;
     }
 
     public String toString()
     {
-        StringBuffer str = new StringBuffer();
+        StringBuilder str = new StringBuilder();
         str.append(" Header(");
         boolean first = true;
-        for (Struct s : structs)
+        if(_deliveryProps !=null)
+        {
+            first=false;
+            str.append(_deliveryProps);
+        }
+        if(_messageProps != null)
         {
             if (first)
             {
@@ -83,9 +119,24 @@ public class Header {
             {
                 str.append(", ");
             }
-            str.append(s);
+            str.append(_messageProps);
+        }
+        if(_nonStandardProps != null)
+        {
+            for (Struct s : _nonStandardProps)
+            {
+                if (first)
+                {
+                    first = false;
+                }
+                else
+                {
+                    str.append(", ");
+                }
+                str.append(s);
+            }
         }
-        str.append(")");
+        str.append(')');
         return str.toString();
     }
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java Wed Dec 28 13:02:41 2011
@@ -21,6 +21,8 @@
 package org.apache.qpid.transport;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.qpid.util.Serial.*;
@@ -32,94 +34,259 @@ import static org.apache.qpid.util.Seria
  * @author Rafael H. Schloming
  */
 
-public final class Range
+public abstract class Range implements RangeSet
 {
-    private final int lower;
-    private final int upper;
+    public static Range newInstance(int point)
+    {
+        return new PointImpl(point);
+    }
+
+    public static Range newInstance(int lower, int upper)
+    {
+        return lower == upper ? new PointImpl(lower) : new RangeImpl(lower, upper);
+    }
+
+    public abstract int getLower();
+
+    public abstract int getUpper();
+
+    public abstract boolean includes(int value);
+
+    public abstract boolean includes(Range range);
+
+    public abstract boolean intersects(Range range);
+
+    public abstract boolean touches(Range range);
+
+    public abstract Range span(Range range);
 
-    public Range(int lower, int upper)
+    public abstract List<Range> subtract(Range range);
+
+
+    public Range intersect(Range range)
     {
-        this.lower = lower;
-        this.upper = upper;
+        int l = max(getLower(), range.getLower());
+        int r = min(getUpper(), range.getUpper());
+        if (gt(l, r))
+        {
+            return null;
+        }
+        else
+        {
+            return newInstance(l, r);
+        }
     }
 
-    public int getLower()
+
+
+    public int size()
     {
-        return lower;
+        return 1;
     }
 
-    public int getUpper()
+    public Iterator<Range> iterator()
     {
-        return upper;
+        return new RangeIterator();
     }
 
-    public boolean includes(int value)
+    public Range getFirst()
     {
-        return le(lower, value) && le(value, upper);
+        return this;
     }
 
-    public boolean includes(Range range)
+    public Range getLast()
     {
-        return includes(range.lower) && includes(range.upper);
+        return this;
     }
 
-    public boolean intersects(Range range)
+    public void add(Range range)
     {
-        return (includes(range.lower) || includes(range.upper) ||
-                range.includes(lower) || range.includes(upper));
+        throw new UnsupportedOperationException();
     }
 
-    public boolean touches(Range range)
+    public void add(int lower, int upper)
     {
-        return (intersects(range) ||
-                includes(range.upper + 1) || includes(range.lower - 1) ||
-                range.includes(upper + 1) || range.includes(lower - 1));
+        throw new UnsupportedOperationException();
     }
 
-    public Range span(Range range)
+    public void add(int value)
     {
-        return new Range(min(lower, range.lower), max(upper, range.upper));
+        throw new UnsupportedOperationException();
     }
 
-    public List<Range> subtract(Range range)
+    public void clear()
     {
-        List<Range> result = new ArrayList<Range>();
+        throw new UnsupportedOperationException();
+    }
+
+    public RangeSet copy()
+    {
+        RangeSet rangeSet = RangeSetFactory.createRangeSet();
+        rangeSet.add(this);
+        return rangeSet;
+    }
+
+    private static class PointImpl extends Range
+    {
+        private final int point;
+
+        private PointImpl(int point)
+        {
+            this.point = point;
+        }
+
+        public int getLower()
+        {
+            return point;
+        }
+
+        public int getUpper()
+        {
+            return point;
+        }
+
+        public boolean includes(int value)
+        {
+            return value == point;
+        }
+
+
+        public boolean includes(Range range)
+        {
+            return range.getLower() == point && range.getUpper() == point;
+        }
+
+        public boolean intersects(Range range)
+        {
+            return range.includes(point);
+        }
 
-        if (includes(range.lower) && le(lower, range.lower - 1))
+        public boolean touches(Range range)
         {
-            result.add(new Range(lower, range.lower - 1));
+            return intersects(range) ||
+                    includes(range.getUpper() + 1) || includes(range.getLower() - 1) ||
+                    range.includes(point + 1) || range.includes(point - 1);
         }
 
-        if (includes(range.upper) && le(range.upper + 1, upper))
+        public Range span(Range range)
         {
-            result.add(new Range(range.upper + 1, upper));
+            return newInstance(min(point, range.getLower()), max(point, range.getUpper()));
         }
 
-        if (result.isEmpty() && !range.includes(this))
+        public List<Range> subtract(Range range)
         {
-            result.add(this);
+            if(range.includes(point))
+            {
+                return Collections.emptyList();
+            }
+            else
+            {
+                return Collections.singletonList((Range) this);
+            }
         }
 
-        return result;
     }
 
-    public Range intersect(Range range)
+    private static class RangeImpl extends Range
     {
-        int l = max(lower, range.lower);
-        int r = min(upper, range.upper);
-        if (gt(l, r))
+        private final int lower;
+        private final int upper;
+
+        private RangeImpl(int lower, int upper)
         {
-            return null;
+            this.lower = lower;
+            this.upper = upper;
         }
-        else
+
+        public int getLower()
+        {
+            return lower;
+        }
+
+        public int getUpper()
+        {
+            return upper;
+        }
+
+        public boolean includes(int value)
+        {
+            return le(lower, value) && le(value, upper);
+        }
+
+        public boolean includes(Range range)
+        {
+            return includes(range.getLower()) && includes(range.getUpper());
+        }
+
+        public boolean intersects(Range range)
+        {
+            return (includes(range.getLower()) || includes(range.getUpper()) ||
+                    range.includes(lower) || range.includes(upper));
+        }
+
+        public boolean touches(Range range)
         {
-            return new Range(l, r);
+            return (intersects(range) ||
+                    includes(range.getUpper() + 1) || includes(range.getLower() - 1) ||
+                    range.includes(upper + 1) || range.includes(lower - 1));
+        }
+
+        public Range span(Range range)
+        {
+            return newInstance(min(lower, range.getLower()), max(upper, range.getUpper()));
+        }
+
+        public List<Range> subtract(Range range)
+        {
+            List<Range> result = new ArrayList<Range>();
+
+            if (includes(range.getLower()) && le(lower, range.getLower() - 1))
+            {
+                result.add(newInstance(lower, range.getLower() - 1));
+            }
+
+            if (includes(range.getUpper()) && le(range.getUpper() + 1, upper))
+            {
+                result.add(newInstance(range.getUpper() + 1, upper));
+            }
+
+            if (result.isEmpty() && !range.includes(this))
+            {
+                result.add(this);
+            }
+
+            return result;
+        }
+
+
+        public String toString()
+        {
+            return "[" + lower + ", " + upper + "]";
         }
     }
 
-    public String toString()
+
+    private class RangeIterator implements Iterator<Range>
     {
-        return "[" + lower + ", " + upper + "]";
-    }
+        private boolean atFirst = true;
+
+        public boolean hasNext()
+        {
+            return atFirst;
+        }
+
+        public Range next()
+        {
 
+            if (!atFirst) { throw new java.util.NoSuchElementException(); } // Iterator contract: never null
+            atFirst = false;
+            return Range.this;
+        }
+
+        @Override
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,7 @@
  */
 package org.apache.qpid.transport;
 
-import java.util.Iterator;
-import java.util.ListIterator;
-import java.util.LinkedList;
+import java.util.*;
 
 import static org.apache.qpid.util.Serial.*;
 
@@ -32,121 +30,29 @@ import static org.apache.qpid.util.Seria
  * @author Rafael H. Schloming
  */
 
-public final class RangeSet implements Iterable<Range>
+public interface RangeSet extends Iterable<Range>
 {
 
-    private LinkedList<Range> ranges = new LinkedList<Range>();
+    int size();
 
-    public int size()
-    {
-        return ranges.size();
-    }
-
-    public Iterator<Range> iterator()
-    {
-        return ranges.iterator();
-    }
-
-    public Range getFirst()
-    {
-        return ranges.getFirst();
-    }
-
-    public Range getLast()
-    {
-        return ranges.getLast();
-    }
-
-    public boolean includes(Range range)
-    {
-        for (Range r : this)
-        {
-            if (r.includes(range))
-            {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    public boolean includes(int n)
-    {
-        for (Range r : this)
-        {
-            if (r.includes(n))
-            {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    public void add(Range range)
-    {
-        ListIterator<Range> it = ranges.listIterator();
-
-        while (it.hasNext())
-        {
-            Range next = it.next();
-            if (range.touches(next))
-            {
-                it.remove();
-                range = range.span(next);
-            }
-            else if (lt(range.getUpper(), next.getLower()))
-            {
-                it.previous();
-                it.add(range);
-                return;
-            }
-        }
-
-        it.add(range);
-    }
-
-    public void add(int lower, int upper)
-    {
-        add(new Range(lower, upper));
-    }
-
-    public void add(int value)
-    {
-        add(value, value);
-    }
-
-    public void clear()
-    {
-        ranges.clear();
-    }
-
-    public RangeSet copy()
-    {
-        RangeSet copy = new RangeSet();
-        copy.ranges.addAll(ranges);
-        return copy;
-    }
-
-    public String toString()
-    {
-        StringBuffer str = new StringBuffer();
-        str.append("{");
-        boolean first = true;
-        for (Range range : ranges)
-        {
-            if (first)
-            {
-                first = false;
-            }
-            else
-            {
-                str.append(", ");
-            }
-            str.append(range);
-        }
-        str.append("}");
-        return str.toString();
-    }
+    Iterator<Range> iterator();
+
+    Range getFirst();
+
+    Range getLast();
+
+    boolean includes(Range range);
+
+    boolean includes(int n);
+
+    void add(Range range);
+
+    void add(int lower, int upper);
+
+    void add(int value);
+
+    void clear();
+
+    RangeSet copy();
 
 }

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java?rev=1225178&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java Wed Dec 28 13:02:41 2011
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+/**
+ * Static factory for {@link RangeSet} instances; both methods return a new
+ * {@link RangeSetImpl}.
+ */
+public class RangeSetFactory
+{
+    /**
+     * @return a new, empty {@link RangeSetImpl}
+     */
+    public static RangeSet createRangeSet()
+    {
+        final RangeSet created = new RangeSetImpl();
+        return created;
+    }
+
+    /**
+     * @param size passed through to {@link RangeSetImpl#RangeSetImpl(int)}
+     * @return a new {@link RangeSetImpl} constructed with {@code size}
+     */
+    public static RangeSet createRangeSet(int size)
+    {
+        final RangeSet created = new RangeSetImpl(size);
+        return created;
+    }
+}

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java?rev=1225178&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java Wed Dec 28 13:02:41 2011
@@ -0,0 +1,230 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import static org.apache.qpid.util.Serial.lt;
+
+/**
+ * {@link RangeSet} backed by a list of {@link Range}s. The {@code add} methods merge
+ * ranges that touch and insert a new range ahead of the first stored range lying
+ * wholly above it.
+ */
+public class RangeSetImpl implements RangeSet
+{
+
+    private final List<Range> ranges;
+
+    /** Creates an empty set. */
+    public RangeSetImpl()
+    {
+        ranges = new ArrayList<Range>();
+    }
+
+    /**
+     * Creates an empty set.
+     *
+     * @param size initial capacity of the backing list
+     */
+    public RangeSetImpl(int size)
+    {
+        ranges = new ArrayList<Range>(size);
+    }
+
+    /**
+     * Creates a set holding the same ranges as {@code copy} in a separate backing list.
+     *
+     * @param copy the set whose ranges are copied
+     */
+    public RangeSetImpl(RangeSetImpl copy)
+    {
+        ranges = new ArrayList<Range>(copy.ranges);
+    }
+
+    /** @return the number of ranges currently stored */
+    public int size()
+    {
+        return ranges.size();
+    }
+
+    /** @return an iterator over the stored ranges, in list order */
+    public Iterator<Range> iterator()
+    {
+        return ranges.iterator();
+    }
+
+    /**
+     * @return the first stored range
+     * @throws IndexOutOfBoundsException if the set is empty
+     */
+    public Range getFirst()
+    {
+        return ranges.get(0);
+    }
+
+    /**
+     * @return the last stored range
+     * @throws IndexOutOfBoundsException if the set is empty
+     */
+    public Range getLast()
+    {
+        return ranges.get(ranges.size() - 1);
+    }
+
+    /** @return true if a single stored range includes all of {@code range} */
+    public boolean includes(Range range)
+    {
+        for (Range r : this)
+        {
+            if (r.includes(range))
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /** @return true if any stored range includes {@code n} */
+    public boolean includes(int n)
+    {
+        for (Range r : this)
+        {
+            if (r.includes(n))
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Adds {@code range}, absorbing every stored range it touches.
+     *
+     * @param range the range to add
+     */
+    public void add(Range range)
+    {
+        ListIterator<Range> it = ranges.listIterator();
+
+        while (it.hasNext())
+        {
+            Range next = it.next();
+            if (range.touches(next))
+            {
+                // Absorb the stored range and keep scanning with the widened range.
+                it.remove();
+                range = range.span(next);
+            }
+            else if (lt(range.getUpper(), next.getLower()))
+            {
+                // First stored range lying wholly above: insert just before it.
+                it.previous();
+                it.add(range);
+                return;
+            }
+        }
+
+        it.add(range);
+    }
+
+    /**
+     * Adds the range {@code [lower, upper]}.
+     *
+     * @param lower lower bound of the range to add
+     * @param upper upper bound of the range to add
+     */
+    public void add(int lower, int upper)
+    {
+        switch(ranges.size())
+        {
+            case 0:
+                ranges.add(Range.newInstance(lower, upper));
+                break;
+
+            case 1:
+                Range first = ranges.get(0);
+                // Shortcut for extending the only range upwards. The new range must not
+                // start below the stored one, otherwise [lower, first.getLower() - 1]
+                // would be dropped; that case takes the general path below.
+                if(lower >= first.getLower()
+                   && first.getUpper() + 1 >= lower
+                   && upper >= first.getUpper())
+                {
+                    ranges.set(0, Range.newInstance(first.getLower(), upper));
+                    break;
+                }
+                // deliberate fall-through to the general case
+
+            default:
+                add(Range.newInstance(lower, upper));
+        }
+    }
+
+    /**
+     * Adds the single value {@code value}.
+     *
+     * @param value the value to add
+     */
+    public void add(int value)
+    {
+        add(value, value);
+    }
+
+    /** Removes every range from this set. */
+    public void clear()
+    {
+        ranges.clear();
+    }
+
+    /** @return a new {@code RangeSetImpl} holding the same ranges in a separate list */
+    public RangeSet copy()
+    {
+        return new RangeSetImpl(this);
+    }
+
+    /** @return the ranges rendered as {@code {r1, r2, ...}} */
+    public String toString()
+    {
+        StringBuilder str = new StringBuilder();
+        str.append('{');
+        boolean first = true;
+        for (Range range : ranges)
+        {
+            if (first)
+            {
+                first = false;
+            }
+            else
+            {
+                str.append(", ");
+            }
+            str.append(range);
+        }
+        str.append('}');
+        return str.toString();
+    }
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Wed Dec 28 13:02:41 2011
@@ -247,7 +247,7 @@ public class Session extends SessionInvo
         synchronized (processedLock)
         {
             incomingInit = false;
-            processed = new RangeSet();
+            processed = RangeSetFactory.createRangeSet();
         }
     }
 
@@ -276,22 +276,22 @@ public class Session extends SessionInvo
                 else if (m instanceof MessageTransfer)
                 {
                 	MessageTransfer xfr = (MessageTransfer)m;
-                	
-                	if (xfr.getHeader() != null)
+
+                    Header header = xfr.getHeader();
+
+                    if (header != null)
                 	{
-                		if (xfr.getHeader().get(DeliveryProperties.class) != null)
+                		if (header.getDeliveryProperties() != null)
                 		{
-                		   xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true);
+                		   header.getDeliveryProperties().setRedelivered(true);
                 		}
                 		else
                 		{
-                			Struct[] structs = xfr.getHeader().getStructs();
                 			DeliveryProperties deliveryProps = new DeliveryProperties();
                     		deliveryProps.setRedelivered(true);
-                    		
-                    		List<Struct> list = Arrays.asList(structs);
-                    		list.add(deliveryProps);
-                    		xfr.setHeader(new Header(list));
+
+                    		xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(),
+                                                     header.getNonStandardProperties()));
                 		}
                 		
                 	}
@@ -299,7 +299,7 @@ public class Session extends SessionInvo
                 	{
                 		DeliveryProperties deliveryProps = new DeliveryProperties();
                 		deliveryProps.setRedelivered(true);
-                		xfr.setHeader(new Header(deliveryProps));
+                		xfr.setHeader(new Header(deliveryProps, null, null));
                 	}
                 }
                 sessionCommandPoint(m.getId(), 0);
@@ -394,38 +394,46 @@ public class Session extends SessionInvo
 
     public void processed(int command)
     {
-        processed(new Range(command, command));
+        processed(command, command);
     }
 
-    public void processed(int lower, int upper)
+    public void processed(Range range)
     {
 
-        processed(new Range(lower, upper));
+        processed(range.getLower(), range.getUpper());
     }
 
-    public void processed(Range range)
+    public void processed(int lower, int upper)
     {
-        log.debug("%s processed(%s) %s %s", this, range, syncPoint, maxProcessed);
+        if(log.isDebugEnabled())
+        {
+            log.debug("%s processed([%d,%d]) %s %s", this, lower, upper, syncPoint, maxProcessed);
+        }
 
         boolean flush;
         synchronized (processedLock)
         {
-            log.debug("%s", processed);
+            if(log.isDebugEnabled())
+            {
+                log.debug("%s", processed);
+            }
 
-            if (ge(range.getUpper(), commandsIn))
+            if (ge(upper, commandsIn))
             {
                 throw new IllegalArgumentException
-                    ("range exceeds max received command-id: " + range);
+                    ("range exceeds max received command-id: " + Range.newInstance(lower, upper));
             }
 
-            processed.add(range);
+            processed.add(lower, upper);
+
             Range first = processed.getFirst();
-            int lower = first.getLower();
-            int upper = first.getUpper();
+
+            int flower = first.getLower();
+            int fupper = first.getUpper();
             int old = maxProcessed;
-            if (le(lower, maxProcessed + 1))
+            if (le(flower, maxProcessed + 1))
             {
-                maxProcessed = max(maxProcessed, upper);
+                maxProcessed = max(maxProcessed, fupper);
             }
             boolean synced = ge(maxProcessed, syncPoint);
             flush = lt(old, syncPoint) && synced;
@@ -442,7 +450,7 @@ public class Session extends SessionInvo
 
     void flushExpected()
     {
-        RangeSet rs = new RangeSet();
+        RangeSet rs = RangeSetFactory.createRangeSet();
         synchronized (processedLock)
         {
             if (incomingInit)
@@ -478,7 +486,7 @@ public class Session extends SessionInvo
     {
         synchronized (processedLock)
         {
-            RangeSet newProcessed = new RangeSet();
+            RangeSet newProcessed = RangeSetFactory.createRangeSet();
             for (Range pr : processed)
             {
                 for (Range kr : kc)
@@ -534,7 +542,12 @@ public class Session extends SessionInvo
             {
                 maxComplete = max(maxComplete, upper);
             }
-            log.debug("%s   commands remaining: %s", this, commandsOut - maxComplete);
+
+            if(log.isDebugEnabled())
+            {
+                log.debug("%s   commands remaining: %s", this, commandsOut - maxComplete);
+            }
+
             commands.notifyAll();
             return gt(maxComplete, old);
         }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Wed Dec 28 13:02:41 2011
@@ -91,21 +91,38 @@ public class SessionDelegate
     {
         RangeSet ranges = cmp.getCommands();
         RangeSet known = null;
-        if (cmp.getTimelyReply())
-        {
-            known = new RangeSet();
-        }
 
         if (ranges != null)
         {
-            for (Range range : ranges)
+            if(ranges.size() == 1)
             {
+                Range range = ranges.getFirst();
                 boolean advanced = ssn.complete(range.getLower(), range.getUpper());
-                if (advanced && known != null)
+
+                if(advanced && cmp.getTimelyReply())
                 {
-                    known.add(range);
+                    known = range;
                 }
             }
+            else
+            {
+                if (cmp.getTimelyReply())
+                {
+                    known = RangeSetFactory.createRangeSet();
+                }
+                for (Range range : ranges)
+                {
+                    boolean advanced = ssn.complete(range.getLower(), range.getUpper());
+                    if (advanced && known != null)
+                    {
+                        known.add(range);
+                    }
+                }
+            }
+        }
+        else if (cmp.getTimelyReply())
+        {
+            known = RangeSetFactory.createRangeSet();
         }
 
         if (known != null)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org