You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC

svn commit: r1225178 [6/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/ bdbstore/src/test/ bdbstore/src/test/jav...

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.client.protocol;
 
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -409,10 +410,10 @@ public class AMQProtocolHandler implemen
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
 
             // Decode buffer
-
-            for (AMQDataBlock message : dataBlocks)
+            int size = dataBlocks.size();
+            for (int i = 0; i < size; i++)
             {
-
+                AMQDataBlock message = dataBlocks.get(i);
                     if (PROTOCOL_DEBUG)
                     {
                         _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
@@ -420,10 +421,10 @@ public class AMQProtocolHandler implemen
 
                     if(message instanceof AMQFrame)
                     {
-                        final boolean debug = _logger.isDebugEnabled();
+
                         final long msgNumber = ++_messageReceivedCount;
 
-                        if (debug && ((msgNumber % 1000) == 0))
+                        if (((msgNumber % 1000) == 0) && _logger.isDebugEnabled())
                         {
                             _logger.debug("Received " + _messageReceivedCount + " protocol messages");
                         }
@@ -514,12 +515,20 @@ public class AMQProtocolHandler implemen
         return getStateManager().createWaiter(states);
     }
 
-    public  synchronized void writeFrame(AMQDataBlock frame)
+    public void writeFrame(AMQDataBlock frame)
+    {
+        writeFrame(frame, true);
+    }
+
+    public  synchronized void writeFrame(AMQDataBlock frame, boolean flush)
     {
         final ByteBuffer buf = asByteBuffer(frame);
         _writtenBytes += buf.remaining();
         _sender.send(buf);
-        _sender.flush();
+        if(flush)
+        {
+            _sender.flush();
+        }
 
         if (PROTOCOL_DEBUG)
         {
@@ -539,35 +548,51 @@ public class AMQProtocolHandler implemen
 
     }
 
+    private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
+    private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
+    private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
+    private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
+
     private ByteBuffer asByteBuffer(AMQDataBlock block)
     {
-        final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+        final int size = (int) block.getSize();
 
-        try
-        {
-            block.writePayload(new DataOutputStream(new OutputStream()
-            {
+        final byte[] data;
 
 
-                @Override
-                public void write(int b) throws IOException
-                {
-                    buf.put((byte) b);
-                }
+        if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
+        {
+            data= new byte[size];
+        }
+        else
+        {
 
-                @Override
-                public void write(byte[] b, int off, int len) throws IOException
-                {
-                    buf.put(b, off, len);
-                }
-            }));
+            data = _reusableBytes;
+        }
+        _reusableDataOutput.setBuffer(data);
+
+        try
+        {
+            block.writePayload(_reusableDataOutput);
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
 
-        buf.flip();
+        final ByteBuffer buf;
+
+        if(size < REUSABLE_BYTE_BUFFER_CAPACITY)
+        {
+            buf = _reusableByteBuffer;
+            buf.position(0);
+        }
+        else
+        {
+            buf = ByteBuffer.wrap(data);
+        }
+        buf.limit(_reusableDataOutput.length());
+
         return buf;
     }
 
@@ -840,4 +865,160 @@ public class AMQProtocolHandler implemen
         return _suggestedProtocolVersion;
     }
 
+    private static class BytesDataOutput implements DataOutput
+        {
+            int _pos = 0;
+            byte[] _buf;
+
+            public BytesDataOutput(byte[] buf)
+            {
+                _buf = buf;
+            }
+
+            public void setBuffer(byte[] buf)
+            {
+                _buf = buf;
+                _pos = 0;
+            }
+
+            public void reset()
+            {
+                _pos = 0;
+            }
+
+            public int length()
+            {
+                return _pos;
+            }
+
+            public void write(int b)
+            {
+                _buf[_pos++] = (byte) b;
+            }
+
+            public void write(byte[] b)
+            {
+                System.arraycopy(b, 0, _buf, _pos, b.length);
+                _pos+=b.length;
+            }
+
+
+            public void write(byte[] b, int off, int len)
+            {
+                System.arraycopy(b, off, _buf, _pos, len);
+                _pos+=len;
+
+            }
+
+            public void writeBoolean(boolean v)
+            {
+                _buf[_pos++] = v ? (byte) 1 : (byte) 0;
+            }
+
+            public void writeByte(int v)
+            {
+                _buf[_pos++] = (byte) v;
+            }
+
+            public void writeShort(int v)
+            {
+                _buf[_pos++] = (byte) (v >>> 8);
+                _buf[_pos++] = (byte) v;
+            }
+
+            public void writeChar(int v)
+            {
+                _buf[_pos++] = (byte) (v >>> 8);
+                _buf[_pos++] = (byte) v;
+            }
+
+            public void writeInt(int v)
+            {
+                _buf[_pos++] = (byte) (v >>> 24);
+                _buf[_pos++] = (byte) (v >>> 16);
+                _buf[_pos++] = (byte) (v >>> 8);
+                _buf[_pos++] = (byte) v;
+            }
+
+            public void writeLong(long v)
+            {
+                _buf[_pos++] = (byte) (v >>> 56);
+                _buf[_pos++] = (byte) (v >>> 48);
+                _buf[_pos++] = (byte) (v >>> 40);
+                _buf[_pos++] = (byte) (v >>> 32);
+                _buf[_pos++] = (byte) (v >>> 24);
+                _buf[_pos++] = (byte) (v >>> 16);
+                _buf[_pos++] = (byte) (v >>> 8);
+                _buf[_pos++] = (byte)v;
+            }
+
+            public void writeFloat(float v)
+            {
+                writeInt(Float.floatToIntBits(v));
+            }
+
+            public void writeDouble(double v)
+            {
+                writeLong(Double.doubleToLongBits(v));
+            }
+
+            public void writeBytes(String s)
+            {
+                int len = s.length();
+                for (int i = 0 ; i < len ; i++)
+                {
+                    _buf[_pos++] = ((byte)s.charAt(i));
+                }
+            }
+
+            public void writeChars(String s)
+            {
+                int len = s.length();
+                for (int i = 0 ; i < len ; i++)
+                {
+                    int v = s.charAt(i);
+                    _buf[_pos++] = (byte) (v >>> 8);
+                    _buf[_pos++] = (byte) v;
+                }
+            }
+
+            public void writeUTF(String s)
+            {
+                int strlen = s.length();
+
+                int pos = _pos;
+                _pos+=2;
+
+
+                for (int i = 0; i < strlen; i++)
+                {
+                    int c = s.charAt(i);
+                    if ((c >= 0x0001) && (c <= 0x007F))
+                    {
+                        c = s.charAt(i);
+                        _buf[_pos++] = (byte) c;
+
+                    }
+                    else if (c > 0x07FF)
+                    {
+                        _buf[_pos++]  = (byte) (0xE0 | ((c >> 12) & 0x0F));
+                        _buf[_pos++]  = (byte) (0x80 | ((c >>  6) & 0x3F));
+                        _buf[_pos++]  = (byte) (0x80 | (c & 0x3F));
+                    }
+                    else
+                    {
+                        _buf[_pos++] = (byte) (0xC0 | ((c >>  6) & 0x1F));
+                        _buf[_pos++]  = (byte) (0x80 | (c & 0x3F));
+                    }
+                }
+
+                int len = _pos - (pos + 2);
+
+                _buf[pos++] = (byte) (len >>> 8);
+                _buf[pos] = (byte) len;
+            }
+
+        }
+
+
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java Wed Dec 28 13:02:41 2011
@@ -49,19 +49,9 @@ public class MessagePartListenerAdapter 
     {
         _currentMsg = new ByteBufferMessage(xfr.getId());
 
-        for (Struct st : xfr.getHeader().getStructs())
-        {
-            if(st instanceof DeliveryProperties)
-            {
-                _currentMsg.setDeliveryProperties((DeliveryProperties)st);
-
-            }
-            else if(st instanceof MessageProperties)
-            {
-                _currentMsg.setMessageProperties((MessageProperties)st);
-            }
-
-        }
+        Header header = xfr.getHeader();
+        _currentMsg.setDeliveryProperties(header.getDeliveryProperties());
+        _currentMsg.setMessageProperties(header.getMessageProperties());
 
 
         ByteBuffer body = xfr.getBody();

Propchange: qpid/trunk/qpid/java/client/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java Wed Dec 28 13:02:41 2011
@@ -46,8 +46,9 @@ public class BasicMessageConsumer_0_8_Te
         AMQBindingURL burl = new AMQBindingURL(url);
         AMQDestination queue = new AMQQueue(burl);
 
-        AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
-        BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+        TestAMQSession testSession = new TestAMQSession(conn);
+        BasicMessageConsumer_0_8 consumer =
+                new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
 
         assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour());
     }
@@ -65,8 +66,9 @@ public class BasicMessageConsumer_0_8_Te
         final AMQBindingURL burl = new AMQBindingURL(url);
         final AMQDestination queue = new AMQQueue(burl);
 
-        final AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
-        final BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+        final TestAMQSession testSession = new TestAMQSession(conn);
+        final BasicMessageConsumer_0_8 consumer =
+                new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
 
         assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
     }
@@ -90,8 +92,9 @@ public class BasicMessageConsumer_0_8_Te
 
         assertNull("Reject behaviour should have been null", queue.getRejectBehaviour());
 
-        AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
-        BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+        TestAMQSession testSession = new TestAMQSession(conn);
+        BasicMessageConsumer_0_8 consumer =
+                new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
 
         assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
     }

Propchange: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Wed Dec 28 13:02:41 2011
@@ -29,12 +29,7 @@ import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.BasicMessageConsumer_0_8;
-import org.apache.qpid.client.BasicMessageProducer_0_8;
-import org.apache.qpid.client.MockAMQConnection;
+import org.apache.qpid.client.*;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -42,7 +37,7 @@ import org.apache.qpid.filter.MessageFil
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 
-public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
+public class TestAMQSession extends AMQSession_0_8
 {
 
     public TestAMQSession(AMQConnection connection)
@@ -92,7 +87,7 @@ public class TestAMQSession extends AMQS
         return null;
     }
 
-    protected void sendRecover() throws AMQException, FailoverException
+    public void sendRecover() throws AMQException, FailoverException
     {
 
     }

Propchange: qpid/trunk/qpid/java/client/test/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -1 +1,2 @@
+*.iml
 TEST-org.apache.qpid.client.AllClientUnitTests.txt

Modified: qpid/trunk/qpid/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/Composite.tpl?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/Composite.tpl (original)
+++ qpid/trunk/qpid/java/common/Composite.tpl Wed Dec 28 13:02:41 2011
@@ -245,6 +245,11 @@ if segments:
         return this;
     }
 
+    public int getBodySize()
+    {
+        return this.body == null ? 0 : this.body.remaining();
+    }
+
     public final ByteBuffer getBody() {
         if (this.body == null)
         {

Propchange: qpid/trunk/qpid/java/common/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Wed Dec 28 13:02:41 2011
@@ -24,12 +24,7 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQDataBlockDecoder;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQMethodBodyFactory;
-import org.apache.qpid.framing.AMQProtocolVersionException;
-import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 /**
@@ -193,24 +188,41 @@ public class AMQDecoder
         }
     }
 
+    private static class SimpleDataInputStream extends DataInputStream implements MarkableDataInput
+    {
+        public SimpleDataInputStream(InputStream in)
+        {
+            super(in);
+        }
+
+        public AMQShortString readAMQShortString() throws IOException
+        {
+            return EncodingUtils.readAMQShortString(this);
+        }
+
+    }
+
 
     public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
     {
 
         // get prior remaining data from accumulator
         ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
-        DataInputStream msg;
+        MarkableDataInput msg;
 
 
-        ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
+        ByteArrayInputStream bais;
+        DataInput di;
         if(!_remainingBufs.isEmpty())
         {
+             bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
             _remainingBufs.add(bais);
-            msg = new DataInputStream(new RemainingByteArrayInputStream());
+            msg = new SimpleDataInputStream(new RemainingByteArrayInputStream());
         }
         else
         {
-            msg = new DataInputStream(bais);
+            bais = null;
+            msg = new ByteArrayDataInput(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
         }
 
         boolean enoughData = true;
@@ -245,11 +257,24 @@ public class AMQDecoder
                         iterator.remove();
                     }
                 }
-                if(bais.available()!=0)
+
+                if(bais == null)
+                {
+                    if(msg.available()!=0)
+                    {
+                        byte[] remaining = new byte[msg.available()];
+                        msg.read(remaining);
+                        _remainingBufs.add(new ByteArrayInputStream(remaining));
+                    }
+                }
+                else
                 {
-                    byte[] remaining = new byte[bais.available()];
-                    bais.read(remaining);
-                    _remainingBufs.add(new ByteArrayInputStream(remaining));
+                    if(bais.available()!=0)
+                    {
+                        byte[] remaining = new byte[bais.available()];
+                        bais.read(remaining);
+                        _remainingBufs.add(new ByteArrayInputStream(remaining));
+                    }
                 }
             }
         }

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java?rev=1225178&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java Wed Dec 28 13:02:41 2011
@@ -0,0 +1,21 @@
+package org.apache.qpid.codec;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+public interface MarkableDataInput extends DataInput
+{
+    public void mark(int pos);
+    public void reset() throws IOException;
+
+    int available() throws IOException;
+
+    long skip(long i) throws IOException;
+
+    int read(byte[] b) throws IOException;
+
+    public AMQShortString readAMQShortString() throws IOException;
+
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.framing;
 
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
@@ -36,7 +37,7 @@ public interface AMQBody
      */
     public abstract int getSize();
     
-    public void writePayload(DataOutputStream buffer) throws IOException;
+    public void writePayload(DataOutput buffer) throws IOException;
     
     void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Wed Dec 28 13:02:41 2011
@@ -21,6 +21,7 @@
 package org.apache.qpid.framing;
 
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
@@ -42,6 +43,6 @@ public abstract class AMQDataBlock imple
      * Writes the datablock to the specified buffer.
      * @param buffer
      */
-    public abstract void writePayload(DataOutputStream buffer) throws IOException;
+    public abstract void writePayload(DataOutput buffer) throws IOException;
 
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.framing;
 
+import org.apache.qpid.codec.MarkableDataInput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,7 @@ public class AMQDataBlockDecoder
     public AMQDataBlockDecoder()
     { }
 
-    public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException
+    public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException
     {
         final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
         // type, channel, body length and end byte
@@ -65,7 +66,7 @@ public class AMQDataBlockDecoder
 
     }
 
-    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
+    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, MarkableDataInput in)
             throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
     {
         final byte type = in.readByte();

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,10 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.qpid.codec.MarkableDataInput;
+
+import java.io.*;
+import java.io.DataOutput;
 
 public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
 {
@@ -38,7 +39,7 @@ public class AMQFrame extends AMQDataBlo
         _bodyFrame = bodyFrame;
     }
 
-    public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
+    public AMQFrame(final MarkableDataInput in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
     {
         this._channel = channel;
         this._bodyFrame = bodyFactory.createBody(in,bodySize);
@@ -55,7 +56,7 @@ public class AMQFrame extends AMQDataBlo
     }
 
 
-    public void writePayload(DataOutputStream buffer) throws IOException
+    public void writePayload(DataOutput buffer) throws IOException
     {
         buffer.writeByte(_bodyFrame.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, _channel);
@@ -79,7 +80,7 @@ public class AMQFrame extends AMQDataBlo
         return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
     }
 
-    public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException
+    public static void writeFrame(DataOutput buffer, final int channel, AMQBody body) throws IOException
     {
         buffer.writeByte(body.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
@@ -89,7 +90,7 @@ public class AMQFrame extends AMQDataBlo
 
     }
 
-    public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
+    public static void writeFrames(DataOutput buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
     {
         buffer.writeByte(body1.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
@@ -104,7 +105,7 @@ public class AMQFrame extends AMQDataBlo
 
     }
 
-    public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
+    public static void writeFrames(DataOutput buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
     {
         buffer.writeByte(body1.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Wed Dec 28 13:02:41 2011
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQConnectionExce
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
@@ -45,12 +46,12 @@ public interface AMQMethodBody extends A
     /** @return unsigned short */
     public int getMethod();
 
-    public void writeMethodPayload(DataOutputStream buffer) throws IOException;
+    public void writeMethodPayload(DataOutput buffer) throws IOException;
 
 
     public int getSize();
 
-    public void writePayload(DataOutputStream buffer) throws IOException;
+    public void writePayload(DataOutput buffer) throws IOException;
 
     //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Wed Dec 28 13:02:41 2011
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.framing;
 
+import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 
@@ -39,7 +41,7 @@ public class AMQMethodBodyFactory implem
         _protocolSession = protocolSession;
     }
 
-    public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+    public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
     {
         return _protocolSession.getMethodRegistry().convertToBody(in, bodySize);
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Wed Dec 28 13:02:41 2011
@@ -24,11 +24,12 @@ package org.apache.qpid.framing;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 
 public abstract class AMQMethodBodyImpl implements AMQMethodBody
@@ -101,7 +102,7 @@ public abstract class AMQMethodBodyImpl 
         return 2 + 2 + getBodySize();
     }
 
-    public void writePayload(DataOutputStream buffer) throws IOException
+    public void writePayload(DataOutput buffer) throws IOException
     {
         EncodingUtils.writeUnsignedShort(buffer, getClazz());
         EncodingUtils.writeUnsignedShort(buffer, getMethod());
@@ -109,14 +110,15 @@ public abstract class AMQMethodBodyImpl 
     }
 
 
-    protected byte readByte(DataInputStream buffer) throws IOException
+    protected byte readByte(DataInput buffer) throws IOException
     {
         return buffer.readByte();
     }
 
-    protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
+    protected AMQShortString readAMQShortString(MarkableDataInput buffer) throws IOException
     {
-        return EncodingUtils.readAMQShortString(buffer);
+        AMQShortString str = buffer.readAMQShortString();
+        return str == null ? null : str.intern(false);
     }
 
     protected int getSizeOf(AMQShortString string)
@@ -124,27 +126,27 @@ public abstract class AMQMethodBodyImpl 
         return EncodingUtils.encodedShortStringLength(string);
     }
 
-    protected void writeByte(DataOutputStream buffer, byte b) throws IOException
+    protected void writeByte(DataOutput buffer, byte b) throws IOException
     {
         buffer.writeByte(b);
     }
 
-    protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException
+    protected void writeAMQShortString(DataOutput buffer, AMQShortString string) throws IOException
     {
         EncodingUtils.writeShortStringBytes(buffer, string);
     }
 
-    protected int readInt(DataInputStream buffer) throws IOException
+    protected int readInt(DataInput buffer) throws IOException
     {
         return buffer.readInt();
     }
 
-    protected void writeInt(DataOutputStream buffer, int i) throws IOException
+    protected void writeInt(DataOutput buffer, int i) throws IOException
     {
         buffer.writeInt(i);
     }
 
-    protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
+    protected FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException
     {
         return EncodingUtils.readFieldTable(buffer);
     }
@@ -154,17 +156,17 @@ public abstract class AMQMethodBodyImpl 
         return EncodingUtils.encodedFieldTableLength(table);  //To change body of created methods use File | Settings | File Templates.
     }
 
-    protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException
+    protected void writeFieldTable(DataOutput buffer, FieldTable table) throws IOException
     {
         EncodingUtils.writeFieldTableBytes(buffer, table);
     }
 
-    protected long readLong(DataInputStream buffer) throws IOException
+    protected long readLong(DataInput buffer) throws IOException
     {
         return buffer.readLong();
     }
 
-    protected void writeLong(DataOutputStream buffer, long l) throws IOException
+    protected void writeLong(DataOutput buffer, long l) throws IOException
     {
         buffer.writeLong(l);
     }
@@ -174,27 +176,27 @@ public abstract class AMQMethodBodyImpl 
         return (response == null) ? 4 : response.length + 4;
     }
 
-    protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
+    protected void writeBytes(DataOutput buffer, byte[] data) throws IOException
     {
         EncodingUtils.writeBytes(buffer,data);
     }
 
-    protected byte[] readBytes(DataInputStream buffer) throws IOException
+    protected byte[] readBytes(DataInput buffer) throws IOException
     {
         return EncodingUtils.readBytes(buffer);
     }
 
-    protected short readShort(DataInputStream buffer) throws IOException
+    protected short readShort(DataInput buffer) throws IOException
     {
         return EncodingUtils.readShort(buffer);
     }
 
-    protected void writeShort(DataOutputStream buffer, short s) throws IOException
+    protected void writeShort(DataOutput buffer, short s) throws IOException
     {
         EncodingUtils.writeShort(buffer, s);
     }
 
-    protected Content readContent(DataInputStream buffer)
+    protected Content readContent(DataInput buffer)
     {
         return null;
     }
@@ -204,56 +206,56 @@ public abstract class AMQMethodBodyImpl 
         return 0;
     }
 
-    protected void writeContent(DataOutputStream buffer, Content body)
+    protected void writeContent(DataOutput buffer, Content body)
     {
     }
 
-    protected byte readBitfield(DataInputStream buffer) throws IOException
+    protected byte readBitfield(DataInput buffer) throws IOException
     {
         return readByte(buffer);
     }
 
-    protected int readUnsignedShort(DataInputStream buffer) throws IOException
+    protected int readUnsignedShort(DataInput buffer) throws IOException
     {
         return buffer.readUnsignedShort();
     }
 
-    protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException
+    protected void writeBitfield(DataOutput buffer, byte bitfield0) throws IOException
     {
         buffer.writeByte(bitfield0);
     }
 
-    protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
+    protected void writeUnsignedShort(DataOutput buffer, int s) throws IOException
     {
         EncodingUtils.writeUnsignedShort(buffer, s);
     }
 
-    protected long readUnsignedInteger(DataInputStream buffer) throws IOException
+    protected long readUnsignedInteger(DataInput buffer) throws IOException
     {
         return EncodingUtils.readUnsignedInteger(buffer);
     }
-    protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException
+    protected void writeUnsignedInteger(DataOutput buffer, long i) throws IOException
     {
         EncodingUtils.writeUnsignedInteger(buffer, i);
     }
 
 
-    protected short readUnsignedByte(DataInputStream buffer) throws IOException
+    protected short readUnsignedByte(DataInput buffer) throws IOException
     {
         return (short) buffer.readUnsignedByte();
     }
 
-    protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException
+    protected void writeUnsignedByte(DataOutput buffer, short unsignedByte) throws IOException
     {
         EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
     }
 
-    protected long readTimestamp(DataInputStream buffer) throws IOException
+    protected long readTimestamp(DataInput buffer) throws IOException
     {
         return EncodingUtils.readTimestamp(buffer);
     }
 
-    protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException
+    protected void writeTimestamp(DataOutput buffer, long t) throws IOException
     {
         EncodingUtils.writeTimestamp(buffer, t);
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java Wed Dec 28 13:02:41 2011
@@ -21,11 +21,12 @@
 
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
+import org.apache.qpid.codec.MarkableDataInput;
+
 import java.io.IOException;
 
 
 public abstract interface AMQMethodBodyInstanceFactory
 {
-    public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException;
+    public AMQMethodBody newInstance(MarkableDataInput buffer, long size) throws AMQFrameDecodingException, IOException;
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Wed Dec 28 13:02:41 2011
@@ -24,8 +24,9 @@ package org.apache.qpid.framing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.*;
 import java.lang.ref.WeakReference;
@@ -93,22 +94,44 @@ public final class AMQShortString implem
 
     private AMQShortString substring(final int from, final int to)
     {
-        return new AMQShortString(_data, from+_offset, to+_offset);
+        return new AMQShortString(_data, from+_offset, to-from);
     }
 
 
-    private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
-            new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
+    private static final int LOCAL_INTERN_CACHE_SIZE = 2048;
+
+    private static final ThreadLocal<Map<AMQShortString, AMQShortString>> _localInternMap =
+            new ThreadLocal<Map<AMQShortString, AMQShortString>>()
             {
-                protected Map<AMQShortString, WeakReference<AMQShortString>> initialValue()
+                protected Map<AMQShortString, AMQShortString> initialValue()
                 {
-                    return new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+                    return new LinkedHashMap<AMQShortString, AMQShortString>()
+                    {
+
+                        protected boolean removeEldestEntry(Map.Entry<AMQShortString, AMQShortString> eldest)
+                        {
+                            return size() > LOCAL_INTERN_CACHE_SIZE;
+                        }
+                    };
                 };
             };
 
     private static final Map<AMQShortString, WeakReference<AMQShortString>> _globalInternMap =
             new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
 
+
+    private static final ThreadLocal<Map<String, WeakReference<AMQShortString>>> _localStringMap =
+            new ThreadLocal<Map<String, WeakReference<AMQShortString>>>()
+            {
+                protected Map<String, WeakReference<AMQShortString>> initialValue()
+                {
+                    return new WeakHashMap<String, WeakReference<AMQShortString>>();
+                };
+            };
+
+    private static final Map<String, WeakReference<AMQShortString>> _globalStringMap =
+            new WeakHashMap<String, WeakReference<AMQShortString>>();
+
     private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
 
     private final byte[] _data;
@@ -200,32 +223,32 @@ public final class AMQShortString implem
 
     }
 
-    private AMQShortString(DataInputStream data, final int length) throws IOException
+    private AMQShortString(DataInput data, final int length) throws IOException
     {
         if (length > MAX_LENGTH)
         {
             throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
         }
         byte[] dataBytes = new byte[length];
-        data.read(dataBytes);
+        data.readFully(dataBytes);
         _data = dataBytes;
         _offset = 0;
         _length = length;
 
     }
 
-    private AMQShortString(final byte[] data, final int from, final int to)
+    public AMQShortString(byte[] data, final int offset, final int length)
     {
-        if (data == null)
-        {
-            throw new NullPointerException("Cannot create AMQShortString with null data[]");
-        }
-        int length = to - from;
         if (length > MAX_LENGTH)
         {
             throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
         }
-        _offset = from;
+        if (data == null)
+        {
+            throw new NullPointerException("Cannot create AMQShortString with null data[]");
+        }
+
+        _offset = offset;
         _length = length;
         _data = data;
     }
@@ -234,9 +257,7 @@ public final class AMQShortString implem
     {
         if(_data.length != _length)
         {
-            byte[] dataBytes = new byte[_length];
-            System.arraycopy(_data,_offset,dataBytes,0,_length);
-            return new AMQShortString(dataBytes,0,_length);
+            return copy();
         }
         else
         {
@@ -265,7 +286,7 @@ public final class AMQShortString implem
         return new CharSubSequence(start, end);
     }
 
-    public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException
+    public static AMQShortString readFromBuffer(DataInput buffer) throws IOException
     {
         final int length = buffer.readUnsignedByte();
         if (length == 0)
@@ -293,12 +314,12 @@ public final class AMQShortString implem
         }
     }
 
-    public void writeToBuffer(DataOutputStream buffer) throws IOException
+    public void writeToBuffer(DataOutput buffer) throws IOException
     {
 
         final int size = length();
         //buffer.setAutoExpand(true);
-        buffer.write((byte) size);
+        buffer.writeByte(size);
         buffer.write(_data, _offset, size);
 
     }
@@ -420,7 +441,17 @@ public final class AMQShortString implem
     {
         if (_asString == null)
         {
-            _asString = new String(asChars());
+            AMQShortString intern = intern();
+
+            if(intern == this)
+            {
+                _asString = new String(asChars());
+            }
+            else
+            {
+                _asString = intern.asString();
+            }
+
         }
         return _asString;
     }
@@ -609,42 +640,51 @@ public final class AMQShortString implem
 
     public AMQShortString intern()
     {
+        return intern(true);
+    }
+
+    public AMQShortString intern(boolean keep)
+    {
 
         hashCode();
 
-        Map<AMQShortString, WeakReference<AMQShortString>> localMap =
+        Map<AMQShortString, AMQShortString> localMap =
                 _localInternMap.get();
 
-        WeakReference<AMQShortString> ref = localMap.get(this);
-        AMQShortString internString;
+        AMQShortString internString = localMap.get(this);
 
-        if(ref != null)
+
+        if(internString != null)
         {
-            internString = ref.get();
-            if(internString != null)
-            {
-                return internString;
-            }
+            return internString;
         }
 
 
+        WeakReference<AMQShortString> ref;
         synchronized(_globalInternMap)
         {
 
             ref = _globalInternMap.get(this);
             if((ref == null) || ((internString = ref.get()) == null))
             {
-                internString = shrink();
+                internString = keep ? shrink() : copy();
                 ref = new WeakReference(internString);
                 _globalInternMap.put(internString, ref);
             }
 
         }
-        localMap.put(internString, ref);
+        localMap.put(internString, internString);
         return internString;
 
     }
 
+    private AMQShortString copy()
+    {
+        byte[] dataBytes = new byte[_length];
+        System.arraycopy(_data,_offset,dataBytes,0,_length);
+        return new AMQShortString(dataBytes,0,_length);
+    }
+
     private int occurences(final byte delim)
     {
         int count = 0;
@@ -761,7 +801,46 @@ public final class AMQShortString implem
 
     public static AMQShortString valueOf(Object obj)
     {
-        return obj == null ? null : new AMQShortString(String.valueOf(obj));
+        return obj == null ? null : AMQShortString.valueOf(String.valueOf(obj));
+    }
+
+    public static AMQShortString valueOf(String obj)
+    {
+        if(obj == null)
+        {
+            return null;
+        }
+
+        Map<String, WeakReference<AMQShortString>> localMap =
+                _localStringMap.get();
+
+        WeakReference<AMQShortString> ref = localMap.get(obj);
+        AMQShortString internString;
+
+        if(ref != null)
+        {
+            internString = ref.get();
+            if(internString != null)
+            {
+                return internString;
+            }
+        }
+
+
+        synchronized(_globalStringMap)
+        {
+
+            ref = _globalStringMap.get(obj);
+            if((ref == null) || ((internString = ref.get()) == null))
+            {
+                internString = (new AMQShortString(obj)).intern();
+                ref = new WeakReference<AMQShortString>(internString);
+                _globalStringMap.put(obj, ref);
+            }
+
+        }
+        localMap.put(obj, ref);
+        return internString;
     }
 
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java Wed Dec 28 13:02:41 2011
@@ -20,8 +20,8 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
 
@@ -61,12 +61,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -107,12 +107,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readUnsignedInteger(buffer);
         }
@@ -138,7 +138,7 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             BigDecimal bd = (BigDecimal) value;
 
@@ -151,7 +151,7 @@ public enum AMQType
             EncodingUtils.writeInteger(buffer, unscaled);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             byte places = EncodingUtils.readByte(buffer);
 
@@ -183,12 +183,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeLong(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readLong(buffer);
         }
@@ -247,7 +247,7 @@ public enum AMQType
          * @param value  An instance of the type.
          * @param buffer The byte buffer to write it to.
          */
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             // Ensure that the value is a FieldTable.
             if (!(value instanceof FieldTable))
@@ -268,7 +268,7 @@ public enum AMQType
          *
          * @return An instance of the type.
          */
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             try
             {
@@ -302,10 +302,10 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer)
+        public void writeValueImpl(Object value, DataOutput buffer)
         { }
 
-        public Object readValueFromBuffer(DataInputStream buffer)
+        public Object readValueFromBuffer(DataInput buffer)
         {
             return null;
         }
@@ -331,12 +331,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeLongstr(buffer, (byte[]) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readLongstr(buffer);
         }
@@ -361,12 +361,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -392,12 +392,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -427,12 +427,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeBoolean(buffer, (Boolean) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readBoolean(buffer);
         }
@@ -462,12 +462,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeChar(buffer, (Character) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readChar(buffer);
         }
@@ -497,12 +497,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeByte(buffer, (Byte) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readByte(buffer);
         }
@@ -536,12 +536,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeShort(buffer, (Short) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readShort(buffer);
         }
@@ -578,12 +578,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeInteger(buffer, (Integer) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readInteger(buffer);
         }
@@ -596,6 +596,22 @@ public enum AMQType
             return EncodingUtils.encodedLongLength();
         }
 
+        public int getEncodingSize(long value)
+        {
+            return EncodingUtils.encodedLongLength();
+        }
+
+        public AMQTypedValue asTypedValue(long value)
+        {
+            return AMQTypedValue.createAMQTypedValue(value);
+        }
+
+        public void writeToBuffer(long value, DataOutput buffer) throws IOException
+        {
+            buffer.writeByte(identifier());
+            EncodingUtils.writeLong(buffer, value);
+        }
+
         public Object toNativeValue(Object value)
         {
             if (value instanceof Long)
@@ -625,12 +641,18 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeLong(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public long readLongFromBuffer(DataInput buffer) throws IOException
+        {
+            return EncodingUtils.readLong(buffer);
+        }
+
+
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readLong(buffer);
         }
@@ -660,12 +682,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeFloat(buffer, (Float) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readFloat(buffer);
         }
@@ -699,12 +721,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, DataOutput buffer) throws IOException
         {
             EncodingUtils.writeDouble(buffer, (Double) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(DataInput buffer) throws IOException
         {
             return EncodingUtils.readDouble(buffer);
         }
@@ -761,7 +783,7 @@ public enum AMQType
      */
     public AMQTypedValue asTypedValue(Object value)
     {
-        return new AMQTypedValue(this, toNativeValue(value));
+        return AMQTypedValue.createAMQTypedValue(this, toNativeValue(value));
     }
 
     /**
@@ -771,7 +793,7 @@ public enum AMQType
      * @param value  An instance of the type.
      * @param buffer The byte buffer to write it to.
      */
-    public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException
+    public void writeToBuffer(Object value, DataOutput buffer) throws IOException
     {
         buffer.writeByte(identifier());
         writeValueImpl(value, buffer);
@@ -783,7 +805,7 @@ public enum AMQType
      * @param value  An instance of the type.
      * @param buffer The byte buffer to write it to.
      */
-    abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException;
+    abstract void writeValueImpl(Object value, DataOutput buffer) throws IOException;
 
     /**
      * Reads an instance of the type from a specified byte buffer.
@@ -792,5 +814,5 @@ public enum AMQType
      *
      * @return An instance of the type.
      */
-    abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException;
+    abstract Object readValueFromBuffer(DataInput buffer) throws IOException;
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,7 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.util.Date;
 import java.util.Map;
 import java.math.BigDecimal;
@@ -42,81 +40,218 @@ import java.math.BigDecimal;
  * <tr><td> Extract the value from a fully typed AMQP value.
  * </table>
  */
-public class AMQTypedValue
+public abstract class AMQTypedValue
 {
-    /** The type of the value. */
-    private final AMQType _type;
 
-    /** The Java native representation of the AMQP typed value. */
-    private final Object _value;
+    public abstract AMQType getType();
 
-    public AMQTypedValue(AMQType type, Object value)
+    public abstract Object getValue();
+
+    public abstract void writeToBuffer(DataOutput buffer) throws IOException;
+
+    public abstract int getEncodingSize();
+
+
+    private static final class GenericTypedValue extends AMQTypedValue
     {
-        if (type == null)
+        /** The type of the value. */
+        private final AMQType _type;
+
+        /** The Java native representation of the AMQP typed value. */
+        private final Object _value;
+
+        private GenericTypedValue(AMQType type, Object value)
         {
-            throw new NullPointerException("Cannot create a typed value with null type");
+            if (type == null)
+            {
+                throw new NullPointerException("Cannot create a typed value with null type");
+            }
+
+            _type = type;
+            _value = type.toNativeValue(value);
         }
 
-        _type = type;
-        _value = type.toNativeValue(value);
-    }
+        private GenericTypedValue(AMQType type, DataInput buffer) throws IOException
+        {
+            _type = type;
+            _value = type.readValueFromBuffer(buffer);
+        }
+
+
+        public AMQType getType()
+        {
+            return _type;
+        }
+
+        public Object getValue()
+        {
+            return _value;
+        }
+
+        public void writeToBuffer(DataOutput buffer) throws IOException
+        {
+            _type.writeToBuffer(_value, buffer);
+        }
+
+        public int getEncodingSize()
+        {
+            return _type.getEncodingSize(_value);
+        }
+
+
+        public String toString()
+        {
+            return "[" + getType() + ": " + getValue() + "]";
+        }
+
+
+        public boolean equals(Object o)
+        {
+            if(o instanceof GenericTypedValue)
+            {
+                GenericTypedValue other = (GenericTypedValue) o;
+                return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+            }
+            else
+            {
+                return false;
+            }
+        }
+
+        public int hashCode()
+        {
+            return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+        }
 
-    private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException
-    {
-        _type = type;
-        _value = type.readValueFromBuffer(buffer);
     }
 
-    public AMQType getType()
+    private static final class LongTypedValue extends AMQTypedValue
     {
-        return _type;
+
+        private final long _value;
+
+        private LongTypedValue(long value)
+        {
+            _value = value;
+        }
+
+        public LongTypedValue(DataInput buffer) throws IOException
+        {
+            _value = EncodingUtils.readLong(buffer);
+        }
+
+        public AMQType getType()
+        {
+            return AMQType.LONG;
+        }
+
+
+        public Object getValue()
+        {
+            return _value;
+        }
+
+        public void writeToBuffer(DataOutput buffer) throws IOException
+        {
+            EncodingUtils.writeByte(buffer,AMQType.LONG.identifier());
+            EncodingUtils.writeLong(buffer,_value);
+        }
+
+
+        public int getEncodingSize()
+        {
+            return EncodingUtils.encodedLongLength();
+        }
     }
 
-    public Object getValue()
+    private static final class IntTypedValue extends AMQTypedValue
     {
-        return _value;
+
+        private final int _value;
+
+        public IntTypedValue(int value)
+        {
+            _value = value;
+        }
+
+        public AMQType getType()
+        {
+            return AMQType.INT;
+        }
+
+
+        public Object getValue()
+        {
+            return _value;
+        }
+
+        public void writeToBuffer(DataOutput buffer) throws IOException
+        {
+            EncodingUtils.writeByte(buffer,AMQType.INT.identifier());
+            EncodingUtils.writeInteger(buffer, _value);
+        }
+
+
+        public int getEncodingSize()
+        {
+            return EncodingUtils.encodedIntegerLength();
+        }
     }
 
-    public void writeToBuffer(DataOutputStream buffer) throws IOException
+
+    public static AMQTypedValue readFromBuffer(DataInput buffer) throws IOException
     {
-        _type.writeToBuffer(_value, buffer);
+        AMQType type = AMQTypeMap.getType(buffer.readByte());
+
+        switch(type)
+        {
+            case LONG:
+                return new LongTypedValue(buffer);
+
+            case INT:
+                int value = EncodingUtils.readInteger(buffer);
+                return createAMQTypedValue(value);
+
+            default:
+                return new GenericTypedValue(type, buffer);
+        }
+
     }
 
-    public int getEncodingSize()
+    private static final AMQTypedValue[] INT_VALUES = new AMQTypedValue[16];
+    static
     {
-        return _type.getEncodingSize(_value);
+        for(int i = 0 ; i < 16; i ++)
+        {
+            INT_VALUES[i] = new IntTypedValue(i);
+        }
     }
 
-    public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException
+    public static AMQTypedValue createAMQTypedValue(int i)
     {
-        AMQType type = AMQTypeMap.getType(buffer.readByte());
-
-        return new AMQTypedValue(type, buffer);
+        return (i & 0x0f) == i ? INT_VALUES[i] : new IntTypedValue(i);
     }
 
-    public String toString()
+
+    public static AMQTypedValue createAMQTypedValue(long value)
     {
-        return "[" + getType() + ": " + getValue() + "]";
+        return new LongTypedValue(value);
     }
 
-
-    public boolean equals(Object o)
+    public static AMQTypedValue createAMQTypedValue(AMQType type, Object value)
     {
-        if(o instanceof AMQTypedValue)
+        switch(type)
         {
-            AMQTypedValue other = (AMQTypedValue) o;
-            return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
-        }
-        else
-        {
-            return false;
+            case LONG:
+                return new LongTypedValue((Long) AMQType.LONG.toNativeValue(value));
+            case INT:
+                return createAMQTypedValue(((Integer) AMQType.INT.toNativeValue(value)).intValue()); // reuse cached small-int instances
+
+            default:
+                return new GenericTypedValue(type, value);
         }
     }
 
-    public int hashCode()
-    {
-        return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
-    }
 
 
     public static AMQTypedValue toTypedValue(Object val)

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Wed Dec 28 13:02:41 2011
@@ -20,8 +20,9 @@
  */
 package org.apache.qpid.framing;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 
 import org.slf4j.Logger;
@@ -80,6 +81,7 @@ public class BasicContentHeaderPropertie
     private static final int USER_ID_MASK = 1 << 4;
     private static final int APPLICATION_ID_MASK = 1 << 3;
     private static final int CLUSTER_ID_MASK = 1 << 2;
+    private byte[] _encodedForm;
 
 
     public BasicContentHeaderProperties()
@@ -87,6 +89,12 @@ public class BasicContentHeaderPropertie
 
     public int getPropertyListSize()
     {
+        if(_encodedForm != null && (_headers == null || _headers.isClean()))
+        {
+            return _encodedForm.length;
+        }
+        else
+        {
             int size = 0;
 
             if ((_propertyFlags & (CONTENT_TYPE_MASK)) > 0)
@@ -167,6 +175,7 @@ public class BasicContentHeaderPropertie
             }
 
             return size;
+        }
     }
 
     public void setPropertyFlags(int propertyFlags)
@@ -179,87 +188,94 @@ public class BasicContentHeaderPropertie
         return _propertyFlags;
     }
 
-    public void writePropertyListPayload(DataOutputStream buffer) throws IOException
+    public void writePropertyListPayload(DataOutput buffer) throws IOException
     {
-        if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+        if(_encodedForm != null && (_headers == null || _headers.isClean())) // cached bytes are valid only while headers are unmodified
         {
-            EncodingUtils.writeShortStringBytes(buffer, _contentType);
+            buffer.write(_encodedForm);
         }
-
-        if ((_propertyFlags & ENCODING_MASK) != 0)
+        else
         {
-            EncodingUtils.writeShortStringBytes(buffer, _encoding);
-        }
+            if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+            {
+                EncodingUtils.writeShortStringBytes(buffer, _contentType);
+            }
 
-        if ((_propertyFlags & HEADERS_MASK) != 0)
-        {
-            EncodingUtils.writeFieldTableBytes(buffer, _headers);
-        }
+            if ((_propertyFlags & ENCODING_MASK) != 0)
+            {
+                EncodingUtils.writeShortStringBytes(buffer, _encoding);
+            }
 
-        if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
-        {
-            buffer.writeByte(_deliveryMode);
-        }
+            if ((_propertyFlags & HEADERS_MASK) != 0)
+            {
+                EncodingUtils.writeFieldTableBytes(buffer, _headers);
+            }
 
-        if ((_propertyFlags & PRIORITY_MASK) != 0)
-        {
-            buffer.writeByte(_priority);
-        }
+            if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
+            {
+                buffer.writeByte(_deliveryMode);
+            }
 
-        if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
-        {
-            EncodingUtils.writeShortStringBytes(buffer, _correlationId);
-        }
+            if ((_propertyFlags & PRIORITY_MASK) != 0)
+            {
+                buffer.writeByte(_priority);
+            }
 
-        if ((_propertyFlags & REPLY_TO_MASK) != 0)
-        {
-            EncodingUtils.writeShortStringBytes(buffer, _replyTo);
-        }
+            if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
+            {
+                EncodingUtils.writeShortStringBytes(buffer, _correlationId);
+            }
 
-        if ((_propertyFlags & EXPIRATION_MASK) != 0)
-        {
-            if (_expiration == 0L)
+            if ((_propertyFlags & REPLY_TO_MASK) != 0)
             {
-                EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
+                EncodingUtils.writeShortStringBytes(buffer, _replyTo);
             }
-            else
+
+            if ((_propertyFlags & EXPIRATION_MASK) != 0)
             {
-                EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+                if (_expiration == 0L)
+                {
+                    EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
+                }
+                else
+                {
+                    EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+                }
             }
-        }
 
-        if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
-        {
-            EncodingUtils.writeShortStringBytes(buffer, _messageId);
-        }
+            if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
+            {
+                EncodingUtils.writeShortStringBytes(buffer, _messageId);
+            }
 
-        if ((_propertyFlags & TIMESTAMP_MASK) != 0)
-        {
-            EncodingUtils.writeTimestamp(buffer, _timestamp);
-        }
+            if ((_propertyFlags & TIMESTAMP_MASK) != 0)
+            {
+                EncodingUtils.writeTimestamp(buffer, _timestamp);
+            }
 
-        if ((_propertyFlags & TYPE_MASK) != 0)
-        {
-            EncodingUtils.writeShortStringBytes(buffer, _type);
-        }
+            if ((_propertyFlags & TYPE_MASK) != 0)
+            {
+                EncodingUtils.writeShortStringBytes(buffer, _type);
+            }
 
-        if ((_propertyFlags & USER_ID_MASK) != 0)
-        {
-            EncodingUtils.writeShortStringBytes(buffer, _userId);
-        }
+            if ((_propertyFlags & USER_ID_MASK) != 0)
+            {
+                EncodingUtils.writeShortStringBytes(buffer, _userId);
+            }
 
-        if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
-        {
-            EncodingUtils.writeShortStringBytes(buffer, _appId);
-        }
+            if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
+            {
+                EncodingUtils.writeShortStringBytes(buffer, _appId);
+            }
 
-        if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
-        {
-            EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+            if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
+            {
+                EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+            }
         }
     }
 
-    public void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
+    public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
     {
         _propertyFlags = propertyFlags;
 
@@ -268,26 +284,40 @@ public class BasicContentHeaderPropertie
             _logger.debug("Property flags: " + _propertyFlags);
         }
 
-        decode(buffer);
+        _encodedForm = new byte[size];
+        buffer.readFully(_encodedForm);
+
+        ByteArrayDataInput input = new ByteArrayDataInput(_encodedForm);
+
+        decode(input);
+
     }
 
-    private void decode(DataInputStream buffer) throws IOException, AMQFrameDecodingException
+    private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException
     {
         // ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
 
+            int headersOffset = 0;
+
             if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
             {
-                _contentType = EncodingUtils.readAMQShortString(buffer);
+                _contentType = buffer.readAMQShortString();
+                headersOffset += EncodingUtils.encodedShortStringLength(_contentType);
             }
 
             if ((_propertyFlags & ENCODING_MASK) != 0)
             {
-                _encoding = EncodingUtils.readAMQShortString(buffer);
+                _encoding = buffer.readAMQShortString();
+                headersOffset += EncodingUtils.encodedShortStringLength(_encoding);
             }
 
             if ((_propertyFlags & HEADERS_MASK) != 0)
             {
-                _headers = EncodingUtils.readFieldTable(buffer);
+                long length = EncodingUtils.readUnsignedInteger(buffer);
+
+                _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length);
+
+                buffer.skipBytes((int)length);
             }
 
             if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
@@ -302,12 +332,12 @@ public class BasicContentHeaderPropertie
 
             if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
             {
-                _correlationId = EncodingUtils.readAMQShortString(buffer);
+                _correlationId = buffer.readAMQShortString();
             }
 
             if ((_propertyFlags & REPLY_TO_MASK) != 0)
             {
-                _replyTo = EncodingUtils.readAMQShortString(buffer);
+                _replyTo = buffer.readAMQShortString();
             }
 
             if ((_propertyFlags & EXPIRATION_MASK) != 0)
@@ -317,7 +347,7 @@ public class BasicContentHeaderPropertie
 
             if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
             {
-                _messageId = EncodingUtils.readAMQShortString(buffer);
+                _messageId = buffer.readAMQShortString();
             }
 
             if ((_propertyFlags & TIMESTAMP_MASK) != 0)
@@ -327,22 +357,22 @@ public class BasicContentHeaderPropertie
 
             if ((_propertyFlags & TYPE_MASK) != 0)
             {
-                _type = EncodingUtils.readAMQShortString(buffer);
+                _type = buffer.readAMQShortString();
             }
 
             if ((_propertyFlags & USER_ID_MASK) != 0)
             {
-                _userId = EncodingUtils.readAMQShortString(buffer);
+                _userId = buffer.readAMQShortString();
             }
 
             if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
             {
-                _appId = EncodingUtils.readAMQShortString(buffer);
+                _appId = buffer.readAMQShortString();
             }
 
             if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
             {
-                _clusterId = EncodingUtils.readAMQShortString(buffer);
+                _clusterId = buffer.readAMQShortString();
             }
 
 
@@ -363,11 +393,12 @@ public class BasicContentHeaderPropertie
     {
         _propertyFlags |= (CONTENT_TYPE_MASK);
         _contentType = contentType;
+        _encodedForm = null;
     }
 
     public void setContentType(String contentType)
     {
-        setContentType((contentType == null) ? null : new AMQShortString(contentType));
+        setContentType((contentType == null) ? null : AMQShortString.valueOf(contentType));
     }
 
     public String getEncodingAsString()
@@ -384,13 +415,15 @@ public class BasicContentHeaderPropertie
     public void setEncoding(String encoding)
     {
         _propertyFlags |= ENCODING_MASK;
-        _encoding = (encoding == null) ? null : new AMQShortString(encoding);
+        _encoding = (encoding == null) ? null : AMQShortString.valueOf(encoding);
+        _encodedForm = null;
     }
 
     public void setEncoding(AMQShortString encoding)
     {
         _propertyFlags |= ENCODING_MASK;
         _encoding = encoding;
+        _encodedForm = null;
     }
 
     public FieldTable getHeaders()
@@ -407,6 +440,7 @@ public class BasicContentHeaderPropertie
     {
         _propertyFlags |= HEADERS_MASK;
         _headers = headers;
+        _encodedForm = null;
     }
 
     public byte getDeliveryMode()
@@ -418,6 +452,7 @@ public class BasicContentHeaderPropertie
     {
         _propertyFlags |= DELIVERY_MODE_MASK;
         _deliveryMode = deliveryMode;
+        _encodedForm = null;
     }
 
     public byte getPriority()
@@ -429,6 +464,7 @@ public class BasicContentHeaderPropertie
     {
         _propertyFlags |= PRIORITY_MASK;
         _priority = priority;
+        _encodedForm = null;
     }
 
     public AMQShortString getCorrelationId()
@@ -443,13 +479,14 @@ public class BasicContentHeaderPropertie
 
     public void setCorrelationId(String correlationId)
     {
-        setCorrelationId((correlationId == null) ? null : new AMQShortString(correlationId));
+        setCorrelationId((correlationId == null) ? null : AMQShortString.valueOf(correlationId));
     }
 
     public void setCorrelationId(AMQShortString correlationId)
     {
         _propertyFlags |= CORRELATION_ID_MASK;
         _correlationId = correlationId;
+        _encodedForm = null;
     }
 
     public String getReplyToAsString()
@@ -464,13 +501,14 @@ public class BasicContentHeaderPropertie
 
     public void setReplyTo(String replyTo)
     {
-        setReplyTo((replyTo == null) ? null : new AMQShortString(replyTo));
+        setReplyTo((replyTo == null) ? null : AMQShortString.valueOf(replyTo));
     }
 
     public void setReplyTo(AMQShortString replyTo)
     {
         _propertyFlags |= REPLY_TO_MASK;
         _replyTo = replyTo;
+        _encodedForm = null;
     }
 
     public long getExpiration()
@@ -482,6 +520,7 @@ public class BasicContentHeaderPropertie
     {
         _propertyFlags |= EXPIRATION_MASK;
         _expiration = expiration;
+        _encodedForm = null;
     }
 
     public AMQShortString getMessageId()
@@ -498,12 +537,14 @@ public class BasicContentHeaderPropertie
     {
         _propertyFlags |= MESSAGE_ID_MASK;
         _messageId = (messageId == null) ? null : new AMQShortString(messageId);
+        _encodedForm = null;
     }
 
     public void setMessageId(AMQShortString messageId)
     {
         _propertyFlags |= MESSAGE_ID_MASK;
         _messageId = messageId;
+        _encodedForm = null;
     }
 
     public long getTimestamp()
@@ -515,6 +556,7 @@ public class BasicContentHeaderPropertie
     {
         _propertyFlags |= TIMESTAMP_MASK;
         _timestamp = timestamp;
+        _encodedForm = null;
     }
 
     public String getTypeAsString()
@@ -529,13 +571,14 @@ public class BasicContentHeaderPropertie
 
     public void setType(String type)
     {
-        setType((type == null) ? null : new AMQShortString(type));
+        setType((type == null) ? null : AMQShortString.valueOf(type));
     }
 
     public void setType(AMQShortString type)
     {
         _propertyFlags |= TYPE_MASK;
         _type = type;
+        _encodedForm = null;
     }
 
     public String getUserIdAsString()
@@ -550,13 +593,14 @@ public class BasicContentHeaderPropertie
 
     public void setUserId(String userId)
     {
-        setUserId((userId == null) ? null : new AMQShortString(userId));
+        setUserId((userId == null) ? null : AMQShortString.valueOf(userId));
     }
 
     public void setUserId(AMQShortString userId)
     {
         _propertyFlags |= USER_ID_MASK;
         _userId = userId;
+        _encodedForm = null;
     }
 
     public String getAppIdAsString()
@@ -571,13 +615,14 @@ public class BasicContentHeaderPropertie
 
     public void setAppId(String appId)
     {
-        setAppId((appId == null) ? null : new AMQShortString(appId));
+        setAppId((appId == null) ? null : AMQShortString.valueOf(appId));
     }
 
     public void setAppId(AMQShortString appId)
     {
         _propertyFlags |= APPLICATION_ID_MASK;
         _appId = appId;
+        _encodedForm = null;
     }
 
     public String getClusterIdAsString()
@@ -592,13 +637,14 @@ public class BasicContentHeaderPropertie
 
     public void setClusterId(String clusterId)
     {
-        setClusterId((clusterId == null) ? null : new AMQShortString(clusterId));
+        setClusterId((clusterId == null) ? null : AMQShortString.valueOf(clusterId));
     }
 
     public void setClusterId(AMQShortString clusterId)
     {
         _propertyFlags |= CLUSTER_ID_MASK;
         _clusterId = clusterId;
+        _encodedForm = null;
     }
 
     public String toString()

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java Wed Dec 28 13:02:41 2011
@@ -20,7 +20,8 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
+import org.apache.qpid.codec.MarkableDataInput;
+
 import java.io.IOException;
 
 /**
@@ -28,5 +29,5 @@ import java.io.IOException;
  */
 public interface BodyFactory
 {
-    AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException;
+    AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException;
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org