You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/24 16:07:17 UTC

[3/3] qpid-broker-j git commit: QPID-7832: [Java Broker] Introduce QpidByteBuffer interface and split implementation into separate classes

QPID-7832: [Java Broker] Introduce QpidByteBuffer interface and split implementation into separate classes


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e64e2826
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e64e2826
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e64e2826

Branch: refs/heads/master
Commit: e64e282688da8963133dc09c630482c686d8be7a
Parents: cd12e2d
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Oct 24 14:37:51 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Oct 24 17:06:38 2017 +0100

----------------------------------------------------------------------
 .../server/bytebuffer/MultiQpidByteBuffer.java  | 1020 +++++++++++
 .../server/bytebuffer/PooledByteBufferRef.java  |    2 -
 .../qpid/server/bytebuffer/QpidByteBuffer.java  | 1646 ++----------------
 .../bytebuffer/QpidByteBufferFactory.java       |  524 ++++++
 .../bytebuffer/QpidByteBufferInputStream.java   |   99 ++
 .../bytebuffer/QpidByteBufferOutputStream.java  |    2 +-
 .../server/bytebuffer/SingleQpidByteBuffer.java |  598 +++++++
 .../qpid/server/store/StoredMemoryMessage.java  |    5 -
 .../NonBlockingConnectionTLSDelegate.java       |    2 +-
 .../server/bytebuffer/QpidByteBufferTest.java   |   84 +-
 .../server/protocol/v0_10/ServerEncoder.java    |    2 +-
 11 files changed, 2454 insertions(+), 1530 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e64e2826/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java
