You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/24 16:07:16 UTC

[2/3] qpid-broker-j git commit: QPID-7832: [Java Broker] Introduce QpidByteBuffer interface and split implementation into separate classes

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e64e2826/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index e2a5ee7..bf7e40c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -1,4 +1,5 @@
 /*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,1682 +18,291 @@
  * under the License.
  *
  */
-
 package org.apache.qpid.server.bytebuffer;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.InvalidMarkException;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLException;
 
-import com.google.common.primitives.Chars;
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.Shorts;
-
-public class QpidByteBuffer implements AutoCloseable
+public interface QpidByteBuffer extends AutoCloseable
 {
-    private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
-    private static final QpidByteBuffer EMPTY_QPID_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
-    private static final ThreadLocal<QpidByteBufferFragment> _cachedBuffer = new ThreadLocal<>();
-    private volatile static boolean _isPoolInitialized;
-    private volatile static BufferPool _bufferPool;
-    private volatile static int _pooledBufferSize;
-    private volatile static double _sparsityFraction;
-    private volatile static ByteBuffer _zeroed;
-    private final QpidByteBufferFragment[] _fragments;
-    private volatile int _resetFragmentIndex = -1;
-
-    private QpidByteBuffer(final QpidByteBufferFragment... fragments)
+    static QpidByteBuffer allocate(boolean direct, int size)
     {
-        if (fragments == null)
-        {
-            throw new IllegalArgumentException();
-        }
-        _fragments = fragments;
+        return QpidByteBufferFactory.allocate(direct, size);
     }
 
-    private QpidByteBuffer(final List<QpidByteBufferFragment> fragments)
+    static QpidByteBuffer allocate(int size)
     {
-        if (fragments == null)
-        {
-            throw new IllegalArgumentException();
-        }
-        _fragments = fragments.toArray(new QpidByteBufferFragment[fragments.size()]);
+        return QpidByteBufferFactory.allocate(size);
     }
 
-    //////////////////
-    // Absolute puts
-    //////////////////
-
-    public QpidByteBuffer put(final int index, final byte b)
+    static QpidByteBuffer allocateDirect(int size)
     {
-        return put(index, new byte[]{b});
+        return QpidByteBufferFactory.allocateDirect(size);
     }
 
-    public QpidByteBuffer putShort(final int index, final short value)
+    static QpidByteBuffer asQpidByteBuffer(InputStream stream) throws IOException
     {
-        byte[] valueArray = Shorts.toByteArray(value);
-        return put(index, valueArray);
+        return QpidByteBufferFactory.asQpidByteBuffer(stream);
     }
 
-    public QpidByteBuffer putChar(final int index, final char value)
+    static SSLEngineResult encryptSSL(SSLEngine engine,
+                                      Collection<QpidByteBuffer> buffers,
+                                      QpidByteBuffer dest) throws SSLException
     {
-        byte[] valueArray = Chars.toByteArray(value);
-        return put(index, valueArray);
+        return QpidByteBufferFactory.encryptSSL(engine, buffers, dest);
     }
 
-    public QpidByteBuffer putInt(final int index, final int value)
-    {
-        byte[] valueArray = Ints.toByteArray(value);
-        return put(index, valueArray);
-    }
 
-    public QpidByteBuffer putLong(final int index, final long value)
+    static SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer src, QpidByteBuffer dst) throws SSLException
     {
-        byte[] valueArray = Longs.toByteArray(value);
-        return put(index, valueArray);
+        return QpidByteBufferFactory.decryptSSL(engine, src, dst);
     }
 
-    public QpidByteBuffer putFloat(final int index, final float value)
+    static QpidByteBuffer inflate(QpidByteBuffer compressedBuffer) throws IOException
     {
-        int intValue = Float.floatToRawIntBits(value);
-        return putInt(index, intValue);
+        return QpidByteBufferFactory.inflate(compressedBuffer);
     }
 
-    public QpidByteBuffer putDouble(final int index, final double value)
+    static QpidByteBuffer deflate(QpidByteBuffer uncompressedBuffer) throws IOException
     {
-        long longValue = Double.doubleToRawLongBits(value);
-        return putLong(index, longValue);
+        return QpidByteBufferFactory.deflate(uncompressedBuffer);
     }
 
-    public final QpidByteBuffer put(final int index, final byte[] src)
+    static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> qpidByteBuffers)
+            throws IOException
     {
-        final int valueWidth = src.length;
-        if (index < 0 || index + valueWidth > limit())
-        {
-            throw new IndexOutOfBoundsException(String.format("index %d is out of bounds [%d, %d)", index, 0, limit()));
-        }
-
-        int written = 0;
-        int bytesToSkip = index;
-        for (int i = 0; i < _fragments.length && written != valueWidth; i++)
-        {
-            final QpidByteBufferFragment buffer = _fragments[i];
-            final int limit = buffer.limit();
-            boolean isLastFragmentToConsider = valueWidth + bytesToSkip - written <= limit;
-            if (!isLastFragmentToConsider && limit != buffer.capacity())
-            {
-                throw new IllegalStateException(String.format("Unexpected limit %d on fragment %d", limit, i));
-            }
-
-            if (bytesToSkip >= limit)
-            {
-                bytesToSkip -= limit;
-            }
-            else
-            {
-                final int bytesToCopy = Math.min(limit - bytesToSkip, valueWidth - written);
-                final int originalPosition = buffer.position();
-                buffer.position(bytesToSkip);
-                buffer.put(src, written, bytesToCopy);
-                buffer.position(originalPosition);
-                written += bytesToCopy;
-                bytesToSkip = 0;
-            }
-        }
-        if (valueWidth != written)
-        {
-            throw new BufferOverflowException();
-        }
-        return this;
+        return QpidByteBufferFactory.write(channel, qpidByteBuffers);
     }
 
-    ////////////////
-    // Relative Puts
-    ////////////////
-
-    public final QpidByteBuffer put(final byte b)
+    static QpidByteBuffer wrap(ByteBuffer wrap)
     {
-        return put(new byte[]{b});
+        return QpidByteBufferFactory.wrap(wrap);
     }
 
-    public final QpidByteBuffer putUnsignedByte(final short s)
+    static QpidByteBuffer wrap(byte[] data)
     {
-        put((byte) s);
-        return this;
+        return QpidByteBufferFactory.wrap(data);
     }
 
-    public final QpidByteBuffer putShort(final short value)
+    static QpidByteBuffer wrap(byte[] data, int offset, int length)
     {
-        byte[] valueArray = Shorts.toByteArray(value);
-        return put(valueArray);
+        return QpidByteBufferFactory.wrap(data, offset, length);
     }
 
-    public final QpidByteBuffer putUnsignedShort(final int i)
+    static void initialisePool(int bufferSize, int maxPoolSize, double sparsityFraction)
     {
-        putShort((short) i);
-        return this;
+        QpidByteBufferFactory.initialisePool(bufferSize, maxPoolSize, sparsityFraction);
     }
 
-    public final QpidByteBuffer putChar(final char value)
+    /**
+     * Test use only
+     */
+    static void deinitialisePool()
     {
-        byte[] valueArray = Chars.toByteArray(value);
-        return put(valueArray);
+        QpidByteBufferFactory.deinitialisePool();
     }
 
-    public final QpidByteBuffer putInt(final int value)
+    static void returnToPool(ByteBuffer buffer)
     {
-        byte[] valueArray = Ints.toByteArray(value);
-        return put(valueArray);
+        QpidByteBufferFactory.returnToPool(buffer);
     }
 
-    public final QpidByteBuffer putUnsignedInt(final long value)
+    static int getPooledBufferSize()
     {
-        putInt((int) value);
-        return this;
+        return QpidByteBufferFactory.getPooledBufferSize();
     }
 
-    public final QpidByteBuffer putLong(final long value)
+    static long getAllocatedDirectMemorySize()
     {
-        byte[] valueArray = Longs.toByteArray(value);
-        return put(valueArray);
+        return QpidByteBufferFactory.getAllocatedDirectMemorySize();
     }
 
-    public final QpidByteBuffer putFloat(final float value)
+    static int getNumberOfActivePooledBuffers()
     {
-        int intValue = Float.floatToRawIntBits(value);
-        return putInt(intValue);
+        return QpidByteBufferFactory.getNumberOfActivePooledBuffers();
     }
 
-    public final QpidByteBuffer putDouble(final double value)
+    static int getNumberOfPooledBuffers()
     {
-        long longValue = Double.doubleToRawLongBits(value);
-        return putLong(longValue);
+        return QpidByteBufferFactory.getNumberOfPooledBuffers();
     }
 
