You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/10/18 15:21:43 UTC

[09/10] qpid-broker-j git commit: QPID-7832: [Java Broker] Refactor store/protocol API using Collection

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index 346e203..6f293dc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -27,14 +27,16 @@ import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
+import java.nio.InvalidMarkException;
 import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.ScatteringByteChannel;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
@@ -43,68 +45,144 @@ import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLException;
 
-import org.apache.qpid.server.streams.CompositeInputStream;
+import com.google.common.primitives.Chars;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
 
 public class QpidByteBuffer implements AutoCloseable
 {
-    private static final AtomicIntegerFieldUpdater<QpidByteBuffer>
-            DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
-            QpidByteBuffer.class,
-            "_disposed");
-
-    private static final ThreadLocal<QpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
     private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
-    private static final double REALLOCATION_CAPACITY_THRESHOLD_FRACTION = 0.9;
+    private static final QpidByteBuffer EMPTY_QPID_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
+    private static final ThreadLocal<QpidByteBufferFragment> _cachedBuffer = new ThreadLocal<>();
     private volatile static boolean _isPoolInitialized;
     private volatile static BufferPool _bufferPool;
     private volatile static int _pooledBufferSize;
-    private volatile static ByteBuffer _zeroed;
     private volatile static double _sparsityFraction;
-    private final int _offset;
+    private volatile static ByteBuffer _zeroed;
+    private volatile List<QpidByteBufferFragment> _fragments = new ArrayList<>();
+    private volatile int _resetFragmentIndex = -1;
 
-    final ByteBufferRef _ref;
-    volatile ByteBuffer _buffer;
-    @SuppressWarnings("unused")
-    private volatile int _disposed;
+    //////////////////
+    // Absolute puts
+    //////////////////
 
+    public QpidByteBuffer put(final int index, final byte b)
+    {
+        return put(index, new byte[]{b});
+    }
 
-    QpidByteBuffer(ByteBufferRef ref)
+    public QpidByteBuffer putShort(final int index, final short value)
     {
-        this(ref, ref.getBuffer(), 0);
+        byte[] valueArray = Shorts.toByteArray(value);
+        return put(index, valueArray);
     }
 
-    private QpidByteBuffer(ByteBufferRef ref, ByteBuffer buffer, int offset)
+    public QpidByteBuffer putChar(final int index, final char value)
     {
-        _ref = ref;
-        _buffer = buffer;
-        _offset = offset;
-        _ref.incrementRef(capacity());
+        byte[] valueArray = Chars.toByteArray(value);
+        return put(index, valueArray);
     }
 
-    public final boolean isDirect()
+    public QpidByteBuffer putInt(final int index, final int value)
     {
-        return _buffer.isDirect();
+        byte[] valueArray = Ints.toByteArray(value);
+        return put(index, valueArray);
     }
 
-    public final short getUnsignedByte()
+    public QpidByteBuffer putLong(final int index, final long value)
     {
-        return (short) (((short) get()) & 0xFF);
+        byte[] valueArray = Longs.toByteArray(value);
+        return put(index, valueArray);
     }
 
-    public final int getUnsignedShort()
+    public QpidByteBuffer putFloat(final int index, final float value)
     {
-        return ((int) getShort()) & 0xffff;
+        int intValue = Float.floatToRawIntBits(value);
+        return putInt(index, intValue);
     }
 
-    public final int getUnsignedShort(int pos)
+    public QpidByteBuffer putDouble(final int index, final double value)
     {
-        return ((int) getShort(pos)) & 0xffff;
+        long longValue = Double.doubleToRawLongBits(value);
+        return putLong(index, longValue);
     }
 
+    public final QpidByteBuffer put(final int index, final byte[] src)
+    {
+        final int valueWidth = src.length;
+        if (index < 0 || index + valueWidth > limit())
+        {
+            throw new IndexOutOfBoundsException(String.format("index %d is out of bounds [%d, %d)", index, 0, limit()));
+        }
+        boolean bytewise = false;
+        int written = 0;
+        int bytesToSkip = index;
+        for (int i = 0, size = _fragments.size(); i < size; i++)
+        {
+            final QpidByteBufferFragment buffer = _fragments.get(i);
+            final int limit = buffer.limit();
+            boolean isLastFragmentToConsider = valueWidth + bytesToSkip - written <= limit;
+            if (!isLastFragmentToConsider && limit != buffer.capacity())
+            {
+                throw new IllegalStateException(String.format("Unexpected limit %d on fragment %d", limit, i));
+            }
 
-    public final long getUnsignedInt()
+            if (bytewise)
+            {
+                int offset = 0;
+                while (limit > offset && written < valueWidth)
+                {
+                    buffer.put(offset, src[written]);
+                    offset++;
+                    written++;
+                }
+                if (written == valueWidth)
+                {
+                    break;
+                }
+            }
+            else
+            {
+                if (limit >= bytesToSkip + valueWidth)
+                {
+                    for (int i1 = 0; i1 < valueWidth; i1++)
+                    {
+                        buffer.put(bytesToSkip + i1, src[i1]);
+                        written++;
+                    }
+                    break;
+                }
+                else if (limit > bytesToSkip)
+                {
+                    bytewise = true;
+                    while (limit > bytesToSkip + written)
+                    {
+                        buffer.put(bytesToSkip + written, src[written]);
+                        written++;
+                    }
+                    bytesToSkip = 0;
+                }
+                else
+                {
+                    bytesToSkip -= limit;
+                }
+            }
+        }
+        if (valueWidth != written)
+        {
+            throw new BufferOverflowException();
+        }
+        return this;
+    }
+
+    ////////////////
+    // Relative Puts
+    ////////////////
+
+    public final QpidByteBuffer put(final byte b)
     {
-        return ((long) getInt()) & 0xffffffffL;
+        return put(new byte[]{b});
     }
 
     public final QpidByteBuffer putUnsignedByte(final short s)
@@ -113,224 +191,493 @@ public class QpidByteBuffer implements AutoCloseable
         return this;
     }
 
+    public final QpidByteBuffer putShort(final short value)
+    {
+        byte[] valueArray = Shorts.toByteArray(value);
+        return put(valueArray);
+    }
+
     public final QpidByteBuffer putUnsignedShort(final int i)
     {
         putShort((short) i);
         return this;
     }
 
+    public final QpidByteBuffer putChar(final char value)
+    {
+        byte[] valueArray = Chars.toByteArray(value);
+        return put(valueArray);
+    }
+
+    public final QpidByteBuffer putInt(final int value)
+    {
+        byte[] valueArray = Ints.toByteArray(value);
+        return put(valueArray);
+    }
+
     public final QpidByteBuffer putUnsignedInt(final long value)
     {
         putInt((int) value);
         return this;
     }
 
-    @Override
-    public final void close()
+    public final QpidByteBuffer putLong(final long value)
     {
-        dispose();
+        byte[] valueArray = Longs.toByteArray(value);
+        return put(valueArray);
     }
 
-    public final void dispose()
+    public final QpidByteBuffer putFloat(final float value)
     {
-        if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
-        {
-            _ref.decrementRef(capacity());
-            _buffer = null;
-        }
+        int intValue = Float.floatToRawIntBits(value);
+        return putInt(intValue);
     }
 
-    public final InputStream asInputStream()
+    public final QpidByteBuffer putDouble(final double value)
     {
-        return new BufferInputStream(this);
+        long longValue = Double.doubleToRawLongBits(value);
+        return putLong(longValue);
     }
 
-    public final ByteBuffer asByteBuffer()
+    public final QpidByteBuffer put(byte[] src)
     {
-        try
+        return put(src, 0, src.length);
+    }
+
+    public final QpidByteBuffer put(final byte[] src, final int offset, final int length)
+    {
+        final int valueWidth = length;
+        if (valueWidth > remaining())
         {
-            return getUnderlyingBuffer();
+            throw new BufferOverflowException();
         }
-        finally
+        boolean bytewise = false;
+        int written = 0;
+        for (int i = 0, size = _fragments.size(); i < size; i++)
         {
-            dispose();
+            final QpidByteBufferFragment buffer = _fragments.get(i);
+            if (bytewise)
+            {
+                while (buffer.remaining() > 0 && written < valueWidth)
+                {
+                    buffer.put(src[offset + written]);
+                    written++;
+                }
+                if (written == valueWidth)
+                {
+                    break;
+                }
+            }
+            else
+            {
+                final int remaining = buffer.remaining();
+                if (remaining >= valueWidth)
+                {
+                    buffer.put(src, offset, length);
+                    written += length;
+                    break;
+                }
+                else
+                {
+                    bytewise = true;
+                    buffer.put(src, offset, remaining);
+                    written = remaining;
+                }
+            }
         }
+        if (written != length)
+        {
+            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, length));
+        }
+        return this;
     }
 