new file mode 100644
index 0000000..1209abf
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java
@@ -0,0 +1,1020 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.bytebuffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.primitives.Chars;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
+
+class MultiQpidByteBuffer implements QpidByteBuffer
+{
+    private final SingleQpidByteBuffer[] _fragments;
+    private volatile int _resetFragmentIndex = -1;
+
+    private MultiQpidByteBuffer(final SingleQpidByteBuffer... fragments)
+    {
+        if (fragments == null)
+        {
+            throw new IllegalArgumentException();
+        }
+        _fragments = fragments;
+    }
+
+    MultiQpidByteBuffer(final List<SingleQpidByteBuffer> fragments)
+    {
+        if (fragments == null)
+        {
+            throw new IllegalArgumentException();
+        }
+        _fragments = fragments.toArray(new SingleQpidByteBuffer[fragments.size()]);
+    }
+
+    //////////////////
+    // Absolute puts
+    //////////////////
+
+    @Override
+    public QpidByteBuffer put(final int index, final byte b)
+    {
+        return put(index, new byte[]{b});
+    }
+
+    @Override
+    public QpidByteBuffer putShort(final int index, final short value)
+    {
+        byte[] valueArray = Shorts.toByteArray(value);
+        return put(index, valueArray);
+    }
+
+    @Override
+    public QpidByteBuffer putChar(final int index, final char value)
+    {
+        byte[] valueArray = Chars.toByteArray(value);
+        return put(index, valueArray);
+    }
+
+    @Override
+    public QpidByteBuffer putInt(final int index, final int value)
+    {
+        byte[] valueArray = Ints.toByteArray(value);
+        return put(index, valueArray);
+    }
+
+    @Override
+    public QpidByteBuffer putLong(final int index, final long value)
+    {
+        byte[] valueArray = Longs.toByteArray(value);
+        return put(index, valueArray);
+    }
+
+    @Override
+    public QpidByteBuffer putFloat(final int index, final float value)
+    {
+        int intValue = Float.floatToRawIntBits(value);
+        return putInt(index, intValue);
+    }
+
+    @Override
+    public QpidByteBuffer putDouble(final int index, final double value)
+    {
+        long longValue = Double.doubleToRawLongBits(value);
+        return putLong(index, longValue);
+    }
+
+    private QpidByteBuffer put(final int index, final byte[] src)
+    {
+        final int valueWidth = src.length;
+        if (index < 0 || index + valueWidth > limit())
+        {
+            throw new IndexOutOfBoundsException(String.format("index %d is out of bounds [%d, %d)", index, 0, limit()));
+        }
+
+        int written = 0;
+        int bytesToSkip = index;
+        for (int i = 0; i < _fragments.length && written != valueWidth; i++)
+        {
+            final SingleQpidByteBuffer buffer = _fragments[i];
+            final int limit = buffer.limit();
+            boolean isLastFragmentToConsider = valueWidth + bytesToSkip - written <= limit;
+            if (!isLastFragmentToConsider && limit != buffer.capacity())
+            {
+                throw new IllegalStateException(String.format("Unexpected limit %d on fragment %d", limit, i));
+            }
+
+            if (bytesToSkip >= limit)
+            {
+                bytesToSkip -= limit;
+            }
+            else
+            {
+                final int bytesToCopy = Math.min(limit - bytesToSkip, valueWidth - written);
+                final int originalPosition = buffer.position();
+                buffer.position(bytesToSkip);
+                buffer.put(src, written, bytesToCopy);
+                buffer.position(originalPosition);
+                written += bytesToCopy;
+                bytesToSkip = 0;
+            }
+        }
+        if (valueWidth != written)
+        {
+            throw new BufferOverflowException();
+        }
+        return this;
+    }
+
+    ////////////////
+    // Relative Puts
+    ////////////////
+
+    @Override
+    public final QpidByteBuffer put(final byte b)
+    {
+        return put(new byte[]{b});
+    }
+
+    @Override
+    public final QpidByteBuffer putUnsignedByte(final short s)
+    {
+        put((byte) s);
+        return this;
+    }
+
+    @Override
+    public final QpidByteBuffer putShort(final short value)
+    {
+        byte[] valueArray = Shorts.toByteArray(value);
+        return put(valueArray);
+    }
+
+    @Override
+    public final QpidByteBuffer putUnsignedShort(final int i)
+    {
+        putShort((short) i);
+        return this;
+    }
+
+    @Override
+    public final QpidByteBuffer putChar(final char value)
+    {
+        byte[] valueArray = Chars.toByteArray(value);
+        return put(valueArray);
+    }
+
+    @Override
+    public final QpidByteBuffer putInt(final int value)
+    {
+        byte[] valueArray = Ints.toByteArray(value);
+        return put(valueArray);
+    }
+
+    @Override
+    public final QpidByteBuffer putUnsignedInt(final long value)
+    {
+        putInt((int) value);
+        return this;
+    }
+
+    @Override
+    public final QpidByteBuffer putLong(final long value)
+    {
+        byte[] valueArray = Longs.toByteArray(value);
+        return put(valueArray);
+    }
+
+    @Override
+    public final QpidByteBuffer putFloat(final float value)
+    {
+        int intValue = Float.floatToRawIntBits(value);
+        return putInt(intValue);
+    }
+
+    @Override
+    public final QpidByteBuffer putDouble(final double value)
+    {
+        long longValue = Double.doubleToRawLongBits(value);
+        return putLong(longValue);
+    }
+
+    @Override
+    public final QpidByteBuffer put(byte[] src)
+    {
+        return put(src, 0, src.length);
+    }
+
+    @Override
+    public final QpidByteBuffer put(final byte[] src, final int offset, final int length)
+    {
+        if (!hasRemaining(length))
+        {
+            throw new BufferOverflowException();
+        }
+
+        int written = 0;
+        for (int i = 0; i < _fragments.length && written != length; i++)
+        {
+            final SingleQpidByteBuffer buffer = _fragments[i];
+            int bytesToWrite = Math.min(buffer.remaining(), length - written);
+            buffer.put(src, offset + written, bytesToWrite);
+            written += bytesToWrite;
+        }
+        if (written != length)
+        {
+            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, length));
+        }
+        return this;
+    }
+
+    @Override
+    public final QpidByteBuffer put(final ByteBuffer src)
+    {
+        final int valueWidth = src.remaining();
+        if (!hasRemaining(valueWidth))
+        {
+            throw new BufferOverflowException();
+        }
+
+        int written = 0;
+        for (int i = 0; i < _fragments.length && written != valueWidth; i++)
+        {
+            final SingleQpidByteBuffer dstFragment = _fragments[i];
+            if (dstFragment.hasRemaining())
+            {
+                final int srcFragmentRemaining = src.remaining();
+                final int dstFragmentRemaining = dstFragment.remaining();
+                if (dstFragmentRemaining >= srcFragmentRemaining)
+                {
+                    dstFragment.put(src);
+                    written += srcFragmentRemaining;
+                }
+                else
+                {
+                    int srcOriginalLimit = src.limit();
+                    src.limit(src.position() + dstFragmentRemaining);
+                    dstFragment.put(src);
+                    src.limit(srcOriginalLimit);
+                    written += dstFragmentRemaining;
+                }
+            }
+        }
+        if (written != valueWidth)
+        {
+            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth));
+        }
+        return this;
+    }
+
+    /** Transfers every remaining byte of the source into this buffer's fragments; throws BufferOverflowException if they do not fit. */
+    @Override
+    public final QpidByteBuffer put(final QpidByteBuffer qpidByteBuffer)
+    {
+        final int valueWidth = qpidByteBuffer.remaining();
+        if (!hasRemaining(valueWidth))
+        {
+            throw new BufferOverflowException();
+        }
+
+        int written = 0;
+        if (qpidByteBuffer instanceof SingleQpidByteBuffer)
+        {
+            final SingleQpidByteBuffer srcFragment = (SingleQpidByteBuffer) qpidByteBuffer;
+            for (int i = 0; i < _fragments.length && written != valueWidth; i++)
+            {
+                final SingleQpidByteBuffer dstFragment = _fragments[i];
+                if (dstFragment.hasRemaining())
+                {
+                    // Compare against what is still left in the source, not the initial total: once an earlier
+                    // fragment has taken part of the data, only the rest has to fit into this fragment.
+                    final int srcFragmentRemaining = srcFragment.remaining();
+                    final int dstFragmentRemaining = dstFragment.remaining();
+                    if (dstFragmentRemaining >= srcFragmentRemaining)
+                    {
+                        dstFragment.put(srcFragment);
+                        written += srcFragmentRemaining;
+                    }
+                    else
+                    {
+                        // Temporarily narrow the source limit so that only as much as fits is transferred.
+                        int srcOriginalLimit = srcFragment.limit();
+                        srcFragment.limit(srcFragment.position() + dstFragmentRemaining);
+                        dstFragment.put(srcFragment);
+                        srcFragment.limit(srcOriginalLimit);
+                        written += dstFragmentRemaining;
+                    }
+                }
+            }
+        }
+        else if (qpidByteBuffer instanceof MultiQpidByteBuffer)
+        {
+            int i = 0; // destination cursor, deliberately carried over from one source fragment to the next
+            for (final SingleQpidByteBuffer srcFragment : ((MultiQpidByteBuffer) qpidByteBuffer)._fragments)
+            {
+                for (; i < _fragments.length; i++)
+                {
+                    final SingleQpidByteBuffer dstFragment = _fragments[i];
+                    if (dstFragment.hasRemaining())
+                    {
+                        final int srcFragmentRemaining = srcFragment.remaining();
+                        final int dstFragmentRemaining = dstFragment.remaining();
+                        if (dstFragmentRemaining >= srcFragmentRemaining)
+                        {
+                            dstFragment.put(srcFragment);
+                            written += srcFragmentRemaining;
+                            break;
+                        }
+                        else
+                        {
+                            int srcOriginalLimit = srcFragment.limit();
+                            srcFragment.limit(srcFragment.position() + dstFragmentRemaining);
+                            dstFragment.put(srcFragment);
+                            srcFragment.limit(srcOriginalLimit);
+                            written += dstFragmentRemaining;
+                        }
+                    }
+                }
+            }
+        }
+        else
+        {
+            throw new IllegalStateException("unknown QBB implementation");
+        }
+
+        if (written != valueWidth)
+        {
+            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth));
+        }
+        return this;
+    }
+
+    ///////////////////
+    // Absolute Gets
+    ///////////////////
+
+    @Override
+    public byte get(final int index)
+    {
+        final byte[] byteArray = getByteArray(index, 1);
+        return byteArray[0];
+    }
+
+    @Override
+    public short getShort(final int index)
+    {
+        final byte[] byteArray = getByteArray(index, 2);
+        return Shorts.fromByteArray(byteArray);
+    }
+
+    @Override
+    public final int getUnsignedShort(int index)
+    {
+        return ((int) getShort(index)) & 0xFFFF;
+    }
+
+    @Override
+    public char getChar(final int index)
+    {
+        final byte[] byteArray = getByteArray(index, 2);
+        return Chars.fromByteArray(byteArray);
+    }
+
+    @Override
+    public int getInt(final int index)
+    {
+        final byte[] byteArray = getByteArray(index, 4);
+        return Ints.fromByteArray(byteArray);
+    }
+
+    @Override
+    public long getLong(final int index)
+    {
+        final byte[] byteArray = getByteArray(index, 8);
+        return Longs.fromByteArray(byteArray);
+    }
+
+    @Override
+    public float getFloat(final int index)
+    {
+        final int intValue = getInt(index);
+        return Float.intBitsToFloat(intValue);
+    }
+
+    @Override
+    public double getDouble(final int index)
+    {
+        final long longValue = getLong(index);
+        return Double.longBitsToDouble(longValue);
+    }
+
+    private byte[] getByteArray(final int index, final int length)
+    {
+        if (index < 0 || index + length > limit())
+        {
+            throw new IndexOutOfBoundsException(String.format("%d bytes at index %d do not fit into bounds [%d, %d)", length, index, 0, limit()));
+        }
+
+        byte[] value = new byte[length];
+
+        int consumed = 0;
+        int bytesToSkip = index;
+        for (int i = 0; i < _fragments.length && consumed != length; i++)
+        {
+            final SingleQpidByteBuffer buffer = _fragments[i];
+            final int limit = buffer.limit();
+            boolean isLastFragmentToConsider = length + bytesToSkip - consumed <= limit;
+            if (!isLastFragmentToConsider && limit != buffer.capacity())
+            {
+                throw new IllegalStateException(String.format("Unexpectedly limit %d on fragment %d.", limit, i));
+            }
+
+            if (bytesToSkip >= limit)
+            {
+                bytesToSkip -= limit;
+            }
+            else
+            {
+                final int bytesToCopy = Math.min(limit - bytesToSkip, length - consumed);
+                final int originalPosition = buffer.position();
+                buffer.position(bytesToSkip);
+                buffer.get(value, consumed, bytesToCopy);
+                buffer.position(originalPosition);
+                consumed += bytesToCopy;
+                bytesToSkip = 0;
+            }
+        }
+        if (consumed != length)
+        {
+            throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
+        }
+        return value;
+    }
+
+    //////////////////
+    // Relative Gets
+    //////////////////
+
+    @Override
+    public final byte get()
+    {
+        byte[] value = new byte[1];
+        get(value, 0, 1);
+        return value[0];
+    }
+
+    @Override
+    public final short getUnsignedByte()
+    {
+        return (short) (get() & 0xFF);
+    }
+
+    @Override
+    public final short getShort()
+    {
+        byte[] value = new byte[2];
+        get(value, 0, value.length);
+        return Shorts.fromByteArray(value);
+    }
+
+    @Override
+    public final int getUnsignedShort()
+    {
+        return ((int) getShort()) & 0xFFFF;
+    }
+
+    @Override
+    public final char getChar()
+    {
+        byte[] value = new byte[2];
+        get(value, 0, value.length);
+        return Chars.fromByteArray(value);
+    }
+
+    @Override
+    public final int getInt()
+    {
+        byte[] value = new byte[4];
+        get(value, 0, value.length);
+        return Ints.fromByteArray(value);
+    }
+
+    @Override
+    public final long getUnsignedInt()
+    {
+        return ((long) getInt()) & 0xFFFFFFFFL;
+    }
+
+    @Override
+    public final long getLong()
+    {
+        byte[] value = new byte[8];
+        get(value, 0, value.length);
+        return Longs.fromByteArray(value);
+    }
+
+    @Override
+    public final float getFloat()
+    {
+        final int intValue = getInt();
+        return Float.intBitsToFloat(intValue);
+    }
+
+    @Override
+    public final double getDouble()
+    {
+        final long longValue = getLong();
+        return Double.longBitsToDouble(longValue);
+    }
+
+    @Override
+    public final QpidByteBuffer get(final byte[] dst)
+    {
+        return get(dst, 0, dst.length);
+    }
+
+    @Override
+    public final QpidByteBuffer get(final byte[] dst, final int offset, final int length)
+    {
+        if (!hasRemaining(length))
+        {
+            throw new BufferUnderflowException();
+        }
+
+        int consumed = 0;
+        for (int i = 0; i < _fragments.length && consumed != length; i++)
+        {
+            final SingleQpidByteBuffer buffer = _fragments[i];
+            int bytesToCopy = Math.min(buffer.remaining(), length - consumed);
+            buffer.get(dst, offset + consumed, bytesToCopy);
+            consumed += bytesToCopy;
+        }
+        if (consumed != length)
+        {
+            throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
+        }
+        return this;
+    }
+
+    ///////////////
+    // Other stuff
+    ////////////////
+
+    @Override
+    public final void copyTo(final byte[] dst)
+    {
+        final int remaining = remaining();
+        if (remaining < dst.length)
+        {
+            throw new BufferUnderflowException();
+        }
+        if (remaining > dst.length)
+        {
+            throw new BufferOverflowException();
+        }
+        int offset = 0;
+        for (SingleQpidByteBuffer fragment : _fragments)
+        {
+            final int length = Math.min(fragment.remaining(), dst.length - offset);
+            fragment.getUnderlyingBuffer().duplicate().get(dst, offset, length);
+            offset += length;
+        }
+    }
+
+    @Override
+    public final void copyTo(final ByteBuffer dst)
+    {
+        if (dst.remaining() < remaining())
+        {
+            throw new BufferOverflowException();
+        }
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            dst.put(fragment.getUnderlyingBuffer().duplicate());
+        }
+    }
+
+    @Override
+    public final void putCopyOf(final QpidByteBuffer qpidByteBuffer)
+    {
+        int sourceRemaining = qpidByteBuffer.remaining();
+        if (!hasRemaining(sourceRemaining))
+        {
+            throw new BufferOverflowException();
+        }
+        if (qpidByteBuffer instanceof MultiQpidByteBuffer)
+        {
+            MultiQpidByteBuffer source = (MultiQpidByteBuffer) qpidByteBuffer;
+            for (int i = 0, fragmentsSize = source._fragments.length; i < fragmentsSize; i++)
+            {
+                final SingleQpidByteBuffer srcFragment = source._fragments[i];
+                put(srcFragment.getUnderlyingBuffer().duplicate());
+            }
+        }
+        else if (qpidByteBuffer instanceof SingleQpidByteBuffer)
+        {
+            SingleQpidByteBuffer source = (SingleQpidByteBuffer) qpidByteBuffer;
+            put(source.getUnderlyingBuffer().duplicate());
+        }
+        else
+        {
+            throw new IllegalStateException("unknown QBB implementation");
+        }
+    }
+
+    @Override
+    public final boolean isDirect()
+    {
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            if (!fragment.isDirect())
+            {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public final void close()
+    {
+        dispose();
+    }
+
+    @Override
+    public final void dispose()
+    {
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            fragment.dispose();
+        }
+    }
+
+    @Override
+    public final InputStream asInputStream()
+    {
+        return new QpidByteBufferInputStream(this);
+    }
+
+    @Override
+    public final long read(ScatteringByteChannel channel) throws IOException
+    {
+        ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.length];
+        for (int i = 0; i < byteBuffers.length; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            byteBuffers[i] = fragment.getUnderlyingBuffer();
+        }
+        return channel.read(byteBuffers);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "QpidByteBuffer{" + _fragments.length + " fragments}";
+    }
+
+    @Override
+    public QpidByteBuffer reset()
+    {
+        if (_resetFragmentIndex < 0)
+        {
+            throw new InvalidMarkException();
+        }
+        final SingleQpidByteBuffer fragment = _fragments[_resetFragmentIndex];
+        fragment.reset();
+        for (int i = _resetFragmentIndex + 1, size = _fragments.length; i < size; ++i)
+        {
+            _fragments[i].position(0);
+        }
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer rewind()
+    {
+        _resetFragmentIndex = -1;
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            fragment.rewind();
+        }
+        return this;
+    }
+
+    @Override
+    public final boolean hasArray()
+    {
+        return false;
+    }
+
+    @Override
+    public byte[] array()
+    {
+        throw new UnsupportedOperationException("This QpidByteBuffer is not backed by an array.");
+    }
+
+    @Override
+    public QpidByteBuffer clear()
+    {
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            _fragments[i].clear();
+        }
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer compact()
+    {
+        int position = position();
+        int limit = limit();
+        if (position != 0)
+        {
+            int dstPos = 0;
+            for (int srcPos = position; srcPos < limit; ++srcPos, ++dstPos)
+            {
+                put(dstPos, get(srcPos));
+            }
+            position(dstPos);
+            limit(capacity());
+        }
+        _resetFragmentIndex = -1;
+        return this;
+    }
+
+    @Override
+    public int position()
+    {
+        int totalPosition = 0;
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            totalPosition += fragment.position();
+            if (fragment.position() != fragment.limit())
+            {
+                break;
+            }
+        }
+        return totalPosition;
+    }
+
+    @Override
+    public QpidByteBuffer position(int newPosition)
+    {
+        if (newPosition < 0 || newPosition > limit())
+        {
+            throw new IllegalArgumentException(String.format("new position %d is out of bounds [%d, %d)", newPosition, 0, limit()));
+        }
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            final int fragmentLimit = fragment.limit();
+            if (newPosition <= fragmentLimit)
+            {
+                fragment.position(newPosition);
+                newPosition = 0;
+            }
+            else
+            {
+                if (fragmentLimit != fragment.capacity())
+                {
+                    throw new IllegalStateException(String.format("QBB Fragment %d has limit %d != capacity %d",
+                                                                  i,
+                                                                  fragmentLimit,
+                                                                  fragment.capacity()));
+                }
+                fragment.position(fragmentLimit);
+                newPosition -= fragmentLimit;
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public int limit()
+    {
+        int totalLimit = 0;
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            final int fragmentLimit = fragment.limit();
+            totalLimit += fragmentLimit;
+            if (fragmentLimit != fragment.capacity())
+            {
+                break;
+            }
+        }
+
+        return totalLimit;
+    }
+
+    @Override
+    public QpidByteBuffer limit(int newLimit)
+    {
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            final int fragmentCapacity = fragment.capacity();
+            final int fragmentLimit = Math.min(newLimit, fragmentCapacity);
+            fragment.limit(fragmentLimit);
+            newLimit -= fragmentLimit;
+        }
+        return this;
+    }
+
+    @Override
+    public final QpidByteBuffer mark()
+    {
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            if (fragment.position() != fragment.limit())
+            {
+                fragment.mark();
+                _resetFragmentIndex = i;
+                return this;
+            }
+        }
+        _resetFragmentIndex = _fragments.length - 1;
+        _fragments[_resetFragmentIndex].mark();
+        return this;
+    }
+
+    @Override
+    public final int remaining()
+    {
+        int remaining = 0;
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            remaining += fragment.remaining();
+        }
+        return remaining;
+    }
+
+    @Override
+    public final boolean hasRemaining()
+    {
+        return hasRemaining(1);
+    }
+
+    @Override
+    public final boolean hasRemaining(int atLeast)
+    {
+        if (atLeast == 0)
+        {
+            return true;
+        }
+        int remaining = 0;
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            remaining += fragment.remaining();
+            if (remaining >= atLeast)
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public QpidByteBuffer flip()
+    {
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            fragment.flip();
+        }
+        return this;
+    }
+
+    @Override
+    public int capacity()
+    {
+        int totalCapacity = 0;
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            totalCapacity += _fragments[i].capacity();
+        }
+        return totalCapacity;
+    }
+
+    @Override
+    public QpidByteBuffer duplicate()
+    {
+        final SingleQpidByteBuffer[] fragments = new SingleQpidByteBuffer[_fragments.length];
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            fragments[i] =_fragments[i].duplicate();
+        }
+        MultiQpidByteBuffer duplicate = new MultiQpidByteBuffer(fragments);
+        duplicate._resetFragmentIndex = _resetFragmentIndex;
+        return duplicate;
+    }
+
+    @Override
+    public QpidByteBuffer slice()
+    {
+        return view(0, remaining());
+    }
+
+    @Override
+    public QpidByteBuffer view(int offset, int length)
+    {
+        if (offset + length > remaining())
+        {
+            throw new IllegalArgumentException(String.format("offset: %d, length: %d, remaining: %d", offset, length, remaining()));
+        }
+
+        final List<SingleQpidByteBuffer> fragments = new ArrayList<>(_fragments.length);
+
+        boolean firstFragmentToBeConsidered = true;
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize && length > 0; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            if (fragment.hasRemaining())
+            {
+                if (!firstFragmentToBeConsidered && fragment.position() != 0)
+                {
+                    throw new IllegalStateException(String.format("Unexpectedly position %d on fragment %d.", fragment.position(), i));
+                }
+                firstFragmentToBeConsidered = false;
+                final int fragmentRemaining = fragment.remaining();
+                if (fragmentRemaining > offset)
+                {
+                    final int fragmentViewLength = Math.min(fragmentRemaining - offset, length);
+                    fragments.add(fragment.view(offset, fragmentViewLength));
+                    length -= fragmentViewLength;
+                    offset = 0;
+                }
+                else
+                {
+                    offset -= fragmentRemaining;
+                }
+            }
+        }
+
+        return QpidByteBufferFactory.createQpidByteBuffer(fragments);
+    }
+
+    @Override
+    public boolean isSparse()
+    {
+        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+        {
+            final SingleQpidByteBuffer fragment = _fragments[i];
+            if (fragment.isSparse())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    SingleQpidByteBuffer[] getFragments()
+    {
+        return _fragments;
+    }
+
+    ByteBuffer[] getUnderlyingBuffers()
+    {
+        ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.length];
+        for (int i = 0; i < _fragments.length; i++)
+        {
+            byteBuffers[i] = _fragments[i].getUnderlyingBuffer();
+        }
+        return byteBuffers;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e64e2826/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
index 0126e20..abd5b78 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
@@ -21,8 +21,6 @@
 package org.apache.qpid.server.bytebuffer;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org