-    public final QpidByteBuffer put(byte[] src)
+    static long getPooledBufferDisposalCounter()
     {
-        return put(src, 0, src.length);
+        return QpidByteBufferFactory.getPooledBufferDisposalCounter();
     }
 
-    public final QpidByteBuffer put(final byte[] src, final int offset, final int length)
+    static QpidByteBuffer reallocateIfNecessary(QpidByteBuffer data)
     {
-        if (_fragments.length == 1)
-        {
-            _fragments[0].put(src, offset, length);
-            return this;
-        }
-
-        if (!hasRemaining(length))
-        {
-            throw new BufferOverflowException();
-        }
-
-        int written = 0;
-        for (int i = 0; i < _fragments.length && written != length; i++)
-        {
-            final QpidByteBufferFragment buffer = _fragments[i];
-            int bytesToWrite = Math.min(buffer.remaining(), length - written);
-            buffer.put(src, offset + written, bytesToWrite);
-            written += bytesToWrite;
-        }
-        if (written != length)
-        {
-            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, length));
-        }
-        return this;
+        return QpidByteBufferFactory.reallocateIfNecessary(data);
     }
 
-    public final QpidByteBuffer put(final ByteBuffer src)
+    static QpidByteBuffer concatenate(List<QpidByteBuffer> buffers)
     {
-        if (_fragments.length == 1)
-        {
-            _fragments[0].put(src);
-            return this;
-        }
-
-        final int valueWidth = src.remaining();
-        if (!hasRemaining(valueWidth))
-        {
-            throw new BufferOverflowException();
-        }
-
-        int written = 0;
-        for (int i = 0; i < _fragments.length && written != valueWidth; i++)
-        {
-            final QpidByteBufferFragment dstFragment = _fragments[i];
-            if (dstFragment.hasRemaining())
-            {
-                final int srcFragmentRemaining = src.remaining();
-                final int dstFragmentRemaining = dstFragment.remaining();
-                if (dstFragmentRemaining >= srcFragmentRemaining)
-                {
-                    dstFragment.put(src);
-                    written += srcFragmentRemaining;
-                }
-                else
-                {
-                    int srcOriginalLimit = src.limit();
-                    src.limit(src.position() + dstFragmentRemaining);
-                    dstFragment.put(src);
-                    src.limit(srcOriginalLimit);
-                    written += dstFragmentRemaining;
-                }
-            }
-        }
-        if (written != valueWidth)
-        {
-            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth));
-        }
-        return this;
+        return QpidByteBufferFactory.concatenate(buffers);
     }
 
-    public final QpidByteBuffer put(final QpidByteBuffer src)
+    static QpidByteBuffer concatenate(QpidByteBuffer... buffers)
     {
-        if (_fragments.length == 1 && src._fragments.length == 1)
-        {
-            _fragments[0].put(src._fragments[0]);
-            return this;
-        }
-
-        final int valueWidth = src.remaining();
-        if (!hasRemaining(valueWidth))
-        {
-            throw new BufferOverflowException();
-        }
-        int i = 0;
-        int written = 0;
-        int size = _fragments.length;
-        final QpidByteBufferFragment[] fragments = src._fragments;
-        for (int i1 = 0; i1 < fragments.length; i1++)
-        {
-            final QpidByteBufferFragment srcFragment = fragments[i1];
-            for (; i < size; i++)
-            {
-                final QpidByteBufferFragment dstFragment = _fragments[i];
-                if (dstFragment.hasRemaining())
-                {
-                    final int srcFragmentRemaining = srcFragment.remaining();
-                    final int dstFragmentRemaining = dstFragment.remaining();
-                    if (dstFragmentRemaining >= srcFragmentRemaining)
-                    {
-                        dstFragment.put(srcFragment);
-                        written += srcFragmentRemaining;
-                        break;
-                    }
-                    else
-                    {
-                        int srcOriginalLimit = srcFragment.limit();
-                        srcFragment.limit(srcFragment.position() + dstFragmentRemaining);
-                        dstFragment.put(srcFragment);
-                        srcFragment.limit(srcOriginalLimit);
-                        written += dstFragmentRemaining;
-                    }
-                }
-            }
-        }
-        if (written != valueWidth)
-        {
-            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth));
-        }
-        return this;
+        return QpidByteBufferFactory.concatenate(buffers);
     }
 
-    ///////////////////
-    // Absolute Gets
-    ///////////////////
-
-    public byte get(final int index)
+    static QpidByteBuffer emptyQpidByteBuffer()
     {
-        final byte[] byteArray = getByteArray(index, 1);
-        return byteArray[0];
+        return QpidByteBufferFactory.emptyQpidByteBuffer();
     }
 
-    public short getShort(final int index)
+    static ThreadFactory createQpidByteBufferTrackingThreadFactory(ThreadFactory factory)
     {
-        final byte[] byteArray = getByteArray(index, 2);
-        return Shorts.fromByteArray(byteArray);
+        return QpidByteBufferFactory.createQpidByteBufferTrackingThreadFactory(factory);
     }
 
-    public final int getUnsignedShort(int index)
-    {
-        return ((int) getShort(index)) & 0xFFFF;
-    }
+    @Override
+    void close();
 
-    public char getChar(final int index)
-    {
-        final byte[] byteArray = getByteArray(index, 2);
-        return Chars.fromByteArray(byteArray);
-    }
+    QpidByteBuffer put(int index, byte b);
 
-    public int getInt(final int index)
-    {
-        final byte[] byteArray = getByteArray(index, 4);
-        return Ints.fromByteArray(byteArray);
-    }
+    QpidByteBuffer putShort(int index, short value);
 
-    public long getLong(final int index)
-    {
-        final byte[] byteArray = getByteArray(index, 8);
-        return Longs.fromByteArray(byteArray);
-    }
+    QpidByteBuffer putChar(int index, char value);
 
-    public float getFloat(final int index)
-    {
-        final int intValue = getInt(index);
-        return Float.intBitsToFloat(intValue);
-    }
+    QpidByteBuffer putInt(int index, int value);
 
-    public double getDouble(final int index)
-    {
-        final long longValue = getLong(index);
-        return Double.longBitsToDouble(longValue);
-    }
+    QpidByteBuffer putLong(int index, long value);
 
-    private byte[] getByteArray(final int index, final int length)
-    {
-        if (index < 0 || index + length > limit())
-        {
-            throw new IndexOutOfBoundsException(String.format("%d bytes at index %d do not fit into bounds [%d, %d)", length, index, 0, limit()));
-        }
-
-        byte[] value = new byte[length];
-
-        int consumed = 0;
-        int bytesToSkip = index;
-        for (int i = 0; i < _fragments.length && consumed != length; i++)
-        {
-            final QpidByteBufferFragment buffer = _fragments[i];
-            final int limit = buffer.limit();
-            boolean isLastFragmentToConsider = length + bytesToSkip - consumed <= limit;
-            if (!isLastFragmentToConsider && limit != buffer.capacity())
-            {
-                throw new IllegalStateException(String.format("Unexpectedly limit %d on fragment %d.", limit, i));
-            }
-
-            if (bytesToSkip >= limit)
-            {
-                bytesToSkip -= limit;
-            }
-            else
-            {
-                final int bytesToCopy = Math.min(limit - bytesToSkip, length - consumed);
-                final int originalPosition = buffer.position();
-                buffer.position(bytesToSkip);
-                buffer.get(value, consumed, bytesToCopy);
-                buffer.position(originalPosition);
-                consumed += bytesToCopy;
-                bytesToSkip = 0;
-            }
-        }
-        if (consumed != length)
-        {
-            throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
-        }
-        return value;
-    }
+    QpidByteBuffer putFloat(int index, float value);
 
-    //////////////////
-    // Relative Gets
-    //////////////////
+    QpidByteBuffer putDouble(int index, double value);
 
-    public final byte get()
-    {
-        byte[] value = new byte[1];
-        get(value, 0, 1);
-        return value[0];
-    }
+    QpidByteBuffer put(byte b);
 
-    public final short getUnsignedByte()
-    {
-        return (short) (get() & 0xFF);
-    }
+    QpidByteBuffer putUnsignedByte(short s);
 
-    public final short getShort()
-    {
-        byte[] value = new byte[2];
-        get(value, 0, value.length);
-        return Shorts.fromByteArray(value);
-    }
+    QpidByteBuffer putShort(short value);
 