-    public final CharBuffer decode(Charset charset)
+    public final QpidByteBuffer put(final ByteBuffer src)
     {
-        return charset.decode(getUnderlyingBuffer());
+        final int valueWidth = src.remaining();
+        if (valueWidth > remaining())
+        {
+            throw new BufferOverflowException();
+        }
+        boolean bytewise = false;
+        int written = 0;
+        for (int i = 0, size = _fragments.size(); i < size; i++)
+        {
+            final QpidByteBufferFragment buffer = _fragments.get(i);
+            if (bytewise)
+            {
+                while (src.remaining() > 0 && buffer.remaining() > 0)
+                {
+                    buffer.put(src.get());
+                    written++;
+                }
+                if (written == valueWidth)
+                {
+                    break;
+                }
+            }
+            else
+            {
+                final int remaining = buffer.remaining();
+                if (remaining >= valueWidth)
+                {
+                    buffer.put(src);
+                    written += valueWidth;
+                    break;
+                }
+                else
+                {
+                    bytewise = true;
+                    while (remaining > written)
+                    {
+                        buffer.put(src.get());
+                        written++;
+                    }
+                }
+            }
+        }
+        if (written != valueWidth)
+        {
+            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth));
+        }
+        return this;
     }
 
-    public final int read(ReadableByteChannel channel) throws IOException
+    public final QpidByteBuffer put(final QpidByteBuffer src)
     {
-        return channel.read(getUnderlyingBuffer());
+        final int valueWidth = src.remaining();
+        if (valueWidth > remaining())
+        {
+            throw new BufferOverflowException();
+        }
+        int i = 0;
+        boolean bytewise = false;
+        int written = 0;
+        int size = _fragments.size();
+        final List<QpidByteBufferFragment> fragments = src._fragments;
+        for (int i1 = 0, fragmentsSize = fragments.size(); i1 < fragmentsSize; i1++)
+        {
+            final QpidByteBufferFragment srcFragment = fragments.get(i1);
+            for (; i < size; i++)
+            {
+                final QpidByteBufferFragment dstFragment = _fragments.get(i);
+                if (dstFragment.hasRemaining())
+                {
+                    final int srcFragmentRemaining = srcFragment.remaining();
+                    if (bytewise)
+                    {
+                        while (srcFragmentRemaining > 0 && dstFragment.remaining() > 0)
+                        {
+                            dstFragment.put(srcFragment.get());
+                            written++;
+                        }
+                        if (!srcFragment.hasRemaining())
+                        {
+                            break;
+                        }
+                    }
+                    else
+                    {
+                        final int remaining = dstFragment.remaining();
+                        if (remaining >= srcFragmentRemaining)
+                        {
+                            dstFragment.put(srcFragment);
+                            written += srcFragmentRemaining;
+                            break;
+                        }
+                        else
+                        {
+                            bytewise = true;
+                            while (remaining > written)
+                            {
+                                dstFragment.put(srcFragment.get());
+                                written++;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        if (written != valueWidth)
+        {
+            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth));
+        }
+        return this;
     }
 
-    public final SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dest) throws SSLException
-    {
-        return engine.unwrap(getUnderlyingBuffer(), dest.getUnderlyingBuffer());
-    }
+    ///////////////////
+    // Absolute Gets
+    ///////////////////
 
-    @Override
-    public String toString()
+    public byte get(final int index)
     {
-        return "QpidByteBuffer{" +
-               "_buffer=" + _buffer +
-               ", _disposed=" + _disposed +
-               '}';
+        final byte[] byteArray = getByteArray(index, 1);
+        return byteArray[0];
     }
 
-    public final boolean hasRemaining()
+    public short getShort(final int index)
     {
-        return _buffer.hasRemaining();
+        final byte[] byteArray = getByteArray(index, 2);
+        return Shorts.fromByteArray(byteArray);
     }
 
-    public QpidByteBuffer putInt(final int index, final int value)
+    public final int getUnsignedShort(int index)
     {
-        _buffer.putInt(index, value);
-        return this;
+        return ((int) getShort(index)) & 0xFFFF;
     }
 
-    public QpidByteBuffer putShort(final int index, final short value)
+    public char getChar(final int index)
     {
-        _buffer.putShort(index, value);
-        return this;
+        final byte[] byteArray = getByteArray(index, 2);
+        return Chars.fromByteArray(byteArray);
     }
 
-    public QpidByteBuffer putChar(final int index, final char value)
+    public int getInt(final int index)
     {
-        _buffer.putChar(index, value);
-        return this;
+        final byte[] byteArray = getByteArray(index, 4);
+        return Ints.fromByteArray(byteArray);
     }
 
-    public final QpidByteBuffer put(final byte b)
+    public long getLong(final int index)
     {
-        _buffer.put(b);
-        return this;
+        final byte[] byteArray = getByteArray(index, 8);
+        return Longs.fromByteArray(byteArray);
     }
 
-    public QpidByteBuffer put(final int index, final byte b)
+    public float getFloat(final int index)
     {
-        _buffer.put(index, b);
-        return this;
+        final int intValue = getInt(index);
+        return Float.intBitsToFloat(intValue);
     }
 
-    public short getShort(final int index)
+    public double getDouble(final int index)
     {
-        return _buffer.getShort(index);
+        final long longValue = getLong(index);
+        return Double.longBitsToDouble(longValue);
     }
 
-    public final QpidByteBuffer mark()
+    private byte[] getByteArray(final int index, final int length)
     {
-        _buffer.mark();
-        return this;
-    }
+        if (index + length > limit())
+        {
+            throw new IndexOutOfBoundsException(String.format("%d bytes at index %d do not fit into bounds [%d, %d)", length, index, 0, limit()));
+        }
 
-    public final long getLong()
-    {
-        return _buffer.getLong();
+        byte[] value = new byte[length];
+        boolean bytewise = false;
+        int consumed = 0;
+        int bytesToSkip = index;
+        for (int i = 0, size = _fragments.size(); i < size; i++)
+        {
+            final QpidByteBufferFragment buffer = _fragments.get(i);
+            final int limit = buffer.limit();
+            boolean isLastFragmentToConsider = length + bytesToSkip - consumed <= limit;
+            if (!isLastFragmentToConsider && limit != buffer.capacity())
+            {
+                throw new IllegalStateException(String.format("Unexpectedly limit %d on fragment %d.", limit, i));
+            }
+            if (bytewise)
+            {
+                int offset = 0;
+                while (limit > offset && consumed < length)
+                {
+                    value[consumed] = buffer.get(offset);
+                    offset++;
+                    consumed++;
+                }
+                if (consumed == length)
+                {
+                    break;
+                }
+            }
+            else
+            {
+                if (limit >= bytesToSkip + length)
+                {
+                    while (consumed < length)
+                    {
+                        value[consumed] = buffer.get(bytesToSkip + consumed);
+                        consumed++;
+                    }
+                    break;
+                }
+                else if (limit > bytesToSkip)
+                {
+                    bytewise = true;
+                    while (limit > bytesToSkip + consumed)
+                    {
+                        value[consumed] = buffer.get(bytesToSkip + consumed);
+                        consumed++;
+                    }
+                    bytesToSkip = 0;
+                }
+                else
+                {
+                    bytesToSkip -= limit;
+                }
+            }
+        }
+        if (consumed != length)
+        {
+            throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
+        }
+        return value;
     }
 
-    public QpidByteBuffer putFloat(final int index, final float value)
-    {
-        _buffer.putFloat(index, value);
-        return this;
-    }
+    //////////////////
+    // Relative Gets
+    //////////////////
 
-    public double getDouble(final int index)
+    public final byte get()
     {
-        return _buffer.getDouble(index);
+        byte[] value = new byte[1];
+        get(value, 0, 1);
+        return value[0];
     }
 
-    public final boolean hasArray()
+    public final short getUnsignedByte()
     {
-        return _buffer.hasArray();
+        return (short) (get() & 0xFF);
     }
 
-    public final double getDouble()
+    public final short getShort()
     {
-        return _buffer.getDouble();
+        byte[] value = new byte[2];
+        get(value, 0, value.length);
+        return Shorts.fromByteArray(value);
     }
 
-    public final QpidByteBuffer putFloat(final float value)
+    public final int getUnsignedShort()
     {
-        _buffer.putFloat(value);
-        return this;
+        return ((int) getShort()) & 0xFFFF;
     }
 
-    public final QpidByteBuffer putInt(final int value)
+    public final char getChar()
     {
-        _buffer.putInt(value);
-        return this;
+        byte[] value = new byte[2];
+        get(value, 0, value.length);
+        return Chars.fromByteArray(value);
     }
 
-    public byte[] array()
+    public final int getInt()
     {
-        return _buffer.array();
+        byte[] value = new byte[4];
+        get(value, 0, value.length);
+        return Ints.fromByteArray(value);
     }
 
-    public final QpidByteBuffer putShort(final short value)
+    public final long getUnsignedInt()
     {
-        _buffer.putShort(value);
-        return this;
+        return ((long) getInt()) & 0xFFFFFFFFL;
     }
 
-    public int getInt(final int index)
+    public final long getLong()
     {
-        return _buffer.getInt(index);
+        byte[] value = new byte[8];
+        get(value, 0, value.length);
+        return Longs.fromByteArray(value);
     }
 
-    public final int remaining()
+    public final float getFloat()
     {
-        return _buffer.remaining();
+        final int intValue = getInt();
+        return Float.intBitsToFloat(intValue);
     }
 
-    public final QpidByteBuffer put(final byte[] src)
+    public final double getDouble()
     {
-        _buffer.put(src);
-        return this;
+        final long longValue = getLong();
+        return Double.longBitsToDouble(longValue);
     }
 
-    public final QpidByteBuffer put(final ByteBuffer src)
+    public final QpidByteBuffer get(final byte[] dst)
     {
-        _buffer.put(src);
-        return this;
+        return get(dst, 0, dst.length);
     }
 
-    public final QpidByteBuffer put(final QpidByteBuffer src)
+    public final QpidByteBuffer get(final byte[] dst, final int offset, final int length)
     {
-        int sourceRemaining = src.remaining();
-        if (sourceRemaining > remaining())
+        if (remaining() < length)
         {
-            throw new BufferOverflowException();
+            throw new BufferUnderflowException();
+        }
+        boolean bytewise = false;
+        int consumed = 0;
+        for (int i = 0, size = _fragments.size(); i < size; i++)
+        {
+            final QpidByteBufferFragment buffer = _fragments.get(i);
+            if (bytewise)
+            {
+                while (buffer.hasRemaining() && consumed < length)
+                {
+                    dst[offset + consumed] = buffer.get();
+                    consumed++;
+                }
+                if (consumed == length)
+                {
+                    return this;
+                }
+            }
+            else
+            {
+                final int remaining = buffer.remaining();
+                if (remaining >= length)
+                {
+                    buffer.get(dst, offset, length);
+                    return this;
+                }
+                else if (remaining > 0)
+                {
+                    bytewise = true;
+                    while (remaining > consumed)
+                    {
+                        dst[offset + consumed] = buffer.get();
+                        consumed++;
+                    }
+                }
+            }
+        }
+        if (consumed != length)
+        {
+            throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
         }
-
-        _buffer.put(src.getUnderlyingBuffer());
         return this;
     }
 
-    public final QpidByteBuffer get(final byte[] dst, final int offset, final int length)
-    {
-        _buffer.get(dst, offset, length);
-        return this;
-    }
+    ///////////////
+    // Other stuff
+    ////////////////
 
-    public final QpidByteBuffer get(final ByteBuffer dst)
+    public final void copyTo(final byte[] dst)
     {
-        int destinationRemaining = dst.remaining();
-        int remaining = remaining();
-        if (destinationRemaining < remaining)
+        if (remaining() < dst.length)
         {
             throw new BufferUnderflowException();
         }
-        dst.put(_buffer);
-        return this;
+        if (remaining() > dst.length)
+        {
+            throw new BufferOverflowException();
+        }
+        int offset = 0;
+        for (QpidByteBufferFragment fragment : _fragments)
+        {
+            final int length = Math.min(fragment.remaining(), dst.length - offset);
+            fragment._buffer.duplicate().get(dst, offset, length);
+            offset += length;
+        }
     }
 
     public final void copyTo(final ByteBuffer dst)
     {
-        dst.put(_buffer.duplicate());
+        if (dst.remaining() < remaining())
+        {
+            throw new BufferOverflowException();
+        }
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            dst.put(fragment._buffer.duplicate());
+        }
     }
 
     public final void putCopyOf(final QpidByteBuffer source)
@@ -341,218 +688,345 @@ public class QpidByteBuffer implements AutoCloseable
         {
             throw new BufferOverflowException();
         }
-
-        put(source.getUnderlyingBuffer().duplicate());
+        for (int i = 0, fragmentsSize = source._fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment srcFragment = source._fragments.get(i);
+            put(srcFragment._buffer.duplicate());
+        }
     }
 
-    public QpidByteBuffer rewind()
+    public final boolean isDirect()
     {
-        _buffer.rewind();
-        return this;
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            if (!fragment.isDirect())
+            {
+                return false;
+            }
+        }
+        return true;
     }
 
