You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/24 16:07:17 UTC
[3/3] qpid-broker-j git commit: QPID-7832: [Java Broker] Introduce
QpidByteBuffer interface and split implementation into separate classes
QPID-7832: [Java Broker] Introduce QpidByteBuffer interface and split implementation into separate classes
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e64e2826
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e64e2826
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e64e2826
Branch: refs/heads/master
Commit: e64e282688da8963133dc09c630482c686d8be7a
Parents: cd12e2d
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Oct 24 14:37:51 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Oct 24 17:06:38 2017 +0100
----------------------------------------------------------------------
.../server/bytebuffer/MultiQpidByteBuffer.java | 1020 +++++++++++
.../server/bytebuffer/PooledByteBufferRef.java | 2 -
.../qpid/server/bytebuffer/QpidByteBuffer.java | 1646 ++----------------
.../bytebuffer/QpidByteBufferFactory.java | 524 ++++++
.../bytebuffer/QpidByteBufferInputStream.java | 99 ++
.../bytebuffer/QpidByteBufferOutputStream.java | 2 +-
.../server/bytebuffer/SingleQpidByteBuffer.java | 598 +++++++
.../qpid/server/store/StoredMemoryMessage.java | 5 -
.../NonBlockingConnectionTLSDelegate.java | 2 +-
.../server/bytebuffer/QpidByteBufferTest.java | 84 +-
.../server/protocol/v0_10/ServerEncoder.java | 2 +-
11 files changed, 2454 insertions(+), 1530 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e64e2826/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java
new file mode 100644
index 0000000..1209abf
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java
@@ -0,0 +1,1020 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.bytebuffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.primitives.Chars;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
+
+class MultiQpidByteBuffer implements QpidByteBuffer
+{
+ private final SingleQpidByteBuffer[] _fragments;
+ private volatile int _resetFragmentIndex = -1;
+
+ private MultiQpidByteBuffer(final SingleQpidByteBuffer... fragments)
+ {
+ if (fragments == null)
+ {
+ throw new IllegalArgumentException();
+ }
+ _fragments = fragments;
+ }
+
+ MultiQpidByteBuffer(final List<SingleQpidByteBuffer> fragments)
+ {
+ if (fragments == null)
+ {
+ throw new IllegalArgumentException();
+ }
+ _fragments = fragments.toArray(new SingleQpidByteBuffer[fragments.size()]);
+ }
+
+ //////////////////
+ // Absolute puts
+ //////////////////
+
+ @Override
+ public QpidByteBuffer put(final int index, final byte b)
+ {
+ return put(index, new byte[]{b});
+ }
+
+ @Override
+ public QpidByteBuffer putShort(final int index, final short value)
+ {
+ byte[] valueArray = Shorts.toByteArray(value);
+ return put(index, valueArray);
+ }
+
+ @Override
+ public QpidByteBuffer putChar(final int index, final char value)
+ {
+ byte[] valueArray = Chars.toByteArray(value);
+ return put(index, valueArray);
+ }
+
+ @Override
+ public QpidByteBuffer putInt(final int index, final int value)
+ {
+ byte[] valueArray = Ints.toByteArray(value);
+ return put(index, valueArray);
+ }
+
+ @Override
+ public QpidByteBuffer putLong(final int index, final long value)
+ {
+ byte[] valueArray = Longs.toByteArray(value);
+ return put(index, valueArray);
+ }
+
+ @Override
+ public QpidByteBuffer putFloat(final int index, final float value)
+ {
+ int intValue = Float.floatToRawIntBits(value);
+ return putInt(index, intValue);
+ }
+
+ @Override
+ public QpidByteBuffer putDouble(final int index, final double value)
+ {
+ long longValue = Double.doubleToRawLongBits(value);
+ return putLong(index, longValue);
+ }
+
+ private QpidByteBuffer put(final int index, final byte[] src)
+ {
+ final int valueWidth = src.length;
+ if (index < 0 || index + valueWidth > limit())
+ {
+ throw new IndexOutOfBoundsException(String.format("index %d is out of bounds [%d, %d)", index, 0, limit()));
+ }
+
+ int written = 0;
+ int bytesToSkip = index;
+ for (int i = 0; i < _fragments.length && written != valueWidth; i++)
+ {
+ final SingleQpidByteBuffer buffer = _fragments[i];
+ final int limit = buffer.limit();
+ boolean isLastFragmentToConsider = valueWidth + bytesToSkip - written <= limit;
+ if (!isLastFragmentToConsider && limit != buffer.capacity())
+ {
+ throw new IllegalStateException(String.format("Unexpected limit %d on fragment %d", limit, i));
+ }
+
+ if (bytesToSkip >= limit)
+ {
+ bytesToSkip -= limit;
+ }
+ else
+ {
+ final int bytesToCopy = Math.min(limit - bytesToSkip, valueWidth - written);
+ final int originalPosition = buffer.position();
+ buffer.position(bytesToSkip);
+ buffer.put(src, written, bytesToCopy);
+ buffer.position(originalPosition);
+ written += bytesToCopy;
+ bytesToSkip = 0;
+ }
+ }
+ if (valueWidth != written)
+ {
+ throw new BufferOverflowException();
+ }
+ return this;
+ }
+
+ ////////////////
+ // Relative Puts
+ ////////////////
+
+ @Override
+ public final QpidByteBuffer put(final byte b)
+ {
+ return put(new byte[]{b});
+ }
+
+ @Override
+ public final QpidByteBuffer putUnsignedByte(final short s)
+ {
+ put((byte) s);
+ return this;
+ }
+
+ @Override
+ public final QpidByteBuffer putShort(final short value)
+ {
+ byte[] valueArray = Shorts.toByteArray(value);
+ return put(valueArray);
+ }
+
+ @Override
+ public final QpidByteBuffer putUnsignedShort(final int i)
+ {
+ putShort((short) i);
+ return this;
+ }
+
+ @Override
+ public final QpidByteBuffer putChar(final char value)
+ {
+ byte[] valueArray = Chars.toByteArray(value);
+ return put(valueArray);
+ }
+
+ @Override
+ public final QpidByteBuffer putInt(final int value)
+ {
+ byte[] valueArray = Ints.toByteArray(value);
+ return put(valueArray);
+ }
+
+ @Override
+ public final QpidByteBuffer putUnsignedInt(final long value)
+ {
+ putInt((int) value);
+ return this;
+ }
+
+ @Override
+ public final QpidByteBuffer putLong(final long value)
+ {
+ byte[] valueArray = Longs.toByteArray(value);
+ return put(valueArray);
+ }
+
+ @Override
+ public final QpidByteBuffer putFloat(final float value)
+ {
+ int intValue = Float.floatToRawIntBits(value);
+ return putInt(intValue);
+ }
+
+ @Override
+ public final QpidByteBuffer putDouble(final double value)
+ {
+ long longValue = Double.doubleToRawLongBits(value);
+ return putLong(longValue);
+ }
+
+ @Override
+ public final QpidByteBuffer put(byte[] src)
+ {
+ return put(src, 0, src.length);
+ }
+
+ @Override
+ public final QpidByteBuffer put(final byte[] src, final int offset, final int length)
+ {
+ if (!hasRemaining(length))
+ {
+ throw new BufferOverflowException();
+ }
+
+ int written = 0;
+ for (int i = 0; i < _fragments.length && written != length; i++)
+ {
+ final SingleQpidByteBuffer buffer = _fragments[i];
+ int bytesToWrite = Math.min(buffer.remaining(), length - written);
+ buffer.put(src, offset + written, bytesToWrite);
+ written += bytesToWrite;
+ }
+ if (written != length)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, length));
+ }
+ return this;
+ }
+
+ @Override
+ public final QpidByteBuffer put(final ByteBuffer src)
+ {
+ final int valueWidth = src.remaining();
+ if (!hasRemaining(valueWidth))
+ {
+ throw new BufferOverflowException();
+ }
+
+ int written = 0;
+ for (int i = 0; i < _fragments.length && written != valueWidth; i++)
+ {
+ final SingleQpidByteBuffer dstFragment = _fragments[i];
+ if (dstFragment.hasRemaining())
+ {
+ final int srcFragmentRemaining = src.remaining();
+ final int dstFragmentRemaining = dstFragment.remaining();
+ if (dstFragmentRemaining >= srcFragmentRemaining)
+ {
+ dstFragment.put(src);
+ written += srcFragmentRemaining;
+ }
+ else
+ {
+ int srcOriginalLimit = src.limit();
+ src.limit(src.position() + dstFragmentRemaining);
+ dstFragment.put(src);
+ src.limit(srcOriginalLimit);
+ written += dstFragmentRemaining;
+ }
+ }
+ }
+ if (written != valueWidth)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth));
+ }
+ return this;
+ }
+
+ @Override
+ public final QpidByteBuffer put(final QpidByteBuffer qpidByteBuffer)
+ {
+ final int valueWidth = qpidByteBuffer.remaining();
+ if (!hasRemaining(valueWidth))
+ {
+ throw new BufferOverflowException();
+ }
+
+ int written = 0;
+ final SingleQpidByteBuffer[] fragments;
+ if (qpidByteBuffer instanceof SingleQpidByteBuffer)
+ {
+ final SingleQpidByteBuffer srcFragment = (SingleQpidByteBuffer) qpidByteBuffer;
+ for (int i = 0; i < _fragments.length && written != valueWidth; i++)
+ {
+ final SingleQpidByteBuffer dstFragment = _fragments[i];
+ if (dstFragment.hasRemaining())
+ {
+ final int dstFragmentRemaining = dstFragment.remaining();
+ if (dstFragmentRemaining >= valueWidth)
+ {
+ dstFragment.put(srcFragment);
+ written += valueWidth;
+ }
+ else
+ {
+ int srcOriginalLimit = srcFragment.limit();
+ srcFragment.limit(srcFragment.position() + dstFragmentRemaining);
+ dstFragment.put(srcFragment);
+ srcFragment.limit(srcOriginalLimit);
+ written += dstFragmentRemaining;
+ }
+ }
+ }
+ }
+ else if (qpidByteBuffer instanceof MultiQpidByteBuffer)
+ {
+ fragments = ((MultiQpidByteBuffer) qpidByteBuffer)._fragments;
+ int i = 0;
+ for (int i1 = 0; i1 < fragments.length; i1++)
+ {
+ final SingleQpidByteBuffer srcFragment = fragments[i1];
+ for (; i < _fragments.length; i++)
+ {
+ final SingleQpidByteBuffer dstFragment = _fragments[i];
+ if (dstFragment.hasRemaining())
+ {
+ final int srcFragmentRemaining = srcFragment.remaining();
+ final int dstFragmentRemaining = dstFragment.remaining();
+ if (dstFragmentRemaining >= srcFragmentRemaining)
+ {
+ dstFragment.put(srcFragment);
+ written += srcFragmentRemaining;
+ break;
+ }
+ else
+ {
+ int srcOriginalLimit = srcFragment.limit();
+ srcFragment.limit(srcFragment.position() + dstFragmentRemaining);
+ dstFragment.put(srcFragment);
+ srcFragment.limit(srcOriginalLimit);
+ written += dstFragmentRemaining;
+ }
+ }
+ }
+ }
+ }
+ else
+ {
+ throw new IllegalStateException("unknown QBB implementation");
+ }
+
+ if (written != valueWidth)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.",
+ written,
+ valueWidth));
+ }
+ return this;
+ }
+
+ ///////////////////
+ // Absolute Gets
+ ///////////////////
+
+ @Override
+ public byte get(final int index)
+ {
+ final byte[] byteArray = getByteArray(index, 1);
+ return byteArray[0];
+ }
+
+ @Override
+ public short getShort(final int index)
+ {
+ final byte[] byteArray = getByteArray(index, 2);
+ return Shorts.fromByteArray(byteArray);
+ }
+
+ @Override
+ public final int getUnsignedShort(int index)
+ {
+ return ((int) getShort(index)) & 0xFFFF;
+ }
+
+ @Override
+ public char getChar(final int index)
+ {
+ final byte[] byteArray = getByteArray(index, 2);
+ return Chars.fromByteArray(byteArray);
+ }
+
+ @Override
+ public int getInt(final int index)
+ {
+ final byte[] byteArray = getByteArray(index, 4);
+ return Ints.fromByteArray(byteArray);
+ }
+
+ @Override
+ public long getLong(final int index)
+ {
+ final byte[] byteArray = getByteArray(index, 8);
+ return Longs.fromByteArray(byteArray);
+ }
+
+ @Override
+ public float getFloat(final int index)
+ {
+ final int intValue = getInt(index);
+ return Float.intBitsToFloat(intValue);
+ }
+
+ @Override
+ public double getDouble(final int index)
+ {
+ final long longValue = getLong(index);
+ return Double.longBitsToDouble(longValue);
+ }
+
+ private byte[] getByteArray(final int index, final int length)
+ {
+ if (index < 0 || index + length > limit())
+ {
+ throw new IndexOutOfBoundsException(String.format("%d bytes at index %d do not fit into bounds [%d, %d)", length, index, 0, limit()));
+ }
+
+ byte[] value = new byte[length];
+
+ int consumed = 0;
+ int bytesToSkip = index;
+ for (int i = 0; i < _fragments.length && consumed != length; i++)
+ {
+ final SingleQpidByteBuffer buffer = _fragments[i];
+ final int limit = buffer.limit();
+ boolean isLastFragmentToConsider = length + bytesToSkip - consumed <= limit;
+ if (!isLastFragmentToConsider && limit != buffer.capacity())
+ {
+ throw new IllegalStateException(String.format("Unexpectedly limit %d on fragment %d.", limit, i));
+ }
+
+ if (bytesToSkip >= limit)
+ {
+ bytesToSkip -= limit;
+ }
+ else
+ {
+ final int bytesToCopy = Math.min(limit - bytesToSkip, length - consumed);
+ final int originalPosition = buffer.position();
+ buffer.position(bytesToSkip);
+ buffer.get(value, consumed, bytesToCopy);
+ buffer.position(originalPosition);
+ consumed += bytesToCopy;
+ bytesToSkip = 0;
+ }
+ }
+ if (consumed != length)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
+ }
+ return value;
+ }
+
+ //////////////////
+ // Relative Gets
+ //////////////////
+
+ @Override
+ public final byte get()
+ {
+ byte[] value = new byte[1];
+ get(value, 0, 1);
+ return value[0];
+ }
+
+ @Override
+ public final short getUnsignedByte()
+ {
+ return (short) (get() & 0xFF);
+ }
+
+ @Override
+ public final short getShort()
+ {
+ byte[] value = new byte[2];
+ get(value, 0, value.length);
+ return Shorts.fromByteArray(value);
+ }
+
+ @Override
+ public final int getUnsignedShort()
+ {
+ return ((int) getShort()) & 0xFFFF;
+ }
+
+ @Override
+ public final char getChar()
+ {
+ byte[] value = new byte[2];
+ get(value, 0, value.length);
+ return Chars.fromByteArray(value);
+ }
+
+ @Override
+ public final int getInt()
+ {
+ byte[] value = new byte[4];
+ get(value, 0, value.length);
+ return Ints.fromByteArray(value);
+ }
+
+ @Override
+ public final long getUnsignedInt()
+ {
+ return ((long) getInt()) & 0xFFFFFFFFL;
+ }
+
+ @Override
+ public final long getLong()
+ {
+ byte[] value = new byte[8];
+ get(value, 0, value.length);
+ return Longs.fromByteArray(value);
+ }
+
+ @Override
+ public final float getFloat()
+ {
+ final int intValue = getInt();
+ return Float.intBitsToFloat(intValue);
+ }
+
+ @Override
+ public final double getDouble()
+ {
+ final long longValue = getLong();
+ return Double.longBitsToDouble(longValue);
+ }
+
+ @Override
+ public final QpidByteBuffer get(final byte[] dst)
+ {
+ return get(dst, 0, dst.length);
+ }
+
+ @Override
+ public final QpidByteBuffer get(final byte[] dst, final int offset, final int length)
+ {
+ if (!hasRemaining(length))
+ {
+ throw new BufferUnderflowException();
+ }
+
+ int consumed = 0;
+ for (int i = 0; i < _fragments.length && consumed != length; i++)
+ {
+ final SingleQpidByteBuffer buffer = _fragments[i];
+ int bytesToCopy = Math.min(buffer.remaining(), length - consumed);
+ buffer.get(dst, offset + consumed, bytesToCopy);
+ consumed += bytesToCopy;
+ }
+ if (consumed != length)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
+ }
+ return this;
+ }
+
+ ///////////////
+ // Other stuff
+ ////////////////
+
+ @Override
+ public final void copyTo(final byte[] dst)
+ {
+ final int remaining = remaining();
+ if (remaining < dst.length)
+ {
+ throw new BufferUnderflowException();
+ }
+ if (remaining > dst.length)
+ {
+ throw new BufferOverflowException();
+ }
+ int offset = 0;
+ for (SingleQpidByteBuffer fragment : _fragments)
+ {
+ final int length = Math.min(fragment.remaining(), dst.length - offset);
+ fragment.getUnderlyingBuffer().duplicate().get(dst, offset, length);
+ offset += length;
+ }
+ }
+
+ @Override
+ public final void copyTo(final ByteBuffer dst)
+ {
+ if (dst.remaining() < remaining())
+ {
+ throw new BufferOverflowException();
+ }
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ dst.put(fragment.getUnderlyingBuffer().duplicate());
+ }
+ }
+
+ @Override
+ public final void putCopyOf(final QpidByteBuffer qpidByteBuffer)
+ {
+ int sourceRemaining = qpidByteBuffer.remaining();
+ if (!hasRemaining(sourceRemaining))
+ {
+ throw new BufferOverflowException();
+ }
+ if (qpidByteBuffer instanceof MultiQpidByteBuffer)
+ {
+ MultiQpidByteBuffer source = (MultiQpidByteBuffer) qpidByteBuffer;
+ for (int i = 0, fragmentsSize = source._fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer srcFragment = source._fragments[i];
+ put(srcFragment.getUnderlyingBuffer().duplicate());
+ }
+ }
+ else if (qpidByteBuffer instanceof SingleQpidByteBuffer)
+ {
+ SingleQpidByteBuffer source = (SingleQpidByteBuffer) qpidByteBuffer;
+ put(source.getUnderlyingBuffer().duplicate());
+ }
+ else
+ {
+ throw new IllegalStateException("unknown QBB implementation");
+ }
+ }
+
+ @Override
+ public final boolean isDirect()
+ {
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ if (!fragment.isDirect())
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public final void close()
+ {
+ dispose();
+ }
+
+ @Override
+ public final void dispose()
+ {
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ fragment.dispose();
+ }
+ }
+
+ @Override
+ public final InputStream asInputStream()
+ {
+ return new QpidByteBufferInputStream(this);
+ }
+
+ @Override
+ public final long read(ScatteringByteChannel channel) throws IOException
+ {
+ ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.length];
+ for (int i = 0; i < byteBuffers.length; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ byteBuffers[i] = fragment.getUnderlyingBuffer();
+ }
+ return channel.read(byteBuffers);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "QpidByteBuffer{" + _fragments.length + " fragments}";
+ }
+
+ @Override
+ public QpidByteBuffer reset()
+ {
+ if (_resetFragmentIndex < 0)
+ {
+ throw new InvalidMarkException();
+ }
+ final SingleQpidByteBuffer fragment = _fragments[_resetFragmentIndex];
+ fragment.reset();
+ for (int i = _resetFragmentIndex + 1, size = _fragments.length; i < size; ++i)
+ {
+ _fragments[i].position(0);
+ }
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer rewind()
+ {
+ _resetFragmentIndex = -1;
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ fragment.rewind();
+ }
+ return this;
+ }
+
+ @Override
+ public final boolean hasArray()
+ {
+ return false;
+ }
+
+ @Override
+ public byte[] array()
+ {
+ throw new UnsupportedOperationException("This QpidByteBuffer is not backed by an array.");
+ }
+
+ @Override
+ public QpidByteBuffer clear()
+ {
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ _fragments[i].clear();
+ }
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer compact()
+ {
+ int position = position();
+ int limit = limit();
+ if (position != 0)
+ {
+ int dstPos = 0;
+ for (int srcPos = position; srcPos < limit; ++srcPos, ++dstPos)
+ {
+ put(dstPos, get(srcPos));
+ }
+ position(dstPos);
+ limit(capacity());
+ }
+ _resetFragmentIndex = -1;
+ return this;
+ }
+
+ @Override
+ public int position()
+ {
+ int totalPosition = 0;
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ totalPosition += fragment.position();
+ if (fragment.position() != fragment.limit())
+ {
+ break;
+ }
+ }
+ return totalPosition;
+ }
+
+ @Override
+ public QpidByteBuffer position(int newPosition)
+ {
+ if (newPosition < 0 || newPosition > limit())
+ {
+ throw new IllegalArgumentException(String.format("new position %d is out of bounds [%d, %d)", newPosition, 0, limit()));
+ }
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ final int fragmentLimit = fragment.limit();
+ if (newPosition <= fragmentLimit)
+ {
+ fragment.position(newPosition);
+ newPosition = 0;
+ }
+ else
+ {
+ if (fragmentLimit != fragment.capacity())
+ {
+ throw new IllegalStateException(String.format("QBB Fragment %d has limit %d != capacity %d",
+ i,
+ fragmentLimit,
+ fragment.capacity()));
+ }
+ fragment.position(fragmentLimit);
+ newPosition -= fragmentLimit;
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public int limit()
+ {
+ int totalLimit = 0;
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ final int fragmentLimit = fragment.limit();
+ totalLimit += fragmentLimit;
+ if (fragmentLimit != fragment.capacity())
+ {
+ break;
+ }
+ }
+
+ return totalLimit;
+ }
+
+ @Override
+ public QpidByteBuffer limit(int newLimit)
+ {
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ final int fragmentCapacity = fragment.capacity();
+ final int fragmentLimit = Math.min(newLimit, fragmentCapacity);
+ fragment.limit(fragmentLimit);
+ newLimit -= fragmentLimit;
+ }
+ return this;
+ }
+
+ @Override
+ public final QpidByteBuffer mark()
+ {
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ if (fragment.position() != fragment.limit())
+ {
+ fragment.mark();
+ _resetFragmentIndex = i;
+ return this;
+ }
+ }
+ _resetFragmentIndex = _fragments.length - 1;
+ _fragments[_resetFragmentIndex].mark();
+ return this;
+ }
+
+ @Override
+ public final int remaining()
+ {
+ int remaining = 0;
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ remaining += fragment.remaining();
+ }
+ return remaining;
+ }
+
+ @Override
+ public final boolean hasRemaining()
+ {
+ return hasRemaining(1);
+ }
+
+ @Override
+ public final boolean hasRemaining(int atLeast)
+ {
+ if (atLeast == 0)
+ {
+ return true;
+ }
+ int remaining = 0;
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ remaining += fragment.remaining();
+ if (remaining >= atLeast)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public QpidByteBuffer flip()
+ {
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ fragment.flip();
+ }
+ return this;
+ }
+
+ @Override
+ public int capacity()
+ {
+ int totalCapacity = 0;
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ totalCapacity += _fragments[i].capacity();
+ }
+ return totalCapacity;
+ }
+
+ @Override
+ public QpidByteBuffer duplicate()
+ {
+ final SingleQpidByteBuffer[] fragments = new SingleQpidByteBuffer[_fragments.length];
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ fragments[i] =_fragments[i].duplicate();
+ }
+ MultiQpidByteBuffer duplicate = new MultiQpidByteBuffer(fragments);
+ duplicate._resetFragmentIndex = _resetFragmentIndex;
+ return duplicate;
+ }
+
+ @Override
+ public QpidByteBuffer slice()
+ {
+ return view(0, remaining());
+ }
+
+ @Override
+ public QpidByteBuffer view(int offset, int length)
+ {
+ if (offset + length > remaining())
+ {
+ throw new IllegalArgumentException(String.format("offset: %d, length: %d, remaining: %d", offset, length, remaining()));
+ }
+
+ final List<SingleQpidByteBuffer> fragments = new ArrayList<>(_fragments.length);
+
+ boolean firstFragmentToBeConsidered = true;
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize && length > 0; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ if (fragment.hasRemaining())
+ {
+ if (!firstFragmentToBeConsidered && fragment.position() != 0)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly position %d on fragment %d.", fragment.position(), i));
+ }
+ firstFragmentToBeConsidered = false;
+ final int fragmentRemaining = fragment.remaining();
+ if (fragmentRemaining > offset)
+ {
+ final int fragmentViewLength = Math.min(fragmentRemaining - offset, length);
+ fragments.add(fragment.view(offset, fragmentViewLength));
+ length -= fragmentViewLength;
+ offset = 0;
+ }
+ else
+ {
+ offset -= fragmentRemaining;
+ }
+ }
+ }
+
+ return QpidByteBufferFactory.createQpidByteBuffer(fragments);
+ }
+
+ @Override
+ public boolean isSparse()
+ {
+ for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
+ {
+ final SingleQpidByteBuffer fragment = _fragments[i];
+ if (fragment.isSparse())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ SingleQpidByteBuffer[] getFragments()
+ {
+ return _fragments;
+ }
+
+ ByteBuffer[] getUnderlyingBuffers()
+ {
+ ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.length];
+ for (int i = 0; i < _fragments.length; i++)
+ {
+ byteBuffers[i] = _fragments[i].getUnderlyingBuffer();
+ }
+ return byteBuffers;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e64e2826/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
index 0126e20..abd5b78 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
@@ -21,8 +21,6 @@
package org.apache.qpid.server.bytebuffer;
import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org