-    public final int getUnsignedShort()
-    {
-        return ((int) getShort()) & 0xFFFF;
-    }
+    QpidByteBuffer putUnsignedShort(int i);
 
-    public final char getChar()
-    {
-        byte[] value = new byte[2];
-        get(value, 0, value.length);
-        return Chars.fromByteArray(value);
-    }
+    QpidByteBuffer putChar(char value);
 
-    public final int getInt()
-    {
-        byte[] value = new byte[4];
-        get(value, 0, value.length);
-        return Ints.fromByteArray(value);
-    }
+    QpidByteBuffer putInt(int value);
 
-    public final long getUnsignedInt()
-    {
-        return ((long) getInt()) & 0xFFFFFFFFL;
-    }
+    QpidByteBuffer putUnsignedInt(long value);
 
-    public final long getLong()
-    {
-        byte[] value = new byte[8];
-        get(value, 0, value.length);
-        return Longs.fromByteArray(value);
-    }
+    QpidByteBuffer putLong(long value);
 
-    public final float getFloat()
-    {
-        final int intValue = getInt();
-        return Float.intBitsToFloat(intValue);
-    }
+    QpidByteBuffer putFloat(float value);
 
-    public final double getDouble()
-    {
-        final long longValue = getLong();
-        return Double.longBitsToDouble(longValue);
-    }
+    QpidByteBuffer putDouble(double value);
 
-    public final QpidByteBuffer get(final byte[] dst)
-    {
-        return get(dst, 0, dst.length);
-    }
+    QpidByteBuffer put(byte[] src);
 
-    public final QpidByteBuffer get(final byte[] dst, final int offset, final int length)
-    {
-        if (_fragments.length == 1)
-        {
-            _fragments[0].get(dst, offset, length);
-            return this;
-        }
-
-        if (!hasRemaining(length))
-        {
-            throw new BufferUnderflowException();
-        }
-
-        int consumed = 0;
-        for (int i = 0; i < _fragments.length && consumed != length; i++)
-        {
-            final QpidByteBufferFragment buffer = _fragments[i];
-            int bytesToCopy = Math.min(buffer.remaining(), length - consumed);
-            buffer.get(dst, offset + consumed, bytesToCopy);
-            consumed += bytesToCopy;
-        }
-        if (consumed != length)
-        {
-            throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
-        }
-        return this;
-    }
+    QpidByteBuffer put(byte[] src, int offset, int length);
 
-    ///////////////
-    // Other stuff
-    ////////////////
+    QpidByteBuffer put(ByteBuffer src);
 
-    public final void copyTo(final byte[] dst)
-    {
-        final int remaining = remaining();
-        if (remaining < dst.length)
-        {
-            throw new BufferUnderflowException();
-        }
-        if (remaining > dst.length)
-        {
-            throw new BufferOverflowException();
-        }
-        int offset = 0;
-        for (QpidByteBufferFragment fragment : _fragments)
-        {
-            final int length = Math.min(fragment.remaining(), dst.length - offset);
-            fragment._buffer.duplicate().get(dst, offset, length);
-            offset += length;
-        }
-    }
+    QpidByteBuffer put(QpidByteBuffer src);
 
-    public final void copyTo(final ByteBuffer dst)
-    {
-        if (dst.remaining() < remaining())
-        {
-            throw new BufferOverflowException();
-        }
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            dst.put(fragment._buffer.duplicate());
-        }
-    }
+    byte get(int index);
 
-    public final void putCopyOf(final QpidByteBuffer source)
-    {
-        int sourceRemaining = source.remaining();
-        if (!hasRemaining(sourceRemaining))
-        {
-            throw new BufferOverflowException();
-        }
-        for (int i = 0, fragmentsSize = source._fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment srcFragment = source._fragments[i];
-            put(srcFragment._buffer.duplicate());
-        }
-    }
+    short getShort(int index);
 
-    public final boolean isDirect()
-    {
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            if (!fragment.isDirect())
-            {
-                return false;
-            }
-        }
-        return true;
-    }
+    int getUnsignedShort(int index);
 
-    @Override
-    public final void close()
-    {
-        dispose();
-    }
+    char getChar(int index);
 
-    public final void dispose()
-    {
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            fragment.dispose();
-        }
-    }
+    int getInt(int index);
 
-    public final InputStream asInputStream()
-    {
-        return new QpidByteBuffer.BufferInputStream(this);
-    }
+    long getLong(int index);
 
-    public final long read(ScatteringByteChannel channel) throws IOException
-    {
-        ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.length];
-        for (int i = 0; i < byteBuffers.length; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            byteBuffers[i] = fragment.getUnderlyingBuffer();
-        }
-        return channel.read(byteBuffers);
-    }
+    float getFloat(int index);
 
-    @Override
-    public String toString()
-    {
-        return "QpidByteBuffer{" + _fragments.length + " fragments}";
-    }
+    double getDouble(int index);
 
-    public QpidByteBuffer reset()
-    {
-        if (_resetFragmentIndex < 0)
-        {
-            throw new InvalidMarkException();
-        }
-        final QpidByteBufferFragment fragment = _fragments[_resetFragmentIndex];
-        fragment.reset();
-        for (int i = _resetFragmentIndex + 1, size = _fragments.length; i < size; ++i)
-        {
-            _fragments[i].position(0);
-        }
-        return this;
-    }
+    byte get();
 
-    public QpidByteBuffer rewind()
-    {
-        _resetFragmentIndex = -1;
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            fragment.rewind();
-        }
-        return this;
-    }
+    short getUnsignedByte();
 
-    public final boolean hasArray()
-    {
-        return _fragments.length == 1 && _fragments[0].hasArray();
-    }
+    short getShort();
 
-    public byte[] array()
-    {
-        if (!hasArray())
-        {
-            throw new UnsupportedOperationException("This QpidByteBuffer is not backed by an array.");
-        }
-        return _fragments[0].array();
-    }
+    int getUnsignedShort();
 
-    public QpidByteBuffer clear()
-    {
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            _fragments[i].clear();
-        }
-        return this;
-    }
+    char getChar();
 
-    public QpidByteBuffer compact()
-    {
-        if (_fragments.length == 1)
-        {
-            _fragments[0].compact();
-        }
-        else
-        {
-            int position = position();
-            int limit = limit();
-            if (position != 0)
-            {
-                int dstPos = 0;
-                for (int srcPos = position; srcPos < limit; ++srcPos, ++dstPos)
-                {
-                    put(dstPos, get(srcPos));
-                }
-                position(dstPos);
-                limit(capacity());
-            }
-        }
-        _resetFragmentIndex = -1;
-        return this;
-    }
+    int getInt();
 
-    public int position()
-    {
-        int totalPosition = 0;
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            totalPosition += fragment.position();
-            if (fragment.position() != fragment.limit())
-            {
-                break;
-            }
-        }
-        return totalPosition;
-    }
+    long getUnsignedInt();
 
-    public QpidByteBuffer position(int newPosition)
-    {
-        if (newPosition < 0 || newPosition > limit())
-        {
-            throw new IllegalArgumentException(String.format("new position %d is out of bounds [%d, %d)", newPosition, 0, limit()));
-        }
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            final int fragmentLimit = fragment.limit();
-            if (newPosition <= fragmentLimit)
-            {
-                fragment.position(newPosition);
-                newPosition = 0;
-            }
-            else
-            {
-                if (fragmentLimit != fragment.capacity())
-                {
-                    throw new IllegalStateException(String.format("QBB Fragment %d has limit %d != capacity %d",
-                                                                  i,
-                                                                  fragmentLimit,
-                                                                  fragment.capacity()));
-                }
-                fragment.position(fragmentLimit);
-                newPosition -= fragmentLimit;
-            }
-        }
-        return this;
-    }
+    long getLong();
 
-    public int limit()
-    {
-        int totalLimit = 0;
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            final int fragmentLimit = fragment.limit();
-            totalLimit += fragmentLimit;
-            if (fragmentLimit != fragment.capacity())
-            {
-                break;
-            }
-        }
-
-        return totalLimit;
-    }
+    float getFloat();
 
-    public QpidByteBuffer limit(int newLimit)
-    {
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            final int fragmentCapacity = fragment.capacity();
-            final int fragmentLimit = Math.min(newLimit, fragmentCapacity);
-            fragment.limit(fragmentLimit);
-            newLimit -= fragmentLimit;
-        }
-        return this;
-    }
+    double getDouble();
 