-    public QpidByteBuffer clear()
+    @Override
+    public final void close()
     {
-        _buffer.clear();
-        return this;
+        dispose();
     }
 
-    public QpidByteBuffer putLong(final int index, final long value)
+    public final void dispose()
     {
-        _buffer.putLong(index, value);
-        return this;
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            fragment.dispose();
+        }
     }
 
-    public QpidByteBuffer compact()
+    public final InputStream asInputStream()
     {
-        _buffer.compact();
-        return this;
+        return new QpidByteBuffer.BufferInputStream(this);
     }
 
-    public final QpidByteBuffer putDouble(final double value)
+    public final long read(ScatteringByteChannel channel) throws IOException
     {
-        _buffer.putDouble(value);
-        return this;
+        ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.size()];
+        for (int i = 0; i < byteBuffers.length; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            byteBuffers[i] = fragment.getUnderlyingBuffer();
+        }
+        return channel.read(byteBuffers);
     }
 
-    public int limit()
+    @Override
+    public String toString()
     {
-        return _buffer.limit();
+        return "QpidByteBuffer{" + _fragments.size() + " fragments}";
     }
 
     public QpidByteBuffer reset()
     {
-        _buffer.reset();
+        if (_resetFragmentIndex < 0)
+        {
+            throw new InvalidMarkException();
+        }
+        final QpidByteBufferFragment fragment = _fragments.get(_resetFragmentIndex);
+        fragment.reset();
+        for (int i = _resetFragmentIndex + 1, size = _fragments.size(); i < size; ++i)
+        {
+            _fragments.get(i).position(0);
+        }
         return this;
     }
 
-    public QpidByteBuffer flip()
+    public QpidByteBuffer rewind()
     {
-        _buffer.flip();
+        _resetFragmentIndex = -1;
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            fragment.rewind();
+        }
         return this;
     }
 
-    public final short getShort()
+    public final boolean hasArray()
     {
-        return _buffer.getShort();
+        return _fragments.size() == 1 && _fragments.get(0).hasArray();
     }
 
-    public final float getFloat()
+    public byte[] array()
     {
-        return _buffer.getFloat();
+        if (!hasArray())
+        {
+            throw new UnsupportedOperationException("This QpidByteBuffer is not backed by an array.");
+        }
+        return _fragments.get(0).array();
     }
 
-    public QpidByteBuffer limit(final int newLimit)
+    public QpidByteBuffer clear()
     {
-        _buffer.limit(newLimit);
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            _fragments.get(i).clear();
+        }
         return this;
     }
 
-    /**
-     * Method does not respect mark.
-     *
-     * @return QpidByteBuffer
-     */
-    public QpidByteBuffer duplicate()
+    public QpidByteBuffer compact()
     {
-        ByteBuffer buffer = _ref.getBuffer();
-        if (!(_ref instanceof PooledByteBufferRef))
+        if (_fragments.size() == 1)
         {
-            buffer = buffer.duplicate();
+            _fragments.get(0).compact();
         }
-
-        buffer.position(_offset );
-        buffer.limit(_offset + _buffer.capacity());
-
-        buffer = buffer.slice();
-
-        buffer.limit(_buffer.limit());
-        buffer.position(_buffer.position());
-        return new QpidByteBuffer(_ref, buffer, _offset);
-    }
-
-    public final QpidByteBuffer put(final byte[] src, final int offset, final int length)
-    {
-        _buffer.put(src, offset, length);
+        else
+        {
+            int position = position();
+            int limit = limit();
+            if (position != 0)
+            {
+                int dstPos = 0;
+                for (int srcPos = position; srcPos < limit; ++srcPos, ++dstPos)
+                {
+                    put(dstPos, get(srcPos));
+                }
+                position(dstPos);
+                limit(capacity());
+            }
+        }
+        _resetFragmentIndex = -1;
         return this;
     }
 
-    public long getLong(final int index)
-    {
-        return _buffer.getLong(index);
-    }
-
-    public int capacity()
+    public int position()
     {
-        return _buffer.capacity();
+        int totalPosition = 0;
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            totalPosition += fragment.position();
+            if (fragment.position() != fragment.limit())
+            {
+                break;
+            }
+        }
+        return totalPosition;
     }
 