-    public final QpidByteBuffer mark()
-    {
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            if (fragment.position() != fragment.limit())
-            {
-                fragment.mark();
-                _resetFragmentIndex = i;
-                return this;
-            }
-        }
-        _resetFragmentIndex = _fragments.length - 1;
-        _fragments[_resetFragmentIndex].mark();
-        return this;
-    }
+    QpidByteBuffer get(byte[] dst);
 
-    public final int remaining()
-    {
-        int remaining = 0;
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            remaining += fragment.remaining();
-        }
-        return remaining;
-    }
+    QpidByteBuffer get(byte[] dst, int offset, int length);
 
-    public final boolean hasRemaining()
-    {
-        return hasRemaining(1);
-    }
-
-    public final boolean hasRemaining(int atLeast)
-    {
-        if (atLeast == 0)
-        {
-            return true;
-        }
-        int remaining = 0;
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            remaining += fragment.remaining();
-            if (remaining >= atLeast)
-            {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public QpidByteBuffer flip()
-    {
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            fragment.flip();
-        }
-        return this;
-    }
-
-    public int capacity()
-    {
-        int totalCapacity = 0;
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            totalCapacity += _fragments[i].capacity();
-        }
-        return totalCapacity;
-    }
-
-    /**
-     * Method does not respect mark.
-     *
-     * @return QpidByteBuffer
-     */
-    public QpidByteBuffer duplicate()
-    {
-        final QpidByteBufferFragment[] fragments = new QpidByteBufferFragment[_fragments.length];
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            fragments[i] =_fragments[i].duplicate();
-        }
-        return new QpidByteBuffer(fragments);
-    }
-
-    public QpidByteBuffer slice()
-    {
-        return view(0, remaining());
-    }
-
-    public QpidByteBuffer view(int offset, int length)
-    {
-        if (offset + length > remaining())
-        {
-            throw new IllegalArgumentException(String.format("offset: %d, length: %d, remaining: %d", offset, length, remaining()));
-        }
-
-        if (_fragments.length == 1)
-        {
-            return new QpidByteBuffer(_fragments[0].view(offset, length));
-        }
-
-        final List<QpidByteBufferFragment> fragments = new ArrayList<>(_fragments.length);
-
-        boolean firstFragmentToBeConsidered = true;
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize && length > 0; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            if (fragment.hasRemaining())
-            {
-                if (!firstFragmentToBeConsidered && fragment.position() != 0)
-                {
-                    throw new IllegalStateException(String.format("Unexpectedly position %d on fragment %d.", fragment.position(), i));
-                }
-                firstFragmentToBeConsidered = false;
-                final int fragmentRemaining = fragment.remaining();
-                if (fragmentRemaining > offset)
-                {
-                    final int fragmentViewLength = Math.min(fragmentRemaining - offset, length);
-                    fragments.add(fragment.view(offset, fragmentViewLength));
-                    length -= fragmentViewLength;
-                    offset = 0;
-                }
-                else
-                {
-                    offset -= fragmentRemaining;
-                }
-            }
-        }
-
-        return new QpidByteBuffer(fragments);
-    }
+    void copyTo(byte[] dst);
 
-    private ByteBuffer[] getUnderlyingBuffers()
-    {
-        ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.length];
-        for (int i = 0; i < _fragments.length; i++)
-        {
-            byteBuffers[i] = _fragments[i].getUnderlyingBuffer();
-        }
-        return byteBuffers;
+    void copyTo(ByteBuffer dst);
 
-    }
-
-    public static QpidByteBuffer allocate(boolean direct, int size)
-    {
-        return direct ? allocateDirect(size) : allocate(size);
-    }
-
-    public static QpidByteBuffer allocate(int size)
-    {
-        return new QpidByteBuffer(new QpidByteBufferFragment[]{QpidByteBufferFragment.allocate(size)});
-    }
-
-    public static QpidByteBuffer allocateDirect(int size)
-    {
-        if (size < 0)
-        {
-            throw new IllegalArgumentException("Cannot allocate QpidByteBufferFragment with size "
-                                               + size
-                                               + " which is negative.");
-        }
-
-        if (_isPoolInitialized)
-        {
-            List<QpidByteBufferFragment> fragments = new ArrayList<>();
-            int allocatedSize = 0;
-            while (size - allocatedSize >= _pooledBufferSize)
-            {
-                fragments.add(QpidByteBufferFragment.allocateDirect(_pooledBufferSize));
-                allocatedSize += _pooledBufferSize;
-            }
-            if (allocatedSize != size)
-            {
-                fragments.add(QpidByteBufferFragment.allocateDirect(size - allocatedSize));
-            }
-            return new QpidByteBuffer(fragments);
-        }
-        else
-        {
-            return allocate(size);
-        }
-    }
-
-    private static QpidByteBuffer asQpidByteBuffer(final byte[] data, final int offset, final int length)
-    {
-        try (QpidByteBufferOutputStream outputStream = new QpidByteBufferOutputStream(true, getPooledBufferSize()))
-        {
-            outputStream.write(data, offset, length);
-            return outputStream.fetchAccumulatedBuffer();
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException("unexpected Error converting array to QpidByteBuffers", e);
-        }
-    }
-
-    public static QpidByteBuffer asQpidByteBuffer(final InputStream stream) throws IOException
-    {
-        final List<QpidByteBufferFragment> fragments = new ArrayList<>();
-        final int pooledBufferSize = QpidByteBuffer.getPooledBufferSize();
-        byte[] transferBuf = new byte[pooledBufferSize];
-        int readFragment = 0;
-        int read = stream.read(transferBuf, readFragment, pooledBufferSize - readFragment);
-        while (read > 0)
-        {
-            readFragment += read;
-            if (readFragment == pooledBufferSize)
-            {
-                QpidByteBufferFragment fragment = QpidByteBufferFragment.allocateDirect(pooledBufferSize);
-                fragment.put(transferBuf, 0, pooledBufferSize);
-                fragment.flip();
-                fragments.add(fragment);
-                readFragment = 0;
-            }
-            read = stream.read(transferBuf, readFragment, pooledBufferSize - readFragment);
-        }
-        if (readFragment != 0)
-        {
-            QpidByteBufferFragment fragment = QpidByteBufferFragment.allocateDirect(readFragment);
-            fragment.put(transferBuf, 0, readFragment);
-            fragment.flip();
-            fragments.add(fragment);
-        }
-        return new QpidByteBuffer(fragments);
-    }
-
-    public final SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dst) throws SSLException
-    {
-        final ByteBuffer[] dstUnderlyingBuffers = dst.getUnderlyingBuffers();
-        final ByteBuffer[] underlyingBuffers = getUnderlyingBuffers();
-        if (underlyingBuffers.length != 1)
-        {
-            throw new IllegalStateException("Expected single fragment buffer");
-        }
-        return engine.unwrap(underlyingBuffers[0], dstUnderlyingBuffers);
-    }
-
-    public static SSLEngineResult encryptSSL(SSLEngine engine,
-                                             final Collection<QpidByteBuffer> buffers,
-                                             QpidByteBuffer dest) throws SSLException
-    {
-        final ByteBuffer[] src;
-        // QPID-7447: prevent unnecessary allocations
-        if (buffers.isEmpty())
-        {
-            src = EMPTY_BYTE_BUFFER_ARRAY;
-        }
-        else
-        {
-            List<ByteBuffer> buffers_ = new LinkedList<>();
-            for (QpidByteBuffer buffer : buffers)
-            {
-                Collections.addAll(buffers_, buffer.getUnderlyingBuffers());
-            }
-            src = buffers_.toArray(new ByteBuffer[buffers_.size()]);
-        }
-        final ByteBuffer[] dstUnderlyingBuffers = dest.getUnderlyingBuffers();
-        if (dstUnderlyingBuffers.length != 1)
-        {
-            throw new IllegalStateException("Expected a single fragment output buffer");
-        }
-        return engine.wrap(src, dstUnderlyingBuffers[0]);
-    }
-
-    public static QpidByteBuffer inflate(QpidByteBuffer compressedBuffer) throws IOException
-    {
-        if (compressedBuffer == null)
-        {
-            throw new IllegalArgumentException("compressedBuffer cannot be null");
-        }
-
-        boolean isDirect = compressedBuffer.isDirect();
-        final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
-
-        List<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
-        try (GZIPInputStream gzipInputStream = new GZIPInputStream(compressedBuffer.asInputStream()))
-        {
-            byte[] buf = new byte[bufferSize];
-            int read;
-            while ((read = gzipInputStream.read(buf)) != -1)
-            {
-                uncompressedBuffers.add(QpidByteBuffer.asQpidByteBuffer(buf, 0, read));
-            }
-            return QpidByteBuffer.concatenate(uncompressedBuffers);
-        }
-        finally
-        {
-            uncompressedBuffers.forEach(QpidByteBuffer::dispose);
-        }
-    }
+    void putCopyOf(QpidByteBuffer source);
 