-    public char getChar(final int index)
+    public QpidByteBuffer position(int newPosition)
     {
-        return _buffer.getChar(index);
+        if (newPosition < 0 || newPosition > limit())
+        {
+            throw new IllegalArgumentException(String.format("new position %d is out of bounds [%d, %d)", newPosition, 0, limit()));
+        }
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            final int fragmentLimit = fragment.limit();
+            if (newPosition <= fragmentLimit)
+            {
+                fragment.position(newPosition);
+                newPosition = 0;
+            }
+            else
+            {
+                if (fragmentLimit != fragment.capacity())
+                {
+                    throw new IllegalStateException(String.format("QBB Fragment %d has limit %d != capacity %d",
+                                                                  i,
+                                                                  fragmentLimit,
+                                                                  fragment.capacity()));
+                }
+                fragment.position(fragmentLimit);
+                newPosition -= fragmentLimit;
+            }
+        }
+        return this;
     }
 
-    public final byte get()
+    public int limit()
     {
-        return _buffer.get();
-    }
+        int totalLimit = 0;
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            final int fragmentLimit = fragment.limit();
+            totalLimit += fragmentLimit;
+            if (fragmentLimit != fragment.capacity())
+            {
+                break;
+            }
+        }
 
-    public byte get(final int index)
-    {
-        return _buffer.get(index);
+        return totalLimit;
     }
 
-    public final QpidByteBuffer get(final byte[] dst)
+    public QpidByteBuffer limit(int newLimit)
     {
-        _buffer.get(dst);
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            final int fragmentCapacity = fragment.capacity();
+            final int fragmentLimit = Math.min(newLimit, fragmentCapacity);
+            fragment.limit(fragmentLimit);
+            newLimit -= fragmentLimit;
+        }
         return this;
     }
 
-    public final void copyTo(final byte[] dst)
+    public final QpidByteBuffer mark()
     {
-        if (remaining() < dst.length)
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
         {
-            throw new BufferUnderflowException();
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            if (fragment.position() != fragment.limit())
+            {
+                fragment.mark();
+                _resetFragmentIndex = i;
+                return this;
+            }
         }
-        _buffer.duplicate().get(dst);
-    }
-
-    public final QpidByteBuffer putChar(final char value)
-    {
-        _buffer.putChar(value);
+        _resetFragmentIndex = _fragments.size() - 1;
+        _fragments.get(_resetFragmentIndex).mark();
         return this;
     }
 
-    public QpidByteBuffer position(final int newPosition)
+    public final int remaining()
     {
-        _buffer.position(newPosition);
-        return this;
+        int remaining = 0;
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            remaining += fragment.remaining();
+        }
+        return remaining;
     }
 
-    public int arrayOffset()
+    public final boolean hasRemaining()
     {
-        return _buffer.arrayOffset();
+        return hasRemaining(1);
     }
 
-    public final char getChar()
+    public final boolean hasRemaining(int atLeast)
     {
-        return _buffer.getChar();
+        int remaining = 0;
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            remaining += fragment.remaining();
+            if (remaining >= atLeast)
+            {
+                return true;
+            }
+        }
+        return false;
     }
 
-    public final int getInt()
+    public QpidByteBuffer flip()
     {
-        return _buffer.getInt();
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            fragment.flip();
+        }
+        return this;
     }
 
-    public final QpidByteBuffer putLong(final long value)
+    public int capacity()
     {
-        _buffer.putLong(value);
-        return this;
+        int totalCapacity = 0;
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            totalCapacity += _fragments.get(i).capacity();
+        }
+        return totalCapacity;
     }
 
-    public float getFloat(final int index)
+    /**
+     * Method does not respect mark.
+     *
+     * @return QpidByteBuffer
+     */
+    public QpidByteBuffer duplicate()
     {
-        return _buffer.getFloat(index);
+        final QpidByteBuffer newQpidByteBuffer = new QpidByteBuffer();
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            newQpidByteBuffer._fragments.add(_fragments.get(i).duplicate());
+        }
+        return newQpidByteBuffer;
     }
 
     public QpidByteBuffer slice()
     {
-        return view(0, _buffer.remaining());
+        return view(0, remaining());
     }
 
     public QpidByteBuffer view(int offset, int length)
     {
-        ByteBuffer buffer = _ref.getBuffer();
-        if (!(_ref instanceof PooledByteBufferRef))
+        if (offset + length > remaining())
         {
-            buffer = buffer.duplicate();
+            throw new IllegalArgumentException(String.format("offset: %d, length: %d, remaining: %d", offset, length, remaining()));
+        }
+        final QpidByteBuffer newQpidByteBuffer = new QpidByteBuffer();
+        boolean firstFragmentToBeConsidered = true;
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize && length > 0; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            if (fragment.hasRemaining())
+            {
+                if (!firstFragmentToBeConsidered && fragment.position() != 0)
+                {
+                    throw new IllegalStateException(String.format("Unexpectedly position %d on fragment %d.", fragment.position(), i));
+                }
+                firstFragmentToBeConsidered = false;
+                if (fragment.remaining() > offset)
+                {
+                    final int fragmentViewLength = Math.min(fragment.remaining() - offset, length);
+                    newQpidByteBuffer._fragments.add(fragment.view(offset, fragmentViewLength));
+                    length -= fragmentViewLength;
+                    offset = 0;
+                }
+                else
+                {
+                    offset -= fragment.remaining();
+                }
+            }
         }
 
-        int newRemaining = Math.min(_buffer.remaining() - offset, length);
-
-        int newPosition = _offset + _buffer.position() + offset;
-        buffer.limit(newPosition + newRemaining);
-        buffer.position(newPosition);
-
-        buffer = buffer.slice();
-
-        return new QpidByteBuffer(_ref, buffer, newPosition);
-    }
-
-    public int position()
-    {
-        return _buffer.position();
+        return newQpidByteBuffer;
     }
 
-    public QpidByteBuffer putDouble(final int index, final double value)
+    List<ByteBuffer> getUnderlyingBuffers()
     {
-        _buffer.putDouble(index, value);
-        return this;
-    }
+        List<ByteBuffer> byteBuffers = new ArrayList<>();
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            byteBuffers.add(_fragments.get(i).getUnderlyingBuffer());
+        }
+        return byteBuffers;
 
-    ByteBuffer getUnderlyingBuffer()
-    {
-        return _buffer;
     }
 
     public static QpidByteBuffer allocate(boolean direct, int size)
@@ -562,113 +1036,47 @@ public class QpidByteBuffer implements AutoCloseable
 
     public static QpidByteBuffer allocate(int size)
     {
-        return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
+        final QpidByteBuffer qpidByteBuffer = new QpidByteBuffer();
+        qpidByteBuffer._fragments.add(QpidByteBufferFragment.allocate(size));
+        return qpidByteBuffer;
     }
 
     public static QpidByteBuffer allocateDirect(int size)
     {
         if (size < 0)
         {
-            throw new IllegalArgumentException("Cannot allocate QpidByteBuffer with size "
-                                               + size
-                                               + " which is negative.");
-        }
-
-        final ByteBufferRef ref;
-        if (_isPoolInitialized && _pooledBufferSize >= size)
-        {
-            if (_pooledBufferSize == size)
-            {
-                ByteBuffer buf = _bufferPool.getBuffer();
-                if (buf == null)
-                {
-                    buf = ByteBuffer.allocateDirect(size);
-                }
-                ref = new PooledByteBufferRef(buf);
-            }
-            else
-            {
-                QpidByteBuffer buf = _cachedBuffer.get();
-                if (buf == null || buf.remaining() < size)
-                {
-                    if (buf != null)
-                    {
-                        buf.dispose();
-                    }
-                    buf = allocateDirect(_pooledBufferSize);
-                    _cachedBuffer.set(buf);
-                }
-                QpidByteBuffer rVal = buf.view(0, size);
-                buf.position(buf.position() + size);
-
-                return rVal;
-            }
-        }
-        else
-        {
-            ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
-        }
-        return new QpidByteBuffer(ref);
-    }
-
-    public static Collection<QpidByteBuffer> allocateDirectCollection(int size)
-    {
-        if (_pooledBufferSize == 0)
-        {
-            return Collections.singleton(allocateDirect(size));
+            throw new IllegalArgumentException("Cannot allocate QpidByteBufferFragment with size "
+                                               + size
+                                               + " which is negative.");
         }
-        else
-        {
-            List<QpidByteBuffer> buffers = new ArrayList<>((size / _pooledBufferSize) + 2);
-            int remaining = size;
-
-            QpidByteBuffer buf = _cachedBuffer.get();
-            if (buf == null)
-            {
-                buf = allocateDirect(_pooledBufferSize);
-            }
-            while (remaining > buf.remaining())
-            {
-                int bufRemaining = buf.remaining();
-                if (buf == _cachedBuffer.get())
-                {
-                    buffers.add(buf.view(0, bufRemaining));
-                    buf.dispose();
-                }
-                else
-                {
-                    buffers.add(buf);
-                }
-                remaining -= bufRemaining;
-                buf = allocateDirect(_pooledBufferSize);
-            }
-            buffers.add(buf.view(0, remaining));
-            buf.position(buf.position() + remaining);
 
-            if (buf.hasRemaining())
+        if (_isPoolInitialized)
+        {
+            QpidByteBuffer qpidByteBuffer = new QpidByteBuffer();
+            int allocatedSize = 0;
+            while (size - allocatedSize >= _pooledBufferSize)
             {
-                _cachedBuffer.set(buf);
+                qpidByteBuffer._fragments.add(QpidByteBufferFragment.allocateDirect(_pooledBufferSize));
+                allocatedSize += _pooledBufferSize;
             }
-            else
+            if (allocatedSize != size)
             {
-                _cachedBuffer.set(allocateDirect(_pooledBufferSize));
-                buf.dispose();
+                qpidByteBuffer._fragments.add(QpidByteBufferFragment.allocateDirect(size - allocatedSize));
             }
-            return buffers;
+            return qpidByteBuffer;
+        }
+        else
+        {
+            return allocate(size);
         }
     }
 
-    public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data)
-    {
-        return asQpidByteBuffers(data, 0, data.length);
-    }
-
-    public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data, final int offset, final int length)
+    private static QpidByteBuffer asQpidByteBuffer(final byte[] data, final int offset, final int length)
     {
         try (QpidByteBufferOutputStream outputStream = new QpidByteBufferOutputStream(true, getPooledBufferSize()))
         {
             outputStream.write(data, offset, length);
-            return outputStream.fetchAccumulatedBuffers();
+            return outputStream.fetchAccumulatedBuffer();
         }
         catch (IOException e)
         {
@@ -676,20 +1084,46 @@ public class QpidByteBuffer implements AutoCloseable
         }
     }
 
-    public static List<QpidByteBuffer> asQpidByteBuffers(final InputStream stream) throws IOException
+    public static QpidByteBuffer asQpidByteBuffer(final InputStream stream) throws IOException
+    {
+        final QpidByteBuffer qpidByteBuffer = new QpidByteBuffer();
+        final int pooledBufferSize = QpidByteBuffer.getPooledBufferSize();
+        byte[] transferBuf = new byte[pooledBufferSize];
+        int readFragment = 0;
+        int read = stream.read(transferBuf, readFragment, pooledBufferSize - readFragment);
+        while (read > 0)
+        {
+            readFragment += read;
+            if (readFragment == pooledBufferSize)
+            {
+                QpidByteBufferFragment fragment = QpidByteBufferFragment.allocateDirect(pooledBufferSize);
+                fragment.put(transferBuf, 0, pooledBufferSize);
+                fragment.flip();
+                qpidByteBuffer._fragments.add(fragment);
+                readFragment = 0;
+            }
+            read = stream.read(transferBuf, readFragment, pooledBufferSize - readFragment);
+        }
+        if (readFragment != 0)
+        {
+            QpidByteBufferFragment fragment = QpidByteBufferFragment.allocateDirect(readFragment);
+            fragment.put(transferBuf, 0, readFragment);
+            fragment.flip();
+            qpidByteBuffer._fragments.add(fragment);
+        }
+        return qpidByteBuffer;
+    }
+
+    public final SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dst) throws SSLException
     {
-        List<QpidByteBuffer> bufs = new ArrayList<>();
-        byte[] transferBuf = new byte[QpidByteBuffer.getPooledBufferSize()];
-        int read = stream.read(transferBuf);
-        while(read > 0)
+        final List<ByteBuffer> dstUnderlyingBuffers = dst.getUnderlyingBuffers();
+        final List<ByteBuffer> underlyingBuffers = getUnderlyingBuffers();
+        if (underlyingBuffers.size() != 1)
         {
-            QpidByteBuffer chunk = QpidByteBuffer.allocateDirect(read);
-            chunk.put(transferBuf, 0, read);
-            chunk.flip();
-            bufs.add(chunk);
-            read = stream.read(transferBuf);
+            throw new IllegalStateException("Expected single fragment buffer");
         }
-        return bufs;
+        return engine.unwrap(underlyingBuffers.get(0),
+                             dstUnderlyingBuffers.toArray(new ByteBuffer[dstUnderlyingBuffers.size()]));
     }
 
     public static SSLEngineResult encryptSSL(SSLEngine engine,
@@ -704,72 +1138,62 @@ public class QpidByteBuffer implements AutoCloseable
         }
         else
         {
-            src = new ByteBuffer[buffers.size()];
-            Iterator<QpidByteBuffer> iterator = buffers.iterator();
-            for (int i = 0; i < src.length; i++)
+            List<ByteBuffer> buffers_ = new LinkedList<>();
+            for (QpidByteBuffer buffer : buffers)
             {
-                src[i] = iterator.next().getUnderlyingBuffer();
+                buffers_.addAll(buffer.getUnderlyingBuffers());
             }
+            src = buffers_.toArray(new ByteBuffer[buffers_.size()]);
         }
-        return engine.wrap(src, dest.getUnderlyingBuffer());
+        final List<ByteBuffer> dstUnderlyingBuffers = dest.getUnderlyingBuffers();
+        if (dstUnderlyingBuffers.size() != 1)
+        {
+            throw new IllegalStateException("Expected a single fragment output buffer");
+        }
+        return engine.wrap(src, dstUnderlyingBuffers.get(0));
     }
 
-    public static Collection<QpidByteBuffer> inflate(Collection<QpidByteBuffer> compressedBuffers) throws IOException
+    public static QpidByteBuffer inflate(QpidByteBuffer compressedBuffer) throws IOException
     {
-        if (compressedBuffers == null)
+        if (compressedBuffer == null)
         {
-            throw new IllegalArgumentException("compressedBuffers cannot be null");
+            throw new IllegalArgumentException("compressedBuffer cannot be null");
         }
 
-        boolean isDirect = false;
-        Collection<InputStream> streams = new ArrayList<>(compressedBuffers.size());
-        for (QpidByteBuffer buffer : compressedBuffers)
-        {
-            isDirect = isDirect || buffer.isDirect();
-            streams.add(buffer.asInputStream());
-        }
+        boolean isDirect = compressedBuffer.isDirect();
         final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
 
-        Collection<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
-        try (GZIPInputStream gzipInputStream = new GZIPInputStream(new CompositeInputStream(streams)))
+        List<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
+        try (GZIPInputStream gzipInputStream = new GZIPInputStream(compressedBuffer.asInputStream()))
         {
             byte[] buf = new byte[bufferSize];
             int read;
             while ((read = gzipInputStream.read(buf)) != -1)
             {
-                uncompressedBuffers.addAll(asQpidByteBuffers(buf, 0, read));
+                uncompressedBuffers.add(QpidByteBuffer.asQpidByteBuffer(buf, 0, read));
             }
-            return uncompressedBuffers;
+            return QpidByteBuffer.concatenate(uncompressedBuffers);
         }
-        catch (IOException e)
+        finally
         {
-            for (QpidByteBuffer uncompressedBuffer : uncompressedBuffers)
-            {
-                uncompressedBuffer.dispose();
-            }
-            throw e;
+            uncompressedBuffers.forEach(QpidByteBuffer::dispose);
         }
     }
 
-    public static Collection<QpidByteBuffer> deflate(Collection<QpidByteBuffer> uncompressedBuffers) throws IOException
+    public static QpidByteBuffer deflate(QpidByteBuffer uncompressedBuffer) throws IOException
     {
-        if (uncompressedBuffers == null)
+        if (uncompressedBuffer == null)
         {
-            throw new IllegalArgumentException("uncompressedBuffers cannot be null");
+            throw new IllegalArgumentException("uncompressedBuffer cannot be null");
         }
 
-        boolean isDirect = false;
-        Collection<InputStream> streams = new ArrayList<>(uncompressedBuffers.size());
-        for (QpidByteBuffer buffer : uncompressedBuffers)
-        {
-            isDirect = isDirect || buffer.isDirect();
-            streams.add(buffer.asInputStream());
-        }
+        boolean isDirect = uncompressedBuffer.isDirect();
         final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
 
-        try(QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
-            InputStream compressedInput = new CompositeInputStream(streams);
-            GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput, bufferSize)))
+        try (QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
+             InputStream compressedInput = uncompressedBuffer.asInputStream();
+             GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput,
+                                                                                         bufferSize)))
         {
             byte[] buf = new byte[16384];
             int read;
@@ -779,25 +1203,29 @@ public class QpidByteBuffer implements AutoCloseable
             }
             gzipStream.finish();
             gzipStream.flush();