-    public static QpidByteBuffer deflate(QpidByteBuffer uncompressedBuffer) throws IOException
-    {
-        if (uncompressedBuffer == null)
-        {
-            throw new IllegalArgumentException("uncompressedBuffer cannot be null");
-        }
-
-        boolean isDirect = uncompressedBuffer.isDirect();
-        final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
-
-        try (QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
-             InputStream compressedInput = uncompressedBuffer.asInputStream();
-             GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput,
-                                                                                         bufferSize)))
-        {
-            byte[] buf = new byte[16384];
-            int read;
-            while ((read = compressedInput.read(buf)) > -1)
-            {
-                gzipStream.write(buf, 0, read);
-            }
-            gzipStream.finish();
-            gzipStream.flush();
-            return compressedOutput.fetchAccumulatedBuffer();
-        }
-    }
+    boolean isDirect();
 
-    public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> qpidByteBuffers)
-            throws IOException
-    {
-        List<ByteBuffer> byteBuffers = new ArrayList<>();
-        for (QpidByteBuffer qpidByteBuffer : qpidByteBuffers)
-        {
-            Collections.addAll(byteBuffers, qpidByteBuffer.getUnderlyingBuffers());
-        }
-        return channel.write(byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
-    }
+    void dispose();
 
-    public static QpidByteBuffer wrap(final ByteBuffer wrap)
-    {
-        return new QpidByteBuffer(new QpidByteBufferFragment[]{new QpidByteBufferFragment(new NonPooledByteBufferRef( wrap))});
-    }
+    InputStream asInputStream();
 
-    public static QpidByteBuffer wrap(final byte[] data)
-    {
-        return wrap(ByteBuffer.wrap(data));
-    }
+    long read(ScatteringByteChannel channel) throws IOException;
 
-    public static QpidByteBuffer wrap(final byte[] data, int offset, int length)
-    {
-        return wrap(ByteBuffer.wrap(data, offset, length));
-    }
+    QpidByteBuffer reset();
 
-    static void returnToPool(final ByteBuffer buffer)
-    {
-        buffer.clear();
-        if (_isPoolInitialized)
-        {
-            final ByteBuffer duplicate = _zeroed.duplicate();
-            duplicate.limit(buffer.capacity());
-            buffer.put(duplicate);
-
-            _bufferPool.returnBuffer(buffer);
-        }
-    }
+    QpidByteBuffer rewind();
 
-    public synchronized static void initialisePool(int bufferSize, int maxPoolSize, final double sparsityFraction)
-    {
-        if (_isPoolInitialized && (bufferSize != _pooledBufferSize
-                                   || maxPoolSize != _bufferPool.getMaxSize()
-                                   || sparsityFraction != _sparsityFraction))
-        {
-            final String errorMessage = String.format(
-                    "QpidByteBuffer pool has already been initialised with bufferSize=%d, maxPoolSize=%d, and sparsityFraction=%f."
-                    +
-                    "Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
-                    _pooledBufferSize,
-                    _bufferPool.getMaxSize(),
-                    _sparsityFraction,
-                    bufferSize,
-                    maxPoolSize);
-            throw new IllegalStateException(errorMessage);
-        }
-        if (bufferSize <= 0)
-        {
-            throw new IllegalArgumentException("Negative or zero bufferSize illegal : " + bufferSize);
-        }
-
-        _bufferPool = new BufferPool(maxPoolSize);
-        _pooledBufferSize = bufferSize;
-        _zeroed = ByteBuffer.allocateDirect(_pooledBufferSize);
-        _sparsityFraction = sparsityFraction;
-        _isPoolInitialized = true;
-    }
+    boolean hasArray();
 
-    /**
-     * Test use only
-     */
-    public synchronized static void deinitialisePool()
-    {
-        if (_isPoolInitialized)
-        {
-            _bufferPool = null;
-            _pooledBufferSize = -1;
-            _zeroed = null;
-            _isPoolInitialized = false;
-            _sparsityFraction = 1.0;
-            final QpidByteBufferFragment cachedBuffer = _cachedBuffer.get();
-            if (cachedBuffer != null)
-            {
-                cachedBuffer.dispose();
-                _cachedBuffer.remove();
-            }
-        }
-    }
+    byte[] array();
 
-    public static int getPooledBufferSize()
-    {
-        return _pooledBufferSize;
-    }
+    QpidByteBuffer clear();
 
-    public static long getAllocatedDirectMemorySize()
-    {
-        return _pooledBufferSize * getNumberOfActivePooledBuffers();
-    }
+    QpidByteBuffer compact();
 
-    public static int getNumberOfActivePooledBuffers()
-    {
-        return PooledByteBufferRef.getActiveBufferCount();
-    }
+    int position();
 
-    public static int getNumberOfPooledBuffers()
-    {
-        return _bufferPool.size();
-    }
+    QpidByteBuffer position(int newPosition);
 
-    public static long getPooledBufferDisposalCounter()
-    {
-        return PooledByteBufferRef.getDisposalCounter();
-    }
+    int limit();
 
-    public static QpidByteBuffer reallocateIfNecessary(final QpidByteBuffer data)
-    {
-        if (data != null && data.isDirect() && data.isSparse())
-        {
-            QpidByteBuffer newBuf = allocateDirect(data.remaining());
-            newBuf.put(data);
-            newBuf.flip();
-            data.dispose();
-            return newBuf;
-        }
-        else
-        {
-            return data;
-        }
-    }
+    QpidByteBuffer limit(int newLimit);
 
-    boolean isSparse()
-    {
-        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
-        {
-            final QpidByteBufferFragment fragment = _fragments[i];
-            if (fragment.isSparse())
-            {
-                return true;
-            }
-        }
-        return false;
-    }
+    QpidByteBuffer mark();
 
-    public static QpidByteBuffer concatenate(final List<QpidByteBuffer> buffers)
-    {
-        final List<QpidByteBufferFragment> fragments = new ArrayList<>(buffers.size());
-        for (QpidByteBuffer buffer : buffers)
-        {
-            for (QpidByteBufferFragment fragment : buffer._fragments)
-            {
-                fragments.add(fragment.slice());
-            }
-        }
-        return new QpidByteBuffer(fragments);
-    }
+    int remaining();
 
-    public static QpidByteBuffer concatenate(QpidByteBuffer... buffers)
-    {
-        return concatenate(Arrays.asList(buffers));
-    }
+    boolean hasRemaining();
 
-    public static QpidByteBuffer emptyQpidByteBuffer()
-    {
-        return EMPTY_QPID_BYTE_BUFFER.duplicate();
-    }
+    boolean hasRemaining(int atLeast);
 
-    public static ThreadFactory createQpidByteBufferTrackingThreadFactory(final ThreadFactory factory)
-    {
-        return r -> factory.newThread(() -> {
-            try
-            {
-                r.run();
-            }
-            finally
-            {
-                final QpidByteBufferFragment cachedThreadLocalBuffer = _cachedBuffer.get();
-                if (cachedThreadLocalBuffer != null)
-                {
-                    cachedThreadLocalBuffer.dispose();
-                    _cachedBuffer.remove();
-                }
-            }
-        });
-    }
+    QpidByteBuffer flip();
 
-    private static final class BufferInputStream extends InputStream
-    {
-        private final QpidByteBuffer _qpidByteBuffer;
-
-        private BufferInputStream(final QpidByteBuffer buffer)
-        {
-            _qpidByteBuffer = buffer.duplicate();
-        }
-
-        @Override
-        public int read() throws IOException
-        {
-            if (_qpidByteBuffer.hasRemaining())
-            {
-                return _qpidByteBuffer.getUnsignedByte();
-            }
-            return -1;
-        }
-
-
-        @Override
-        public int read(byte[] b, int off, int len) throws IOException
-        {
-            if (!_qpidByteBuffer.hasRemaining())
-            {
-                return -1;
-            }
-            int remaining = _qpidByteBuffer.remaining();
-            if (remaining < len)
-            {
-                len = remaining;
-            }
-            _qpidByteBuffer.get(b, off, len);
-
-            return len;
-        }
-
-        @Override
-        public void mark(int readlimit)
-        {
-            _qpidByteBuffer.mark();
-        }
-
-        @Override
-        public void reset() throws IOException
-        {
-            _qpidByteBuffer.reset();
-        }
-
-        @Override
-        public boolean markSupported()
-        {
-            return true;
-        }
-
-        @Override
-        public long skip(long n) throws IOException
-        {
-            _qpidByteBuffer.position(_qpidByteBuffer.position() + (int) n);
-            return n;
-        }
-
-        @Override
-        public int available() throws IOException
-        {
-            return _qpidByteBuffer.remaining();
-        }
-
-        @Override
-        public void close()
-        {
-            _qpidByteBuffer.dispose();
-        }
-    }
+    int capacity();
 
+    QpidByteBuffer duplicate();
 
-    static class QpidByteBufferFragment implements AutoCloseable
-    {
+    QpidByteBuffer slice();
 
-        private static final AtomicIntegerFieldUpdater<QpidByteBufferFragment>
-                DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
-                QpidByteBufferFragment.class,
-                "_disposed");
-
-        private final int _offset;
-
-        final ByteBufferRef _ref;
-        volatile ByteBuffer _buffer;
-        @SuppressWarnings("unused")
-        private volatile int _disposed;
-
-
-        QpidByteBufferFragment(ByteBufferRef ref)
-        {
-            this(ref, ref.getBuffer(), 0);
-        }
-
-        private QpidByteBufferFragment(ByteBufferRef ref, ByteBuffer buffer, int offset)
-        {
-            _ref = ref;
-            _buffer = buffer;
-            _offset = offset;
-            _ref.incrementRef(capacity());
-        }
-
-        public final boolean isDirect()
-        {
-            return _buffer.isDirect();
-        }
-
-        @Override
-        public final void close()
-        {
-            dispose();
-        }
-
-        public final void dispose()
-        {
-            if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
-            {
-                _ref.decrementRef(capacity());
-                _buffer = null;
-            }
-        }
-
-        public final CharBuffer decode(Charset charset)
-        {
-            return charset.decode(getUnderlyingBuffer());
-        }
-
-        @Override
-        public String toString()
-        {
-            return "QpidByteBufferFragment{" +
-                   "_buffer=" + _buffer +
-                   ", _disposed=" + _disposed +
-                   '}';
-        }
-
-        public final boolean hasRemaining()
-        {
-            return _buffer.hasRemaining();
-        }
-
-        public final QpidByteBufferFragment put(final byte b)
-        {
-            _buffer.put(b);
-            return this;
-        }
-
-        public QpidByteBufferFragment put(final int index, final byte b)
-        {
-            _buffer.put(index, b);
-            return this;
-        }
-
-        public final QpidByteBufferFragment mark()
-        {
-            _buffer.mark();
-            return this;
-        }
-
-        public final boolean hasArray()
-        {
-            return _buffer.hasArray();
-        }
-
-        public byte[] array()
-        {
-            return _buffer.array();
-        }
-
-        public final int remaining()
-        {
-            return _buffer.remaining();
-        }
-
-
-        public final QpidByteBufferFragment put(final ByteBuffer src)
-        {
-            _buffer.put(src);
-            return this;
-        }
-
-        public final QpidByteBufferFragment put(final QpidByteBufferFragment src)
-        {
-            _buffer.put(src.getUnderlyingBuffer());
-            return this;
-        }
-
-        public final QpidByteBufferFragment get(final byte[] dst, final int offset, final int length)
-        {
-            _buffer.get(dst, offset, length);
-            return this;
-        }
-
-        public QpidByteBufferFragment rewind()
-        {
-            _buffer.rewind();
-            return this;
-        }
-
-        public QpidByteBufferFragment clear()
-        {
-            _buffer.clear();
-            return this;
-        }
-
-        public QpidByteBufferFragment compact()
-        {
-            _buffer.compact();
-            return this;
-        }
-
-        public int limit()
-        {
-            return _buffer.limit();
-        }
-
-        public QpidByteBufferFragment reset()
-        {
-            _buffer.reset();
-            return this;
-        }
-
-        public QpidByteBufferFragment flip()
-        {
-            _buffer.flip();
-            return this;
-        }
-
-        public QpidByteBufferFragment limit(final int newLimit)
-        {
-            _buffer.limit(newLimit);
-            return this;
-        }
-
-        /**
-         * Method does not respect mark.
-         *
-         * @return QpidByteBufferFragment
-         */
-        public QpidByteBufferFragment duplicate()
-        {
-            ByteBuffer buffer = _ref.getBuffer();
-            if (!(_ref instanceof PooledByteBufferRef))
-            {
-                buffer = buffer.duplicate();
-            }
-
-            buffer.position(_offset);
-            buffer.limit(_offset + _buffer.capacity());
-
-            buffer = buffer.slice();
-
-            buffer.limit(_buffer.limit());
-            buffer.position(_buffer.position());
-            return new QpidByteBufferFragment(_ref, buffer, _offset);
-        }
-
-        public final QpidByteBufferFragment put(final byte[] src, final int offset, final int length)
-        {
-            _buffer.put(src, offset, length);
-            return this;
-        }
-
-
-        public int capacity()
-        {
-            return _buffer.capacity();
-        }
-
-
-        public final byte get()
-        {
-            return _buffer.get();
-        }
-
-        public byte get(final int index)
-        {
-            return _buffer.get(index);
-        }
-
-        public QpidByteBufferFragment position(final int newPosition)
-        {
-            _buffer.position(newPosition);
-            return this;
-        }
-
-        public QpidByteBufferFragment slice()
-        {
-            return view(0, _buffer.remaining());
-        }
-
-        public QpidByteBufferFragment view(int offset, int length)
-        {
-            ByteBuffer buffer = _ref.getBuffer();
-            if (!(_ref instanceof PooledByteBufferRef))
-            {
-                buffer = buffer.duplicate();
-            }
-
-            int newRemaining = Math.min(_buffer.remaining() - offset, length);
-
-            int newPosition = _offset + _buffer.position() + offset;
-            buffer.limit(newPosition + newRemaining);
-            buffer.position(newPosition);
-
-            buffer = buffer.slice();
-
-            return new QpidByteBufferFragment(_ref, buffer, newPosition);
-        }
-
-        public int position()
-        {
-            return _buffer.position();
-        }
-
-        ByteBuffer getUnderlyingBuffer()
-        {
-            return _buffer;
-        }
-
-        public static QpidByteBufferFragment allocate(boolean direct, int size)
-        {
-            return direct ? allocateDirect(size) : allocate(size);
-        }
-
-        public static QpidByteBufferFragment allocate(int size)
-        {
-            return new QpidByteBufferFragment(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
-        }
-
-        public static QpidByteBufferFragment allocateDirect(int size)
-        {
-            if (size < 0)
-            {
-                throw new IllegalArgumentException("Cannot allocate QpidByteBufferFragment with size "
-                                                   + size
-                                                   + " which is negative.");
-            }
-
-            final ByteBufferRef ref;
-            if (_isPoolInitialized && _pooledBufferSize >= size)
-            {
-                if (_pooledBufferSize == size)
-                {
-                    ByteBuffer buf = _bufferPool.getBuffer();
-                    if (buf == null)
-                    {
-                        buf = ByteBuffer.allocateDirect(size);
-                    }
-                    ref = new PooledByteBufferRef(buf);
-                }
-                else
-                {
-                    QpidByteBufferFragment buf = _cachedBuffer.get();
-                    if (buf == null || buf.remaining() < size)
-                    {
-                        if (buf != null)
-                        {
-                            buf.dispose();
-                        }
-                        buf = allocateDirect(_pooledBufferSize);
-                        _cachedBuffer.set(buf);
-                    }
-                    QpidByteBufferFragment rVal = buf.view(0, size);
-                    buf.position(buf.position() + size);
-
-                    return rVal;
-                }
-            }
-            else
-            {
-                ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
-            }
-            return new QpidByteBufferFragment(ref);
-        }
-
-        boolean isSparse()
-        {
-            return _ref.isSparse(_sparsityFraction);
-        }
+    QpidByteBuffer view(int offset, int length);
 
-    }
+    boolean isSparse();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e64e2826/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferFactory.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferFactory.java
new file mode 100644
index 0000000..5b55ff9
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferFactory.java
@@ -0,0 +1,524 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.bytebuffer;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+
+final class QpidByteBufferFactory
+{
+    private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
+    private static final QpidByteBuffer EMPTY_QPID_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
+    private static final ThreadLocal<SingleQpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
+    private volatile static boolean _isPoolInitialized;
+    private volatile static BufferPool _bufferPool;
+    private volatile static int _pooledBufferSize;
+    private volatile static double _sparsityFraction;
+    private volatile static ByteBuffer _zeroed;
+
+    static QpidByteBuffer allocate(boolean direct, int size)
+    {
+        return direct ? allocateDirect(size) : allocate(size);
+    }
+
+    static QpidByteBuffer allocate(int size)
+    {
+        return new SingleQpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
+    }
+
+    static QpidByteBuffer allocateDirect(int size)
+    {
+        if (size < 0)
+        {
+            throw new IllegalArgumentException("Cannot allocate QpidByteBufferFragment with size "
+                                               + size
+                                               + " which is negative.");
+        }
+
+        if (_isPoolInitialized)
+        {
+            if (size <= _pooledBufferSize)
+            {
+                return allocateDirectSingle(size);
+            }
+            else
+            {
+                List<SingleQpidByteBuffer> fragments = new ArrayList<>();
+                int allocatedSize = 0;
+                while (size - allocatedSize >= _pooledBufferSize)
+                {
+                    fragments.add(allocateDirectSingle(_pooledBufferSize));
+                    allocatedSize += _pooledBufferSize;
+                }
+                if (allocatedSize != size)
+                {
+                    fragments.add(allocateDirectSingle(size - allocatedSize));
+                }
+                return new MultiQpidByteBuffer(fragments);
+            }
+        }
+        else
+        {
+            return allocate(size);
+        }
+    }
+
+    static QpidByteBuffer asQpidByteBuffer(InputStream stream) throws IOException
+    {
+        final List<SingleQpidByteBuffer> fragments = new ArrayList<>();
+        final int pooledBufferSize = getPooledBufferSize();
+        byte[] transferBuf = new byte[pooledBufferSize];
+        int readFragment = 0;
+        int read = stream.read(transferBuf, readFragment, pooledBufferSize - readFragment);
+        while (read > 0)
+        {
+            readFragment += read;
+            if (readFragment == pooledBufferSize)
+            {
+                SingleQpidByteBuffer fragment = allocateDirectSingle(pooledBufferSize);
+                fragment.put(transferBuf, 0, pooledBufferSize);
+                fragment.flip();
+                fragments.add(fragment);
+                readFragment = 0;
+            }
+            read = stream.read(transferBuf, readFragment, pooledBufferSize - readFragment);
+        }
+        if (readFragment != 0)
+        {
+            SingleQpidByteBuffer fragment = allocateDirectSingle(readFragment);
+            fragment.put(transferBuf, 0, readFragment);
+            fragment.flip();
+            fragments.add(fragment);
+        }
+        return createQpidByteBuffer(fragments);
+    }
+
+    private static QpidByteBuffer asQpidByteBuffer(final byte[] data, final int offset, final int length)
+    {
+        try (QpidByteBufferOutputStream outputStream = new QpidByteBufferOutputStream(true, QpidByteBuffer.getPooledBufferSize()))
+        {
+            outputStream.write(data, offset, length);
+            return outputStream.fetchAccumulatedBuffer();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("unexpected Error converting array to QpidByteBuffers", e);
+        }
+    }
+
+    private static ByteBuffer[] getUnderlyingBuffers(QpidByteBuffer buffer)
+    {
+        if (buffer instanceof SingleQpidByteBuffer)
+        {
+            return new ByteBuffer[] {((SingleQpidByteBuffer) buffer).getUnderlyingBuffer()};
+        }
+        else if (buffer instanceof MultiQpidByteBuffer)
+        {
+            return ((MultiQpidByteBuffer) buffer).getUnderlyingBuffers();
+        }
+        else
+        {
+            throw new IllegalStateException("Unknown Buffer Implementation");
+        }
+    }
+
+    static SSLEngineResult encryptSSL(SSLEngine engine,
+                                      Collection<QpidByteBuffer> buffers,
+                                      QpidByteBuffer dest) throws SSLException
+    {
+        if (dest instanceof SingleQpidByteBuffer)
+        {
+            SingleQpidByteBuffer dst = (SingleQpidByteBuffer) dest;
+            final ByteBuffer[] src;
+            // QPID-7447: prevent unnecessary allocations
+            if (buffers.isEmpty())
+            {
+                src = EMPTY_BYTE_BUFFER_ARRAY;
+            }
+            else
+            {
+                List<ByteBuffer> buffers_ = new LinkedList<>();
+                for (QpidByteBuffer buffer : buffers)
+                {
+                    Collections.addAll(buffers_, getUnderlyingBuffers(buffer));
+                }
+                src = buffers_.toArray(new ByteBuffer[buffers_.size()]);
+            }
+            return engine.wrap(src, dst.getUnderlyingBuffer());
+        }
+        else
+        {
+            throw new IllegalStateException("Expected a single fragment output buffer");
+        }
+    }
+
+    static SSLEngineResult decryptSSL(final SSLEngine engine, final QpidByteBuffer src, final QpidByteBuffer dst)
+            throws SSLException
+    {
+        if (src instanceof SingleQpidByteBuffer)
+        {
+            ByteBuffer underlying = ((SingleQpidByteBuffer)src).getUnderlyingBuffer();
+            if (dst instanceof SingleQpidByteBuffer)
+            {
+                return engine.unwrap(underlying, ((SingleQpidByteBuffer) dst).getUnderlyingBuffer());
+            }
+            else if (dst instanceof MultiQpidByteBuffer)
+            {
+                return engine.unwrap(underlying, ((MultiQpidByteBuffer) dst).getUnderlyingBuffers());
+            }
+            else
+            {
+                throw new IllegalStateException("unknown QBB implementation");
+            }
+        }
+        else
+        {
+            throw new IllegalStateException("Source QBB can only be single byte buffer");
+        }
+    }
+
+    static QpidByteBuffer inflate(QpidByteBuffer compressedBuffer) throws IOException
+    {
+        if (compressedBuffer == null)
+        {
+            throw new IllegalArgumentException("compressedBuffer cannot be null");
+        }
+
+        boolean isDirect = compressedBuffer.isDirect();
+        final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
+
+        List<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
+        try (GZIPInputStream gzipInputStream = new GZIPInputStream(compressedBuffer.asInputStream()))
+        {
+            byte[] buf = new byte[bufferSize];
+            int read;
+            while ((read = gzipInputStream.read(buf)) != -1)
+            {
+                uncompressedBuffers.add(asQpidByteBuffer(buf, 0, read));
+            }
+            return concatenate(uncompressedBuffers);
+        }
+        finally
+        {
+            uncompressedBuffers.forEach(QpidByteBuffer::dispose);
+        }
+    }
+
+    static QpidByteBuffer deflate(QpidByteBuffer uncompressedBuffer) throws IOException
+    {
+        if (uncompressedBuffer == null)
+        {
+            throw new IllegalArgumentException("uncompressedBuffer cannot be null");
+        }
+
+        boolean isDirect = uncompressedBuffer.isDirect();
+        final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
+
+        try (QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
+             InputStream compressedInput = uncompressedBuffer.asInputStream();
+             GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput,
+                                                                                         bufferSize)))
+        {
+            byte[] buf = new byte[16384];
+            int read;
+            while ((read = compressedInput.read(buf)) > -1)
+            {
+                gzipStream.write(buf, 0, read);
+            }
+            gzipStream.finish();
+            gzipStream.flush();
+            return compressedOutput.fetchAccumulatedBuffer();
+        }
+    }
+
+    static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> qpidByteBuffers)
+            throws IOException
+    {
+        List<ByteBuffer> byteBuffers = new ArrayList<>();
+        for (QpidByteBuffer qpidByteBuffer : qpidByteBuffers)
+        {
+            Collections.addAll(byteBuffers, getUnderlyingBuffers(qpidByteBuffer));
+        }
+        return channel.write(byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
+    }
+
+    static QpidByteBuffer wrap(ByteBuffer wrap)
+    {
+        return new SingleQpidByteBuffer(new NonPooledByteBufferRef(wrap));
+    }
+
+    static QpidByteBuffer wrap(byte[] data)
+    {
+        return wrap(ByteBuffer.wrap(data));
+    }
+
+    static QpidByteBuffer wrap(byte[] data, int offset, int length)
+    {
+        return wrap(ByteBuffer.wrap(data, offset, length));
+    }
+
+    static void initialisePool(int bufferSize, int maxPoolSize, double sparsityFraction)
+    {
+        if (_isPoolInitialized && (bufferSize != _pooledBufferSize
+                                                       || maxPoolSize != _bufferPool.getMaxSize()
+                                                       || sparsityFraction != _sparsityFraction))
+        {
+            final String errorMessage = String.format(
+                    "QpidByteBuffer pool has already been initialised with bufferSize=%d, maxPoolSize=%d, and sparsityFraction=%f."
+                    +
+                    "Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
+                    _pooledBufferSize,
+                    _bufferPool.getMaxSize(),
+                    _sparsityFraction,
+                    bufferSize,
+                    maxPoolSize);
+            throw new IllegalStateException(errorMessage);
+        }
+        if (bufferSize <= 0)
+        {
+            throw new IllegalArgumentException("Negative or zero bufferSize illegal : " + bufferSize);
+        }
+
+        _bufferPool = new BufferPool(maxPoolSize);
+        _pooledBufferSize = bufferSize;
+        _zeroed = ByteBuffer.allocateDirect(_pooledBufferSize);
+        _sparsityFraction = sparsityFraction;
+        _isPoolInitialized = true;
+    }
+
+    /**
+     * Test use only
+     */
+    static void deinitialisePool()
+    {
+        if (_isPoolInitialized)
+        {
+            SingleQpidByteBuffer singleQpidByteBuffer = _cachedBuffer.get();
+            if (singleQpidByteBuffer != null)
+            {
+                singleQpidByteBuffer.dispose();
+                _cachedBuffer.remove();
+            }
+            _bufferPool = null;
+            _pooledBufferSize = -1;
+            _isPoolInitialized = false;
+            _sparsityFraction = 1.0;
+            _zeroed = null;
+        }
+    }
+
+
+    static void returnToPool(final ByteBuffer buffer)
+    {
+        buffer.clear();
+        if (_isPoolInitialized)
+        {
+            final ByteBuffer duplicate = _zeroed.duplicate();
+            duplicate.limit(buffer.capacity());
+            buffer.put(duplicate);
+            _bufferPool.returnBuffer(buffer);
+        }
+    }
+
+    static double getSparsityFraction()
+    {
+        return _sparsityFraction;
+    }
+
+    static int getPooledBufferSize()
+    {
+        return _pooledBufferSize;
+    }
+
+    static long getAllocatedDirectMemorySize()
+    {
+        return _pooledBufferSize * getNumberOfActivePooledBuffers();
+    }
+
+    static int getNumberOfActivePooledBuffers()
+    {
+        return PooledByteBufferRef.getActiveBufferCount();
+    }
+
+    static int getNumberOfPooledBuffers()
+    {
+        return _bufferPool.size();
+    }
+
+    static long getPooledBufferDisposalCounter()
+    {
+        return PooledByteBufferRef.getDisposalCounter();
+    }
+
+    static QpidByteBuffer reallocateIfNecessary(QpidByteBuffer data)
+    {
+        if (data != null && data.isDirect() && data.isSparse())
+        {
+            QpidByteBuffer newBuf = allocateDirect(data.remaining());
+            newBuf.put(data);
+            newBuf.flip();
+            data.dispose();
+            return newBuf;
+        }
+        else
+        {
+            return data;
+        }
+    }
+
+    static QpidByteBuffer concatenate(List<QpidByteBuffer> buffers)
+    {
+        final List<SingleQpidByteBuffer> fragments = new ArrayList<>(buffers.size());
+        for (QpidByteBuffer buffer : buffers)
+        {
+            if (buffer instanceof SingleQpidByteBuffer)
+            {
+                if (buffer.hasRemaining())
+                {
+                    fragments.add((SingleQpidByteBuffer) buffer.slice());
+                }
+            }
+            else if (buffer instanceof MultiQpidByteBuffer)
+            {
+                for (final SingleQpidByteBuffer fragment : ((MultiQpidByteBuffer) buffer).getFragments())
+                {
+                    if (fragment.hasRemaining())
+                    {
+                        fragments.add(fragment.slice());
+                    }
+                }
+            }
+            else
+            {
+                throw new IllegalStateException("unknown QBB implementation");
+            }
+        }
+        return createQpidByteBuffer(fragments);
+    }
+
+    static QpidByteBuffer createQpidByteBuffer(final List<SingleQpidByteBuffer> fragments)
+    {
+        if (fragments.size() == 0)
+        {
+            return emptyQpidByteBuffer();
+        }
+        else if (fragments.size() == 1)
+        {
+            return fragments.get(0);
+        }
+        else
+        {
+            return new MultiQpidByteBuffer(fragments);
+        }
+    }
+
+    static QpidByteBuffer concatenate(QpidByteBuffer... buffers)
+    {
+        return concatenate(Arrays.asList(buffers));
+    }
+
+    static QpidByteBuffer emptyQpidByteBuffer()
+    {
+        return EMPTY_QPID_BYTE_BUFFER.duplicate();
+    }
+
+    static ThreadFactory createQpidByteBufferTrackingThreadFactory(ThreadFactory factory)
+    {
+        return r -> factory.newThread(() -> {
+            try
+            {
+                r.run();
+            }
+            finally
+            {
+                final SingleQpidByteBuffer cachedThreadLocalBuffer = _cachedBuffer.get();
+                if (cachedThreadLocalBuffer != null)
+                {
+                    cachedThreadLocalBuffer.dispose();
+                    _cachedBuffer.remove();
+                }
+            }
+        });
+    }
+
+    private static SingleQpidByteBuffer allocateDirectSingle(int size)
+    {
+        if (size < 0)
+        {
+            throw new IllegalArgumentException("Cannot allocate SingleQpidByteBuffer with size "
+                                               + size
+                                               + " which is negative.");
+        }
+
+        final ByteBufferRef ref;
+        if (_isPoolInitialized && _pooledBufferSize >= size)
+        {
+            if (_pooledBufferSize == size)
+            {
+                ByteBuffer buf = _bufferPool.getBuffer();
+                if (buf == null)
+                {
+                    buf = ByteBuffer.allocateDirect(size);
+                }
+                ref = new PooledByteBufferRef(buf);
+            }
+            else
+            {
+                SingleQpidByteBuffer buf = _cachedBuffer.get();
+                if (buf == null || buf.remaining() < size)
+                {
+                    if (buf != null)
+                    {
+                        buf.dispose();
+                    }
+                    buf = allocateDirectSingle(_pooledBufferSize);
+                    _cachedBuffer.set(buf);
+                }
+                SingleQpidByteBuffer rVal = buf.view(0, size);
+                buf.position(buf.position() + size);
+
+                return rVal;
+            }
+        }
+        else
+        {
+            ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
+        }
+        return new SingleQpidByteBuffer(ref);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e64e2826/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java
new file mode 100644
index 0000000..db453b6
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.bytebuffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+final class QpidByteBufferInputStream extends InputStream
+{
+    private final QpidByteBuffer _qpidByteBuffer;
+
+    QpidByteBufferInputStream(final QpidByteBuffer buffer)
+    {
+        _qpidByteBuffer = buffer.duplicate();
+    }
+
+    @Override
+    public int read() throws IOException
+    {
+        if (_qpidByteBuffer.hasRemaining())
+        {
+            return _qpidByteBuffer.getUnsignedByte();
+        }
+        return -1;
+    }
+
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        if (!_qpidByteBuffer.hasRemaining())
+        {
+            return -1;
+        }
+        int remaining = _qpidByteBuffer.remaining();
+        if (remaining < len)
+        {
+            len = remaining;
+        }
+        _qpidByteBuffer.get(b, off, len);
+
+        return len;
+    }
+
+    @Override
+    public void mark(int readlimit)
+    {
+        _qpidByteBuffer.mark();
+    }
+
+    @Override
+    public void reset() throws IOException
+    {
+        _qpidByteBuffer.reset();
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return true;
+    }
+
+    @Override
+    public long skip(long n) throws IOException
+    {
+        _qpidByteBuffer.position(_qpidByteBuffer.position() + (int) n);
+        return n;
+    }
+
+    @Override
+    public int available() throws IOException
+    {
+        return _qpidByteBuffer.remaining();
+    }
+
+    @Override
+    public void close()
+    {
+        _qpidByteBuffer.dispose();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e64e2826/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
index 08788df..9eb222c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.LinkedList;
 
-public class QpidByteBufferOutputStream extends OutputStream
+final class QpidByteBufferOutputStream extends OutputStream
 {
     private final LinkedList<QpidByteBuffer> _buffers = new LinkedList<>();
     private int _bufferPosition = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org