-            return compressedOutput.fetchAccumulatedBuffers();
+            return compressedOutput.fetchAccumulatedBuffer();
         }
     }
 
     public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> qpidByteBuffers)
             throws IOException
     {
-        ByteBuffer[] byteBuffers = new ByteBuffer[qpidByteBuffers.size()];
-        Iterator<QpidByteBuffer> iterator = qpidByteBuffers.iterator();
-        for (int i = 0; i < byteBuffers.length; i++)
+        List<ByteBuffer> byteBuffers = new ArrayList<>();
+        for (QpidByteBuffer qpidByteBuffer : qpidByteBuffers)
         {
-            byteBuffers[i] = iterator.next().getUnderlyingBuffer();
+            for (QpidByteBufferFragment fragment : qpidByteBuffer._fragments)
+            {
+                byteBuffers.add(fragment.getUnderlyingBuffer());
+            }
         }
-        return channel.write(byteBuffers);
+        return channel.write(byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
     }
 
     public static QpidByteBuffer wrap(final ByteBuffer wrap)
     {
-        return new QpidByteBuffer(new NonPooledByteBufferRef(wrap));
+        final QpidByteBuffer qpidByteBuffer = new QpidByteBuffer();
+        qpidByteBuffer._fragments.add(new QpidByteBufferFragment(new NonPooledByteBufferRef(wrap)));
+        return qpidByteBuffer;
     }
 
     public static QpidByteBuffer wrap(final byte[] data)
@@ -813,19 +1241,25 @@ public class QpidByteBuffer implements AutoCloseable
     static void returnToPool(final ByteBuffer buffer)
     {
         buffer.clear();
-        final ByteBuffer duplicate = _zeroed.duplicate();
-        duplicate.limit(buffer.capacity());
-        buffer.put(duplicate);
+        if (_isPoolInitialized)
+        {
+            final ByteBuffer duplicate = _zeroed.duplicate();
+            duplicate.limit(buffer.capacity());
+            buffer.put(duplicate);
 
-        _bufferPool.returnBuffer(buffer);
+            _bufferPool.returnBuffer(buffer);
+        }
     }
 
     public synchronized static void initialisePool(int bufferSize, int maxPoolSize, final double sparsityFraction)
     {
-        if (_isPoolInitialized && (bufferSize != _pooledBufferSize || maxPoolSize != _bufferPool.getMaxSize() || sparsityFraction != _sparsityFraction))
+        if (_isPoolInitialized && (bufferSize != _pooledBufferSize
+                                   || maxPoolSize != _bufferPool.getMaxSize()
+                                   || sparsityFraction != _sparsityFraction))
         {
             final String errorMessage = String.format(
-                    "QpidByteBuffer pool has already been initialised with bufferSize=%d, maxPoolSize=%d, and sparsityFraction=%f." +
+                    "QpidByteBuffer pool has already been initialised with bufferSize=%d, maxPoolSize=%d, and sparsityFraction=%f."
+                    +
                     "Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
                     _pooledBufferSize,
                     _bufferPool.getMaxSize(),
@@ -846,7 +1280,9 @@ public class QpidByteBuffer implements AutoCloseable
         _isPoolInitialized = true;
     }
 
-    /** Test use only */
+    /**
+     * Test use only
+     */
     public synchronized static void deinitialisePool()
     {
         if (_isPoolInitialized)
@@ -856,18 +1292,15 @@ public class QpidByteBuffer implements AutoCloseable
             _zeroed = null;
             _isPoolInitialized = false;
             _sparsityFraction = 1.0;
+            final QpidByteBufferFragment cachedBuffer = _cachedBuffer.get();
+            if (cachedBuffer != null)
+            {
+                cachedBuffer.dispose();
+                _cachedBuffer.remove();
+            }
         }
     }
 
-    /**
-     * Not for general use!
-     * Used to clear threadlocal buffer when shutting down thread pools.
-     */
-    public static QpidByteBuffer getCachedThreadLocalBuffer()
-    {
-        return _cachedBuffer.get();
-    }
-
     public static int getPooledBufferSize()
     {
         return _pooledBufferSize;
@@ -893,23 +1326,6 @@ public class QpidByteBuffer implements AutoCloseable
         return PooledByteBufferRef.getDisposalCounter();
     }
 
-    public static List<QpidByteBuffer> reallocateIfNecessary(Collection<QpidByteBuffer> data)
-    {
-        if (data != null)
-        {
-            List<QpidByteBuffer> newCopy = new ArrayList<>(data.size());
-            for (QpidByteBuffer buf : data)
-            {
-                newCopy.add(reallocateIfNecessary(buf));
-            }
-            return newCopy;
-        }
-        else
-        {
-            return null;
-        }
-    }
-
     public static QpidByteBuffer reallocateIfNecessary(final QpidByteBuffer data)
     {
         if (data != null && data.isDirect() && data.isSparse())
@@ -928,7 +1344,57 @@ public class QpidByteBuffer implements AutoCloseable
 
     boolean isSparse()
     {
-        return _ref.isSparse(_sparsityFraction);
+        for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+        {
+            final QpidByteBufferFragment fragment = _fragments.get(i);
+            if (fragment.isSparse())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static QpidByteBuffer concatenate(final List<QpidByteBuffer> buffers)
+    {
+        final QpidByteBuffer qpidByteBuffer = new QpidByteBuffer();
+        for (QpidByteBuffer buffer : buffers)
+        {
+            for (QpidByteBufferFragment fragment : buffer._fragments)
+            {
+                qpidByteBuffer._fragments.add(fragment.slice());
+            }
+        }
+        return qpidByteBuffer;
+    }
+
+    public static QpidByteBuffer concatenate(QpidByteBuffer... buffers)
+    {
+        return concatenate(Arrays.asList(buffers));
+    }
+
+    public static QpidByteBuffer emptyQpidByteBuffer()
+    {
+        return EMPTY_QPID_BYTE_BUFFER.duplicate();
+    }
+
+    public static ThreadFactory createQpidByteBufferTrackingThreadFactory(final ThreadFactory factory)
+    {
+        return r -> factory.newThread(() -> {
+            try
+            {
+                r.run();
+            }
+            finally
+            {
+                final QpidByteBufferFragment cachedThreadLocalBuffer = _cachedBuffer.get();
+                if (cachedThreadLocalBuffer != null)
+                {
+                    cachedThreadLocalBuffer.dispose();
+                    _cachedBuffer.remove();
+                }
+            }
+        });
     }
 
     private static final class BufferInputStream extends InputStream
@@ -937,7 +1403,7 @@ public class QpidByteBuffer implements AutoCloseable
 
         private BufferInputStream(final QpidByteBuffer buffer)
         {
-            _qpidByteBuffer = buffer;
+            _qpidByteBuffer = buffer.duplicate();
         }
 
         @Override
@@ -945,7 +1411,7 @@ public class QpidByteBuffer implements AutoCloseable
         {
             if (_qpidByteBuffer.hasRemaining())
             {
-                return _qpidByteBuffer.get() & 0xFF;
+                return _qpidByteBuffer.getUnsignedByte();
             }
             return -1;
         }
@@ -1001,6 +1467,324 @@ public class QpidByteBuffer implements AutoCloseable
         @Override
         public void close()
         {
+            _qpidByteBuffer.dispose();
+        }
+    }
+
+
+    static class QpidByteBufferFragment implements AutoCloseable
+    {
+
+        private static final AtomicIntegerFieldUpdater<QpidByteBufferFragment>
+                DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
+                QpidByteBufferFragment.class,
+                "_disposed");
+
+        private final int _offset;
+
+        final ByteBufferRef _ref;
+        volatile ByteBuffer _buffer;
+        @SuppressWarnings("unused")
+        private volatile int _disposed;
+
+
+        QpidByteBufferFragment(ByteBufferRef ref)
+        {
+            this(ref, ref.getBuffer(), 0);
+        }
+
+        private QpidByteBufferFragment(ByteBufferRef ref, ByteBuffer buffer, int offset)
+        {
+            _ref = ref;
+            _buffer = buffer;
+            _offset = offset;
+            _ref.incrementRef(capacity());
+        }
+
+        public final boolean isDirect()
+        {
+            return _buffer.isDirect();
+        }
+
+        @Override
+        public final void close()
+        {
+            dispose();
+        }
+
+        public final void dispose()
+        {
+            if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
+            {
+                _ref.decrementRef(capacity());
+                _buffer = null;
+            }
+        }
+
+        public final CharBuffer decode(Charset charset)
+        {
+            return charset.decode(getUnderlyingBuffer());
+        }
+
+        @Override
+        public String toString()
+        {
+            return "QpidByteBufferFragment{" +
+                   "_buffer=" + _buffer +
+                   ", _disposed=" + _disposed +
+                   '}';
+        }
+
+        public final boolean hasRemaining()
+        {
+            return _buffer.hasRemaining();
+        }
+
+        public final QpidByteBufferFragment put(final byte b)
+        {
+            _buffer.put(b);
+            return this;
+        }
+
+        public QpidByteBufferFragment put(final int index, final byte b)
+        {
+            _buffer.put(index, b);
+            return this;
+        }
+
+        public final QpidByteBufferFragment mark()
+        {
+            _buffer.mark();
+            return this;
+        }
+
+        public final boolean hasArray()
+        {
+            return _buffer.hasArray();
+        }
+
+        public byte[] array()
+        {
+            return _buffer.array();
+        }
+
+        public final int remaining()
+        {
+            return _buffer.remaining();
+        }
+
+
+        public final QpidByteBufferFragment put(final ByteBuffer src)
+        {
+            _buffer.put(src);
+            return this;
+        }
+
+        public final QpidByteBufferFragment put(final QpidByteBufferFragment src)
+        {
+            int sourceRemaining = src.remaining();
+            if (sourceRemaining > remaining())
+            {
+                throw new BufferOverflowException();
+            }
+
+            _buffer.put(src.getUnderlyingBuffer());
+            return this;
+        }
+
+        public final QpidByteBufferFragment get(final byte[] dst, final int offset, final int length)
+        {
+            _buffer.get(dst, offset, length);
+            return this;
+        }
+
+        public QpidByteBufferFragment rewind()
+        {
+            _buffer.rewind();
+            return this;
+        }
+
+        public QpidByteBufferFragment clear()
+        {
+            _buffer.clear();
+            return this;
+        }
+
+        public QpidByteBufferFragment compact()
+        {
+            _buffer.compact();
+            return this;
+        }
+
+        public int limit()
+        {
+            return _buffer.limit();
+        }
+
+        public QpidByteBufferFragment reset()
+        {
+            _buffer.reset();
+            return this;
+        }
+
+        public QpidByteBufferFragment flip()
+        {
+            _buffer.flip();
+            return this;
+        }
+
+        public QpidByteBufferFragment limit(final int newLimit)
+        {
+            _buffer.limit(newLimit);
+            return this;
+        }
+
+        /**
+         * Method does not respect mark.
+         *
+         * @return QpidByteBufferFragment
+         */
+        public QpidByteBufferFragment duplicate()
+        {
+            ByteBuffer buffer = _ref.getBuffer();
+            if (!(_ref instanceof PooledByteBufferRef))
+            {
+                buffer = buffer.duplicate();
+            }
+
+            buffer.position(_offset);
+            buffer.limit(_offset + _buffer.capacity());
+
+            buffer = buffer.slice();
+
+            buffer.limit(_buffer.limit());
+            buffer.position(_buffer.position());
+            return new QpidByteBufferFragment(_ref, buffer, _offset);
+        }
+
+        public final QpidByteBufferFragment put(final byte[] src, final int offset, final int length)
+        {
+            _buffer.put(src, offset, length);
+            return this;
+        }
+
+
+        public int capacity()
+        {
+            return _buffer.capacity();
+        }
+
+
+        public final byte get()
+        {
+            return _buffer.get();
+        }
+
+        public byte get(final int index)
+        {
+            return _buffer.get(index);
+        }
+
+        public QpidByteBufferFragment position(final int newPosition)
+        {
+            _buffer.position(newPosition);
+            return this;
+        }
+
+        public QpidByteBufferFragment slice()
+        {
+            return view(0, _buffer.remaining());
+        }
+
+        public QpidByteBufferFragment view(int offset, int length)
+        {
+            ByteBuffer buffer = _ref.getBuffer();
+            if (!(_ref instanceof PooledByteBufferRef))
+            {
+                buffer = buffer.duplicate();
+            }
+
+            int newRemaining = Math.min(_buffer.remaining() - offset, length);
+
+            int newPosition = _offset + _buffer.position() + offset;
+            buffer.limit(newPosition + newRemaining);
+            buffer.position(newPosition);
+
+            buffer = buffer.slice();
+
+            return new QpidByteBufferFragment(_ref, buffer, newPosition);
+        }
+
+        public int position()
+        {
+            return _buffer.position();
+        }
+
+        ByteBuffer getUnderlyingBuffer()
+        {
+            return _buffer;
+        }
+
+        public static QpidByteBufferFragment allocate(boolean direct, int size)
+        {
+            return direct ? allocateDirect(size) : allocate(size);
+        }
+
+        public static QpidByteBufferFragment allocate(int size)
+        {
+            return new QpidByteBufferFragment(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
+        }
+
+        public static QpidByteBufferFragment allocateDirect(int size)
+        {
+            if (size < 0)
+            {
+                throw new IllegalArgumentException("Cannot allocate QpidByteBufferFragment with size "
+                                                   + size
+                                                   + " which is negative.");
+            }
+
+            final ByteBufferRef ref;
+            if (_isPoolInitialized && _pooledBufferSize >= size)
+            {
+                if (_pooledBufferSize == size)
+                {
+                    ByteBuffer buf = _bufferPool.getBuffer();
+                    if (buf == null)
+                    {
+                        buf = ByteBuffer.allocateDirect(size);
+                    }
+                    ref = new PooledByteBufferRef(buf);
+                }
+                else
+                {
+                    QpidByteBufferFragment buf = _cachedBuffer.get();
+                    if (buf == null || buf.remaining() < size)
+                    {
+                        if (buf != null)
+                        {
+                            buf.dispose();
+                        }
+                        buf = allocateDirect(_pooledBufferSize);
+                        _cachedBuffer.set(buf);
+                    }
+                    QpidByteBufferFragment rVal = buf.view(0, size);
+                    buf.position(buf.position() + size);
+
+                    return rVal;
+                }
+            }
+            else
+            {
+                ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
+            }
+            return new QpidByteBufferFragment(ref);
+        }
+
+        boolean isSparse()
+        {
+            return _ref.isSparse(_sparsityFraction);
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java
deleted file mode 100644
index d7ce698..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.server.bytebuffer;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import org.apache.qpid.server.streams.CompositeInputStream;
-
-/**
- * InputStream implementation that takes a list QpidByteBuffers.
- * The QpidByteBufferInputStream takes ownership of the buffers and disposes them on close().
- *
- * Not thread safe.
- */
-public class QpidByteBufferInputStream extends InputStream
-{
-    private final CompositeInputStream _compositeInputStream;
-    private final Collection<QpidByteBuffer> _buffers;
-
-    public QpidByteBufferInputStream(Collection<QpidByteBuffer> buffers)
-    {
-        _buffers = buffers;
-
-        final Collection<InputStream> streams = new ArrayList<>(buffers.size());
-        for (QpidByteBuffer buffer : buffers)
-        {
-            streams.add(buffer.asInputStream());
-        }
-        _compositeInputStream = new CompositeInputStream(streams);
-    }
-
-    @Override
-    public int read() throws IOException
-    {
-        return _compositeInputStream.read();
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException
-    {
-        return _compositeInputStream.read(b, off, len);
-    }
-
-    @Override
-    public void mark(int readlimit)
-    {
-        _compositeInputStream.mark(readlimit);
-    }
-
-    @Override
-    public void reset() throws IOException
-    {
-        _compositeInputStream.reset();
-    }
-
-    @Override
-    public boolean markSupported()
-    {
-        return _compositeInputStream.markSupported();
-    }
-
-    @Override
-    public long skip(long n) throws IOException
-    {
-        return _compositeInputStream.skip(n);
-    }
-
-    @Override
-    public int available() throws IOException
-    {
-        return _compositeInputStream.available();
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        try
-        {
-            _compositeInputStream.close();
-        }
-        finally
-        {
-            for (QpidByteBuffer buffer : _buffers)
-            {
-                buffer.dispose();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
index faaa332..08788df 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
@@ -23,23 +23,13 @@ package org.apache.qpid.server.bytebuffer;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.LinkedList;
 
-/**
- * OutputStream implementation that yields a list QpidByteBuffers that contain a copy
- * of the incoming bytes.  Use fetchAccumulatedBuffers to get the buffers.  Caller
- * has responsibility to dispose the buffers after use.
- *
- * It will be normally be desirable to front this stream with java.io.BufferedOutputStream
- * to minimise the number of write and thus the number of buffers created.
- *
- * Not thread safe.
- */
 public class QpidByteBufferOutputStream extends OutputStream
 {
     private final LinkedList<QpidByteBuffer> _buffers = new LinkedList<>();
+    private int _bufferPosition = 0;
+    private final byte[] _buffer;
     private final boolean _isDirect;
     private final int _maximumBufferSize;
     private boolean _closed;
@@ -52,14 +42,13 @@ public class QpidByteBufferOutputStream extends OutputStream
         }
         _isDirect = isDirect;
         _maximumBufferSize = maximumBufferSize;
+        _buffer = new byte[_maximumBufferSize];
     }
 
     @Override
     public void write(int b) throws IOException
     {
-        int size = 1;
-        byte[] data = new byte[] {(byte)b};
-        allocateDataBuffers(data, 0, size);
+        write(new byte[] {(byte)b});
     }
 
     @Override
@@ -78,18 +67,18 @@ public class QpidByteBufferOutputStream extends OutputStream
     public void close() throws IOException
     {
         _closed = true;
-        for (QpidByteBuffer buffer : _buffers)
-        {
-            buffer.dispose();
-        }
+        _buffers.forEach(QpidByteBuffer::dispose);
         _buffers.clear();
     }
 
-    public Collection<QpidByteBuffer> fetchAccumulatedBuffers()
+    QpidByteBuffer fetchAccumulatedBuffer()
     {
-        Collection<QpidByteBuffer> bufs = new ArrayList<>(_buffers);
-        _buffers.clear();
-        return bufs;
+        if (_bufferPosition != 0)
+        {
+            addSingleQpidByteBuffer(_buffer, 0, _bufferPosition);
+        }
+        final QpidByteBuffer combined = QpidByteBuffer.concatenate(_buffers);
+        return combined;
     }
 
     private void allocateDataBuffers(byte[] data, int offset, int len) throws IOException
@@ -99,15 +88,28 @@ public class QpidByteBufferOutputStream extends OutputStream
             throw new IOException("Stream is closed");
         }
 
-        int size = Math.min(_maximumBufferSize, len);
+        do
+        {
+            int bytesWeCanWrite = Math.min(_buffer.length - _bufferPosition, len);
+            System.arraycopy(data, offset, _buffer, _bufferPosition, bytesWeCanWrite);
+            offset += bytesWeCanWrite;
+            len -= bytesWeCanWrite;
+            _bufferPosition += bytesWeCanWrite;
+            if (_buffer.length == _bufferPosition)
+            {
+                addSingleQpidByteBuffer(_buffer, 0, _buffer.length);
+            }
+        } while (len != 0);
+    }
 
-        QpidByteBuffer current = _isDirect ? QpidByteBuffer.allocateDirect(len) : QpidByteBuffer.allocate(len);
-        current.put(data, offset, size);
+    private void addSingleQpidByteBuffer(final byte[] buffer, final int offset, final int length)
+    {
+        QpidByteBuffer current = _isDirect
+                ? QpidByteBuffer.allocateDirect(length)
+                : QpidByteBuffer.allocate(length);
+        current.put(buffer, offset, length);
         current.flip();
         _buffers.add(current);
-        if (len > size)
-        {
-            allocateDataBuffers(data, offset + size, len - size);
-        }
+        _bufferPosition = 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
deleted file mode 100644
index a475ef7..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.bytebuffer;
-
-import java.nio.BufferUnderflowException;
-import java.util.List;
-
-public class QpidByteBufferUtils
-{
-    public static void dispose(List<QpidByteBuffer> in)
-    {
-        for (int i = 0, inSize = in.size(); i < inSize; i++)
-        {
-            final QpidByteBuffer qpidByteBuffer = in.get(i);
-            qpidByteBuffer.dispose();
-        }
-    }
-
-    public static boolean hasRemaining(List<QpidByteBuffer> in)
-    {
-        if (in.isEmpty())
-        {
-            return false;
-        }
-        for (int i = 0; i < in.size(); i++)
-        {
-            if (in.get(i).hasRemaining())
-            {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public static long remaining(List<QpidByteBuffer> in)
-    {
-        long remaining = 0L;
-        for (int i = 0; i < in.size(); i++)
-        {
-            remaining += in.get(i).remaining();
-        }
-        return remaining;
-    }
-
-    public static byte get(List<QpidByteBuffer> in)
-    {
-        for (int i = 0; i < in.size(); i++)
-        {
-            final QpidByteBuffer buffer = in.get(i);
-            if (buffer.hasRemaining())
-            {
-                return buffer.get();
-            }
-        }
-        throw new BufferUnderflowException();
-    }
-
-    public static boolean hasRemaining(final List<QpidByteBuffer> in, int len)
-    {
-        for (int i = 0; i < in.size(); i++)
-        {
-            final QpidByteBuffer buffer = in.get(i);
-            int remaining = buffer.remaining();
-            if (remaining >= len)
-            {
-                return true;
-            }
-            len -= remaining;
-        }
-
-        return false;
-    }
-
-    public static long getLong(final List<QpidByteBuffer> in)
-    {
-        boolean bytewise = false;
-        int consumed = 0;
-        long result = 0L;
-        for (int i = 0; i < in.size(); i++)
-        {
-            final QpidByteBuffer buffer = in.get(i);
-            int remaining = buffer.remaining();
-            if (bytewise)
-            {
-                while (buffer.hasRemaining() && consumed < 8)
-                {
-                    result <<= 1;
-                    result |= (0xFF & buffer.get());
-                    consumed++;
-                }
-                if (consumed == 8)
-                {
-                    return result;
-                }
-            }
-            else
-            {
-                if (remaining >= 8)
-                {
-                    return buffer.getLong();
-                }
-                else if (remaining != 0)
-                {
-                    bytewise = true;
-                    while (buffer.hasRemaining())
-                    {
-                        result <<= 1;
-                        result |= (0xFF & buffer.get());
-                        consumed++;
-                    }
-                }
-            }
-        }
-        throw new BufferUnderflowException();
-    }
-
-    public static int getInt(final List<QpidByteBuffer> in)
-    {
-        boolean bytewise = false;
-        int consumed = 0;
-        int result = 0;
-        for (int i = 0; i < in.size(); i++)
-        {
-            final QpidByteBuffer buffer = in.get(i);
-            int remaining = buffer.remaining();
-            if (bytewise)
-            {
-                while (buffer.hasRemaining() && consumed < 4)
-                {
-                    result <<= 1;
-                    result |= (0xFF & buffer.get());
-                    consumed++;
-                }
-                if (consumed == 4)
-                {
-                    return result;
-                }
-            }
-            else
-            {
-                if (remaining >= 4)
-                {
-                    return buffer.getInt();
-                }
-                else if (remaining != 0)
-                {
-                    bytewise = true;
-                    while (buffer.hasRemaining())
-                    {
-                        result <<= 1;
-                        result |= (0xFF & buffer.get());
-                        consumed++;
-                    }
-                }
-            }
-        }
-        throw new BufferUnderflowException();
-    }
-
-    public static float getFloat(final List<QpidByteBuffer> in)
-    {
-        return Float.intBitsToFloat(getInt(in));
-    }
-
-    public static double getDouble(final List<QpidByteBuffer> in)
-    {
-        return Double.longBitsToDouble(getLong(in));
-    }
-
-    public static Short getShort(final List<QpidByteBuffer> in)
-    {
-        boolean bytewise = false;
-        int consumed = 0;
-        short result = 0;
-        for (int i = 0; i < in.size(); i++)
-        {
-            final QpidByteBuffer buffer = in.get(i);
-            int remaining = buffer.remaining();
-            if (bytewise)
-            {
-                while (buffer.hasRemaining() && consumed < 2)
-                {
-                    result <<= 1;
-                    result |= (0xFF & buffer.get());
-                    consumed++;
-                }
-                if (consumed == 2)
-                {
-                    return result;
-                }
-            }
-            else
-            {
-                if (remaining >= 2)
-                {
-                    return buffer.getShort();
-                }
-                else if (remaining != 0)
-                {
-                    bytewise = true;
-                    while (buffer.hasRemaining())
-                    {
-                        result <<= 1;
-                        result |= (0xFF & buffer.get());
-                        consumed++;
-                    }
-                }
-            }
-        }
-        throw new BufferUnderflowException();
-    }
-
-    public static int get(final List<QpidByteBuffer> in, final byte[] data)
-    {
-        int copied = 0;
-        int i = 0;
-        while (copied < data.length && i < in.size())
-        {
-            QpidByteBuffer buf = in.get(i);
-            if (buf.hasRemaining())
-            {
-                int remaining = buf.remaining();
-                if (remaining >= data.length - copied)
-                {
-                    buf.get(data, copied, data.length - copied);
-                    return data.length;
-                }
-                else
-                {
-                    buf.get(data, copied, remaining);
-                    copied += remaining;
-                }
-            }
-            i++;
-        }
-        return copied;
-    }
-
-    public static void skip(final List<QpidByteBuffer> in, int length)
-    {
-        int skipped = 0;
-        int i = 0;
-        while (skipped < length && i < in.size())
-        {
-            QpidByteBuffer buf = in.get(i);
-            if (buf.hasRemaining())
-            {
-                int remaining = buf.remaining();
-                if (remaining >= length - skipped)
-                {
-                    buf.position(buf.position() + length - skipped);
-                    return;
-                }
-                else
-                {
-                    buf.position(buf.position() + remaining);
-                    skipped += remaining;
-                }
-            }
-            i++;
-        }
-
-        if (skipped != length)
-        {
-            throw new BufferUnderflowException();
-        }
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org