You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/10/18 15:21:43 UTC
[09/10] qpid-broker-j git commit: QPID-7832: [Java Broker] Refactor
store/protocol API using Collection
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index 346e203..6f293dc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -27,14 +27,16 @@ import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
+import java.nio.InvalidMarkException;
import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -43,68 +45,144 @@ import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
-import org.apache.qpid.server.streams.CompositeInputStream;
+import com.google.common.primitives.Chars;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
public class QpidByteBuffer implements AutoCloseable
{
- private static final AtomicIntegerFieldUpdater<QpidByteBuffer>
- DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
- QpidByteBuffer.class,
- "_disposed");
-
- private static final ThreadLocal<QpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
- private static final double REALLOCATION_CAPACITY_THRESHOLD_FRACTION = 0.9;
+ private static final QpidByteBuffer EMPTY_QPID_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
+ private static final ThreadLocal<QpidByteBufferFragment> _cachedBuffer = new ThreadLocal<>();
private volatile static boolean _isPoolInitialized;
private volatile static BufferPool _bufferPool;
private volatile static int _pooledBufferSize;
- private volatile static ByteBuffer _zeroed;
private volatile static double _sparsityFraction;
- private final int _offset;
+ private volatile static ByteBuffer _zeroed;
+ private volatile List<QpidByteBufferFragment> _fragments = new ArrayList<>();
+ private volatile int _resetFragmentIndex = -1;
- final ByteBufferRef _ref;
- volatile ByteBuffer _buffer;
- @SuppressWarnings("unused")
- private volatile int _disposed;
+ //////////////////
+ // Absolute puts
+ //////////////////
+ public QpidByteBuffer put(final int index, final byte b)
+ {
+ return put(index, new byte[]{b});
+ }
- QpidByteBuffer(ByteBufferRef ref)
+ public QpidByteBuffer putShort(final int index, final short value)
{
- this(ref, ref.getBuffer(), 0);
+ byte[] valueArray = Shorts.toByteArray(value);
+ return put(index, valueArray);
}
- private QpidByteBuffer(ByteBufferRef ref, ByteBuffer buffer, int offset)
+ public QpidByteBuffer putChar(final int index, final char value)
{
- _ref = ref;
- _buffer = buffer;
- _offset = offset;
- _ref.incrementRef(capacity());
+ byte[] valueArray = Chars.toByteArray(value);
+ return put(index, valueArray);
}
- public final boolean isDirect()
+ public QpidByteBuffer putInt(final int index, final int value)
{
- return _buffer.isDirect();
+ byte[] valueArray = Ints.toByteArray(value);
+ return put(index, valueArray);
}
- public final short getUnsignedByte()
+ public QpidByteBuffer putLong(final int index, final long value)
{
- return (short) (((short) get()) & 0xFF);
+ byte[] valueArray = Longs.toByteArray(value);
+ return put(index, valueArray);
}
- public final int getUnsignedShort()
+ public QpidByteBuffer putFloat(final int index, final float value)
{
- return ((int) getShort()) & 0xffff;
+ int intValue = Float.floatToRawIntBits(value);
+ return putInt(index, intValue);
}
- public final int getUnsignedShort(int pos)
+ public QpidByteBuffer putDouble(final int index, final double value)
{
- return ((int) getShort(pos)) & 0xffff;
+ long longValue = Double.doubleToRawLongBits(value);
+ return putLong(index, longValue);
}
+ public final QpidByteBuffer put(final int index, final byte[] src)
+ {
+ final int valueWidth = src.length;
+ if (index < 0 || index + valueWidth > limit())
+ {
+ throw new IndexOutOfBoundsException(String.format("index %d is out of bounds [%d, %d)", index, 0, limit()));
+ }
+ boolean bytewise = false;
+ int written = 0;
+ int bytesToSkip = index;
+ for (int i = 0, size = _fragments.size(); i < size; i++)
+ {
+ final QpidByteBufferFragment buffer = _fragments.get(i);
+ final int limit = buffer.limit();
+ boolean isLastFragmentToConsider = valueWidth + bytesToSkip - written <= limit;
+ if (!isLastFragmentToConsider && limit != buffer.capacity())
+ {
+ throw new IllegalStateException(String.format("Unexpected limit %d on fragment %d", limit, i));
+ }
- public final long getUnsignedInt()
+ if (bytewise)
+ {
+ int offset = 0;
+ while (limit > offset && written < valueWidth)
+ {
+ buffer.put(offset, src[written]);
+ offset++;
+ written++;
+ }
+ if (written == valueWidth)
+ {
+ break;
+ }
+ }
+ else
+ {
+ if (limit >= bytesToSkip + valueWidth)
+ {
+ for (int i1 = 0; i1 < valueWidth; i1++)
+ {
+ buffer.put(bytesToSkip + i1, src[i1]);
+ written++;
+ }
+ break;
+ }
+ else if (limit > bytesToSkip)
+ {
+ bytewise = true;
+ while (limit > bytesToSkip + written)
+ {
+ buffer.put(bytesToSkip + written, src[written]);
+ written++;
+ }
+ bytesToSkip = 0;
+ }
+ else
+ {
+ bytesToSkip -= limit;
+ }
+ }
+ }
+ if (valueWidth != written)
+ {
+ throw new BufferOverflowException();
+ }
+ return this;
+ }
+
+ ////////////////
+ // Relative Puts
+ ////////////////
+
+ public final QpidByteBuffer put(final byte b)
{
- return ((long) getInt()) & 0xffffffffL;
+ return put(new byte[]{b});
}
public final QpidByteBuffer putUnsignedByte(final short s)
@@ -113,224 +191,493 @@ public class QpidByteBuffer implements AutoCloseable
return this;
}
+ public final QpidByteBuffer putShort(final short value)
+ {
+ byte[] valueArray = Shorts.toByteArray(value);
+ return put(valueArray);
+ }
+
public final QpidByteBuffer putUnsignedShort(final int i)
{
putShort((short) i);
return this;
}
+ public final QpidByteBuffer putChar(final char value)
+ {
+ byte[] valueArray = Chars.toByteArray(value);
+ return put(valueArray);
+ }
+
+ public final QpidByteBuffer putInt(final int value)
+ {
+ byte[] valueArray = Ints.toByteArray(value);
+ return put(valueArray);
+ }
+
public final QpidByteBuffer putUnsignedInt(final long value)
{
putInt((int) value);
return this;
}
- @Override
- public final void close()
+ public final QpidByteBuffer putLong(final long value)
{
- dispose();
+ byte[] valueArray = Longs.toByteArray(value);
+ return put(valueArray);
}
- public final void dispose()
+ public final QpidByteBuffer putFloat(final float value)
{
- if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
- {
- _ref.decrementRef(capacity());
- _buffer = null;
- }
+ int intValue = Float.floatToRawIntBits(value);
+ return putInt(intValue);
}
- public final InputStream asInputStream()
+ public final QpidByteBuffer putDouble(final double value)
{
- return new BufferInputStream(this);
+ long longValue = Double.doubleToRawLongBits(value);
+ return putLong(longValue);
}
- public final ByteBuffer asByteBuffer()
+ public final QpidByteBuffer put(byte[] src)
{
- try
+ return put(src, 0, src.length);
+ }
+
+ public final QpidByteBuffer put(final byte[] src, final int offset, final int length)
+ {
+ final int valueWidth = length;
+ if (valueWidth > remaining())
{
- return getUnderlyingBuffer();
+ throw new BufferOverflowException();
}
- finally
+ boolean bytewise = false;
+ int written = 0;
+ for (int i = 0, size = _fragments.size(); i < size; i++)
{
- dispose();
+ final QpidByteBufferFragment buffer = _fragments.get(i);
+ if (bytewise)
+ {
+ while (buffer.remaining() > 0 && written < valueWidth)
+ {
+ buffer.put(src[offset + written]);
+ written++;
+ }
+ if (written == valueWidth)
+ {
+ break;
+ }
+ }
+ else
+ {
+ final int remaining = buffer.remaining();
+ if (remaining >= valueWidth)
+ {
+ buffer.put(src, offset, length);
+ written += length;
+ break;
+ }
+ else
+ {
+ bytewise = true;
+ buffer.put(src, offset, remaining);
+ written = remaining;
+ }
+ }
}
+ if (written != length)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, length));
+ }
+ return this;
}
- public final CharBuffer decode(Charset charset)
+ public final QpidByteBuffer put(final ByteBuffer src)
{
- return charset.decode(getUnderlyingBuffer());
+ final int valueWidth = src.remaining();
+ if (valueWidth > remaining())
+ {
+ throw new BufferOverflowException();
+ }
+ boolean bytewise = false;
+ int written = 0;
+ for (int i = 0, size = _fragments.size(); i < size; i++)
+ {
+ final QpidByteBufferFragment buffer = _fragments.get(i);
+ if (bytewise)
+ {
+ while (src.remaining() > 0 && buffer.remaining() > 0)
+ {
+ buffer.put(src.get());
+ written++;
+ }
+ if (written == valueWidth)
+ {
+ break;
+ }
+ }
+ else
+ {
+ final int remaining = buffer.remaining();
+ if (remaining >= valueWidth)
+ {
+ buffer.put(src);
+ written += valueWidth;
+ break;
+ }
+ else
+ {
+ bytewise = true;
+ while (remaining > written)
+ {
+ buffer.put(src.get());
+ written++;
+ }
+ }
+ }
+ }
+ if (written != valueWidth)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth));
+ }
+ return this;
}
- public final int read(ReadableByteChannel channel) throws IOException
+ public final QpidByteBuffer put(final QpidByteBuffer src)
{
- return channel.read(getUnderlyingBuffer());
+ final int valueWidth = src.remaining();
+ if (valueWidth > remaining())
+ {
+ throw new BufferOverflowException();
+ }
+ int i = 0;
+ boolean bytewise = false;
+ int written = 0;
+ int size = _fragments.size();
+ final List<QpidByteBufferFragment> fragments = src._fragments;
+ for (int i1 = 0, fragmentsSize = fragments.size(); i1 < fragmentsSize; i1++)
+ {
+ final QpidByteBufferFragment srcFragment = fragments.get(i1);
+ for (; i < size; i++)
+ {
+ final QpidByteBufferFragment dstFragment = _fragments.get(i);
+ if (dstFragment.hasRemaining())
+ {
+ final int srcFragmentRemaining = srcFragment.remaining();
+ if (bytewise)
+ {
+ while (srcFragmentRemaining > 0 && dstFragment.remaining() > 0)
+ {
+ dstFragment.put(srcFragment.get());
+ written++;
+ }
+ if (!srcFragment.hasRemaining())
+ {
+ break;
+ }
+ }
+ else
+ {
+ final int remaining = dstFragment.remaining();
+ if (remaining >= srcFragmentRemaining)
+ {
+ dstFragment.put(srcFragment);
+ written += srcFragmentRemaining;
+ break;
+ }
+ else
+ {
+ bytewise = true;
+ while (remaining > written)
+ {
+ dstFragment.put(srcFragment.get());
+ written++;
+ }
+ }
+ }
+ }
+ }
+ }
+ if (written != valueWidth)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth));
+ }
+ return this;
}
- public final SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dest) throws SSLException
- {
- return engine.unwrap(getUnderlyingBuffer(), dest.getUnderlyingBuffer());
- }
+ ///////////////////
+ // Absolute Gets
+ ///////////////////
- @Override
- public String toString()
+ public byte get(final int index)
{
- return "QpidByteBuffer{" +
- "_buffer=" + _buffer +
- ", _disposed=" + _disposed +
- '}';
+ final byte[] byteArray = getByteArray(index, 1);
+ return byteArray[0];
}
- public final boolean hasRemaining()
+ public short getShort(final int index)
{
- return _buffer.hasRemaining();
+ final byte[] byteArray = getByteArray(index, 2);
+ return Shorts.fromByteArray(byteArray);
}
- public QpidByteBuffer putInt(final int index, final int value)
+ public final int getUnsignedShort(int index)
{
- _buffer.putInt(index, value);
- return this;
+ return ((int) getShort(index)) & 0xFFFF;
}
- public QpidByteBuffer putShort(final int index, final short value)
+ public char getChar(final int index)
{
- _buffer.putShort(index, value);
- return this;
+ final byte[] byteArray = getByteArray(index, 2);
+ return Chars.fromByteArray(byteArray);
}
- public QpidByteBuffer putChar(final int index, final char value)
+ public int getInt(final int index)
{
- _buffer.putChar(index, value);
- return this;
+ final byte[] byteArray = getByteArray(index, 4);
+ return Ints.fromByteArray(byteArray);
}
- public final QpidByteBuffer put(final byte b)
+ public long getLong(final int index)
{
- _buffer.put(b);
- return this;
+ final byte[] byteArray = getByteArray(index, 8);
+ return Longs.fromByteArray(byteArray);
}
- public QpidByteBuffer put(final int index, final byte b)
+ public float getFloat(final int index)
{
- _buffer.put(index, b);
- return this;
+ final int intValue = getInt(index);
+ return Float.intBitsToFloat(intValue);
}
- public short getShort(final int index)
+ public double getDouble(final int index)
{
- return _buffer.getShort(index);
+ final long longValue = getLong(index);
+ return Double.longBitsToDouble(longValue);
}
- public final QpidByteBuffer mark()
+ private byte[] getByteArray(final int index, final int length)
{
- _buffer.mark();
- return this;
- }
+ if (index + length > limit())
+ {
+ throw new IndexOutOfBoundsException(String.format("%d bytes at index %d do not fit into bounds [%d, %d)", length, index, 0, limit()));
+ }
- public final long getLong()
- {
- return _buffer.getLong();
+ byte[] value = new byte[length];
+ boolean bytewise = false;
+ int consumed = 0;
+ int bytesToSkip = index;
+ for (int i = 0, size = _fragments.size(); i < size; i++)
+ {
+ final QpidByteBufferFragment buffer = _fragments.get(i);
+ final int limit = buffer.limit();
+ boolean isLastFragmentToConsider = length + bytesToSkip - consumed <= limit;
+ if (!isLastFragmentToConsider && limit != buffer.capacity())
+ {
+ throw new IllegalStateException(String.format("Unexpectedly limit %d on fragment %d.", limit, i));
+ }
+ if (bytewise)
+ {
+ int offset = 0;
+ while (limit > offset && consumed < length)
+ {
+ value[consumed] = buffer.get(offset);
+ offset++;
+ consumed++;
+ }
+ if (consumed == length)
+ {
+ break;
+ }
+ }
+ else
+ {
+ if (limit >= bytesToSkip + length)
+ {
+ while (consumed < length)
+ {
+ value[consumed] = buffer.get(bytesToSkip + consumed);
+ consumed++;
+ }
+ break;
+ }
+ else if (limit > bytesToSkip)
+ {
+ bytewise = true;
+ while (limit > bytesToSkip + consumed)
+ {
+ value[consumed] = buffer.get(bytesToSkip + consumed);
+ consumed++;
+ }
+ bytesToSkip = 0;
+ }
+ else
+ {
+ bytesToSkip -= limit;
+ }
+ }
+ }
+ if (consumed != length)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
+ }
+ return value;
}
- public QpidByteBuffer putFloat(final int index, final float value)
- {
- _buffer.putFloat(index, value);
- return this;
- }
+ //////////////////
+ // Relative Gets
+ //////////////////
- public double getDouble(final int index)
+ public final byte get()
{
- return _buffer.getDouble(index);
+ byte[] value = new byte[1];
+ get(value, 0, 1);
+ return value[0];
}
- public final boolean hasArray()
+ public final short getUnsignedByte()
{
- return _buffer.hasArray();
+ return (short) (get() & 0xFF);
}
- public final double getDouble()
+ public final short getShort()
{
- return _buffer.getDouble();
+ byte[] value = new byte[2];
+ get(value, 0, value.length);
+ return Shorts.fromByteArray(value);
}
- public final QpidByteBuffer putFloat(final float value)
+ public final int getUnsignedShort()
{
- _buffer.putFloat(value);
- return this;
+ return ((int) getShort()) & 0xFFFF;
}
- public final QpidByteBuffer putInt(final int value)
+ public final char getChar()
{
- _buffer.putInt(value);
- return this;
+ byte[] value = new byte[2];
+ get(value, 0, value.length);
+ return Chars.fromByteArray(value);
}
- public byte[] array()
+ public final int getInt()
{
- return _buffer.array();
+ byte[] value = new byte[4];
+ get(value, 0, value.length);
+ return Ints.fromByteArray(value);
}
- public final QpidByteBuffer putShort(final short value)
+ public final long getUnsignedInt()
{
- _buffer.putShort(value);
- return this;
+ return ((long) getInt()) & 0xFFFFFFFFL;
}
- public int getInt(final int index)
+ public final long getLong()
{
- return _buffer.getInt(index);
+ byte[] value = new byte[8];
+ get(value, 0, value.length);
+ return Longs.fromByteArray(value);
}
- public final int remaining()
+ public final float getFloat()
{
- return _buffer.remaining();
+ final int intValue = getInt();
+ return Float.intBitsToFloat(intValue);
}
- public final QpidByteBuffer put(final byte[] src)
+ public final double getDouble()
{
- _buffer.put(src);
- return this;
+ final long longValue = getLong();
+ return Double.longBitsToDouble(longValue);
}
- public final QpidByteBuffer put(final ByteBuffer src)
+ public final QpidByteBuffer get(final byte[] dst)
{
- _buffer.put(src);
- return this;
+ return get(dst, 0, dst.length);
}
- public final QpidByteBuffer put(final QpidByteBuffer src)
+ public final QpidByteBuffer get(final byte[] dst, final int offset, final int length)
{
- int sourceRemaining = src.remaining();
- if (sourceRemaining > remaining())
+ if (remaining() < length)
{
- throw new BufferOverflowException();
+ throw new BufferUnderflowException();
+ }
+ boolean bytewise = false;
+ int consumed = 0;
+ for (int i = 0, size = _fragments.size(); i < size; i++)
+ {
+ final QpidByteBufferFragment buffer = _fragments.get(i);
+ if (bytewise)
+ {
+ while (buffer.hasRemaining() && consumed < length)
+ {
+ dst[offset + consumed] = buffer.get();
+ consumed++;
+ }
+ if (consumed == length)
+ {
+ return this;
+ }
+ }
+ else
+ {
+ final int remaining = buffer.remaining();
+ if (remaining >= length)
+ {
+ buffer.get(dst, offset, length);
+ return this;
+ }
+ else if (remaining > 0)
+ {
+ bytewise = true;
+ while (remaining > consumed)
+ {
+ dst[offset + consumed] = buffer.get();
+ consumed++;
+ }
+ }
+ }
+ }
+ if (consumed != length)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
}
-
- _buffer.put(src.getUnderlyingBuffer());
return this;
}
- public final QpidByteBuffer get(final byte[] dst, final int offset, final int length)
- {
- _buffer.get(dst, offset, length);
- return this;
- }
+ ///////////////
+ // Other stuff
+ ////////////////
- public final QpidByteBuffer get(final ByteBuffer dst)
+ public final void copyTo(final byte[] dst)
{
- int destinationRemaining = dst.remaining();
- int remaining = remaining();
- if (destinationRemaining < remaining)
+ if (remaining() < dst.length)
{
throw new BufferUnderflowException();
}
- dst.put(_buffer);
- return this;
+ if (remaining() > dst.length)
+ {
+ throw new BufferOverflowException();
+ }
+ int offset = 0;
+ for (QpidByteBufferFragment fragment : _fragments)
+ {
+ final int length = Math.min(fragment.remaining(), dst.length - offset);
+ fragment._buffer.duplicate().get(dst, offset, length);
+ offset += length;
+ }
}
public final void copyTo(final ByteBuffer dst)
{
- dst.put(_buffer.duplicate());
+ if (dst.remaining() < remaining())
+ {
+ throw new BufferOverflowException();
+ }
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ dst.put(fragment._buffer.duplicate());
+ }
}
public final void putCopyOf(final QpidByteBuffer source)
@@ -341,218 +688,345 @@ public class QpidByteBuffer implements AutoCloseable
{
throw new BufferOverflowException();
}
-
- put(source.getUnderlyingBuffer().duplicate());
+ for (int i = 0, fragmentsSize = source._fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment srcFragment = source._fragments.get(i);
+ put(srcFragment._buffer.duplicate());
+ }
}
- public QpidByteBuffer rewind()
+ public final boolean isDirect()
{
- _buffer.rewind();
- return this;
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ if (!fragment.isDirect())
+ {
+ return false;
+ }
+ }
+ return true;
}
- public QpidByteBuffer clear()
+ @Override
+ public final void close()
{
- _buffer.clear();
- return this;
+ dispose();
}
- public QpidByteBuffer putLong(final int index, final long value)
+ public final void dispose()
{
- _buffer.putLong(index, value);
- return this;
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ fragment.dispose();
+ }
}
- public QpidByteBuffer compact()
+ public final InputStream asInputStream()
{
- _buffer.compact();
- return this;
+ return new QpidByteBuffer.BufferInputStream(this);
}
- public final QpidByteBuffer putDouble(final double value)
+ public final long read(ScatteringByteChannel channel) throws IOException
{
- _buffer.putDouble(value);
- return this;
+ ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.size()];
+ for (int i = 0; i < byteBuffers.length; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ byteBuffers[i] = fragment.getUnderlyingBuffer();
+ }
+ return channel.read(byteBuffers);
}
- public int limit()
+ @Override
+ public String toString()
{
- return _buffer.limit();
+ return "QpidByteBuffer{" + _fragments.size() + " fragments}";
}
public QpidByteBuffer reset()
{
- _buffer.reset();
+ if (_resetFragmentIndex < 0)
+ {
+ throw new InvalidMarkException();
+ }
+ final QpidByteBufferFragment fragment = _fragments.get(_resetFragmentIndex);
+ fragment.reset();
+ for (int i = _resetFragmentIndex + 1, size = _fragments.size(); i < size; ++i)
+ {
+ _fragments.get(i).position(0);
+ }
return this;
}
- public QpidByteBuffer flip()
+ public QpidByteBuffer rewind()
{
- _buffer.flip();
+ _resetFragmentIndex = -1;
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ fragment.rewind();
+ }
return this;
}
- public final short getShort()
+ public final boolean hasArray()
{
- return _buffer.getShort();
+ return _fragments.size() == 1 && _fragments.get(0).hasArray();
}
- public final float getFloat()
+ public byte[] array()
{
- return _buffer.getFloat();
+ if (!hasArray())
+ {
+ throw new UnsupportedOperationException("This QpidByteBuffer is not backed by an array.");
+ }
+ return _fragments.get(0).array();
}
- public QpidByteBuffer limit(final int newLimit)
+ public QpidByteBuffer clear()
{
- _buffer.limit(newLimit);
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ _fragments.get(i).clear();
+ }
return this;
}
- /**
- * Method does not respect mark.
- *
- * @return QpidByteBuffer
- */
- public QpidByteBuffer duplicate()
+ public QpidByteBuffer compact()
{
- ByteBuffer buffer = _ref.getBuffer();
- if (!(_ref instanceof PooledByteBufferRef))
+ if (_fragments.size() == 1)
{
- buffer = buffer.duplicate();
+ _fragments.get(0).compact();
}
-
- buffer.position(_offset );
- buffer.limit(_offset + _buffer.capacity());
-
- buffer = buffer.slice();
-
- buffer.limit(_buffer.limit());
- buffer.position(_buffer.position());
- return new QpidByteBuffer(_ref, buffer, _offset);
- }
-
- public final QpidByteBuffer put(final byte[] src, final int offset, final int length)
- {
- _buffer.put(src, offset, length);
+ else
+ {
+ int position = position();
+ int limit = limit();
+ if (position != 0)
+ {
+ int dstPos = 0;
+ for (int srcPos = position; srcPos < limit; ++srcPos, ++dstPos)
+ {
+ put(dstPos, get(srcPos));
+ }
+ position(dstPos);
+ limit(capacity());
+ }
+ }
+ _resetFragmentIndex = -1;
return this;
}
- public long getLong(final int index)
- {
- return _buffer.getLong(index);
- }
-
- public int capacity()
+ public int position()
{
- return _buffer.capacity();
+ int totalPosition = 0;
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ totalPosition += fragment.position();
+ if (fragment.position() != fragment.limit())
+ {
+ break;
+ }
+ }
+ return totalPosition;
}
- public char getChar(final int index)
+ public QpidByteBuffer position(int newPosition)
{
- return _buffer.getChar(index);
+ if (newPosition < 0 || newPosition > limit())
+ {
+ throw new IllegalArgumentException(String.format("new position %d is out of bounds [%d, %d)", newPosition, 0, limit()));
+ }
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ final int fragmentLimit = fragment.limit();
+ if (newPosition <= fragmentLimit)
+ {
+ fragment.position(newPosition);
+ newPosition = 0;
+ }
+ else
+ {
+ if (fragmentLimit != fragment.capacity())
+ {
+ throw new IllegalStateException(String.format("QBB Fragment %d has limit %d != capacity %d",
+ i,
+ fragmentLimit,
+ fragment.capacity()));
+ }
+ fragment.position(fragmentLimit);
+ newPosition -= fragmentLimit;
+ }
+ }
+ return this;
}
- public final byte get()
+ public int limit()
{
- return _buffer.get();
- }
+ int totalLimit = 0;
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ final int fragmentLimit = fragment.limit();
+ totalLimit += fragmentLimit;
+ if (fragmentLimit != fragment.capacity())
+ {
+ break;
+ }
+ }
- public byte get(final int index)
- {
- return _buffer.get(index);
+ return totalLimit;
}
- public final QpidByteBuffer get(final byte[] dst)
+ public QpidByteBuffer limit(int newLimit)
{
- _buffer.get(dst);
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ final int fragmentCapacity = fragment.capacity();
+ final int fragmentLimit = Math.min(newLimit, fragmentCapacity);
+ fragment.limit(fragmentLimit);
+ newLimit -= fragmentLimit;
+ }
return this;
}
- public final void copyTo(final byte[] dst)
+ public final QpidByteBuffer mark()
{
- if (remaining() < dst.length)
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
{
- throw new BufferUnderflowException();
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ if (fragment.position() != fragment.limit())
+ {
+ fragment.mark();
+ _resetFragmentIndex = i;
+ return this;
+ }
}
- _buffer.duplicate().get(dst);
- }
-
- public final QpidByteBuffer putChar(final char value)
- {
- _buffer.putChar(value);
+ _resetFragmentIndex = _fragments.size() - 1;
+ _fragments.get(_resetFragmentIndex).mark();
return this;
}
- public QpidByteBuffer position(final int newPosition)
+ public final int remaining()
{
- _buffer.position(newPosition);
- return this;
+ int remaining = 0;
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ remaining += fragment.remaining();
+ }
+ return remaining;
}
- public int arrayOffset()
+ public final boolean hasRemaining()
{
- return _buffer.arrayOffset();
+ return hasRemaining(1);
}
- public final char getChar()
+ public final boolean hasRemaining(int atLeast)
{
- return _buffer.getChar();
+ int remaining = 0;
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ remaining += fragment.remaining();
+ if (remaining >= atLeast)
+ {
+ return true;
+ }
+ }
+ return false;
}
- public final int getInt()
+ public QpidByteBuffer flip()
{
- return _buffer.getInt();
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ fragment.flip();
+ }
+ return this;
}
- public final QpidByteBuffer putLong(final long value)
+ public int capacity()
{
- _buffer.putLong(value);
- return this;
+ int totalCapacity = 0;
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ totalCapacity += _fragments.get(i).capacity();
+ }
+ return totalCapacity;
}
- public float getFloat(final int index)
+ /**
+ * Method does not respect mark.
+ *
+ * @return QpidByteBuffer
+ */
+ public QpidByteBuffer duplicate()
{
- return _buffer.getFloat(index);
+ final QpidByteBuffer newQpidByteBuffer = new QpidByteBuffer();
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ newQpidByteBuffer._fragments.add(_fragments.get(i).duplicate());
+ }
+ return newQpidByteBuffer;
}
public QpidByteBuffer slice()
{
- return view(0, _buffer.remaining());
+ return view(0, remaining());
}
public QpidByteBuffer view(int offset, int length)
{
- ByteBuffer buffer = _ref.getBuffer();
- if (!(_ref instanceof PooledByteBufferRef))
+ if (offset + length > remaining())
{
- buffer = buffer.duplicate();
+ throw new IllegalArgumentException(String.format("offset: %d, length: %d, remaining: %d", offset, length, remaining()));
+ }
+ final QpidByteBuffer newQpidByteBuffer = new QpidByteBuffer();
+ boolean firstFragmentToBeConsidered = true;
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize && length > 0; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ if (fragment.hasRemaining())
+ {
+ if (!firstFragmentToBeConsidered && fragment.position() != 0)
+ {
+ throw new IllegalStateException(String.format("Unexpectedly position %d on fragment %d.", fragment.position(), i));
+ }
+ firstFragmentToBeConsidered = false;
+ if (fragment.remaining() > offset)
+ {
+ final int fragmentViewLength = Math.min(fragment.remaining() - offset, length);
+ newQpidByteBuffer._fragments.add(fragment.view(offset, fragmentViewLength));
+ length -= fragmentViewLength;
+ offset = 0;
+ }
+ else
+ {
+ offset -= fragment.remaining();
+ }
+ }
}
- int newRemaining = Math.min(_buffer.remaining() - offset, length);
-
- int newPosition = _offset + _buffer.position() + offset;
- buffer.limit(newPosition + newRemaining);
- buffer.position(newPosition);
-
- buffer = buffer.slice();
-
- return new QpidByteBuffer(_ref, buffer, newPosition);
- }
-
- public int position()
- {
- return _buffer.position();
+ return newQpidByteBuffer;
}
- public QpidByteBuffer putDouble(final int index, final double value)
+ List<ByteBuffer> getUnderlyingBuffers()
{
- _buffer.putDouble(index, value);
- return this;
- }
+ List<ByteBuffer> byteBuffers = new ArrayList<>();
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ byteBuffers.add(_fragments.get(i).getUnderlyingBuffer());
+ }
+ return byteBuffers;
- ByteBuffer getUnderlyingBuffer()
- {
- return _buffer;
}
public static QpidByteBuffer allocate(boolean direct, int size)
@@ -562,113 +1036,47 @@ public class QpidByteBuffer implements AutoCloseable
public static QpidByteBuffer allocate(int size)
{
- return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
+ final QpidByteBuffer qpidByteBuffer = new QpidByteBuffer();
+ qpidByteBuffer._fragments.add(QpidByteBufferFragment.allocate(size));
+ return qpidByteBuffer;
}
public static QpidByteBuffer allocateDirect(int size)
{
if (size < 0)
{
- throw new IllegalArgumentException("Cannot allocate QpidByteBuffer with size "
- + size
- + " which is negative.");
- }
-
- final ByteBufferRef ref;
- if (_isPoolInitialized && _pooledBufferSize >= size)
- {
- if (_pooledBufferSize == size)
- {
- ByteBuffer buf = _bufferPool.getBuffer();
- if (buf == null)
- {
- buf = ByteBuffer.allocateDirect(size);
- }
- ref = new PooledByteBufferRef(buf);
- }
- else
- {
- QpidByteBuffer buf = _cachedBuffer.get();
- if (buf == null || buf.remaining() < size)
- {
- if (buf != null)
- {
- buf.dispose();
- }
- buf = allocateDirect(_pooledBufferSize);
- _cachedBuffer.set(buf);
- }
- QpidByteBuffer rVal = buf.view(0, size);
- buf.position(buf.position() + size);
-
- return rVal;
- }
- }
- else
- {
- ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
- }
- return new QpidByteBuffer(ref);
- }
-
- public static Collection<QpidByteBuffer> allocateDirectCollection(int size)
- {
- if (_pooledBufferSize == 0)
- {
- return Collections.singleton(allocateDirect(size));
+ throw new IllegalArgumentException("Cannot allocate QpidByteBufferFragment with size "
+ + size
+ + " which is negative.");
}
- else
- {
- List<QpidByteBuffer> buffers = new ArrayList<>((size / _pooledBufferSize) + 2);
- int remaining = size;
-
- QpidByteBuffer buf = _cachedBuffer.get();
- if (buf == null)
- {
- buf = allocateDirect(_pooledBufferSize);
- }
- while (remaining > buf.remaining())
- {
- int bufRemaining = buf.remaining();
- if (buf == _cachedBuffer.get())
- {
- buffers.add(buf.view(0, bufRemaining));
- buf.dispose();
- }
- else
- {
- buffers.add(buf);
- }
- remaining -= bufRemaining;
- buf = allocateDirect(_pooledBufferSize);
- }
- buffers.add(buf.view(0, remaining));
- buf.position(buf.position() + remaining);
- if (buf.hasRemaining())
+ if (_isPoolInitialized)
+ {
+ QpidByteBuffer qpidByteBuffer = new QpidByteBuffer();
+ int allocatedSize = 0;
+ while (size - allocatedSize >= _pooledBufferSize)
{
- _cachedBuffer.set(buf);
+ qpidByteBuffer._fragments.add(QpidByteBufferFragment.allocateDirect(_pooledBufferSize));
+ allocatedSize += _pooledBufferSize;
}
- else
+ if (allocatedSize != size)
{
- _cachedBuffer.set(allocateDirect(_pooledBufferSize));
- buf.dispose();
+ qpidByteBuffer._fragments.add(QpidByteBufferFragment.allocateDirect(size - allocatedSize));
}
- return buffers;
+ return qpidByteBuffer;
+ }
+ else
+ {
+ return allocate(size);
}
}
- public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data)
- {
- return asQpidByteBuffers(data, 0, data.length);
- }
-
- public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data, final int offset, final int length)
+ private static QpidByteBuffer asQpidByteBuffer(final byte[] data, final int offset, final int length)
{
try (QpidByteBufferOutputStream outputStream = new QpidByteBufferOutputStream(true, getPooledBufferSize()))
{
outputStream.write(data, offset, length);
- return outputStream.fetchAccumulatedBuffers();
+ return outputStream.fetchAccumulatedBuffer();
}
catch (IOException e)
{
@@ -676,20 +1084,46 @@ public class QpidByteBuffer implements AutoCloseable
}
}
- public static List<QpidByteBuffer> asQpidByteBuffers(final InputStream stream) throws IOException
+ public static QpidByteBuffer asQpidByteBuffer(final InputStream stream) throws IOException
+ {
+ final QpidByteBuffer qpidByteBuffer = new QpidByteBuffer();
+ final int pooledBufferSize = QpidByteBuffer.getPooledBufferSize();
+ byte[] transferBuf = new byte[pooledBufferSize];
+ int readFragment = 0;
+ int read = stream.read(transferBuf, readFragment, pooledBufferSize - readFragment);
+ while (read > 0)
+ {
+ readFragment += read;
+ if (readFragment == pooledBufferSize)
+ {
+ QpidByteBufferFragment fragment = QpidByteBufferFragment.allocateDirect(pooledBufferSize);
+ fragment.put(transferBuf, 0, pooledBufferSize);
+ fragment.flip();
+ qpidByteBuffer._fragments.add(fragment);
+ readFragment = 0;
+ }
+ read = stream.read(transferBuf, readFragment, pooledBufferSize - readFragment);
+ }
+ if (readFragment != 0)
+ {
+ QpidByteBufferFragment fragment = QpidByteBufferFragment.allocateDirect(readFragment);
+ fragment.put(transferBuf, 0, readFragment);
+ fragment.flip();
+ qpidByteBuffer._fragments.add(fragment);
+ }
+ return qpidByteBuffer;
+ }
+
+ public final SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dst) throws SSLException
{
- List<QpidByteBuffer> bufs = new ArrayList<>();
- byte[] transferBuf = new byte[QpidByteBuffer.getPooledBufferSize()];
- int read = stream.read(transferBuf);
- while(read > 0)
+ final List<ByteBuffer> dstUnderlyingBuffers = dst.getUnderlyingBuffers();
+ final List<ByteBuffer> underlyingBuffers = getUnderlyingBuffers();
+ if (underlyingBuffers.size() != 1)
{
- QpidByteBuffer chunk = QpidByteBuffer.allocateDirect(read);
- chunk.put(transferBuf, 0, read);
- chunk.flip();
- bufs.add(chunk);
- read = stream.read(transferBuf);
+ throw new IllegalStateException("Expected single fragment buffer");
}
- return bufs;
+ return engine.unwrap(underlyingBuffers.get(0),
+ dstUnderlyingBuffers.toArray(new ByteBuffer[dstUnderlyingBuffers.size()]));
}
public static SSLEngineResult encryptSSL(SSLEngine engine,
@@ -704,72 +1138,62 @@ public class QpidByteBuffer implements AutoCloseable
}
else
{
- src = new ByteBuffer[buffers.size()];
- Iterator<QpidByteBuffer> iterator = buffers.iterator();
- for (int i = 0; i < src.length; i++)
+ List<ByteBuffer> buffers_ = new LinkedList<>();
+ for (QpidByteBuffer buffer : buffers)
{
- src[i] = iterator.next().getUnderlyingBuffer();
+ buffers_.addAll(buffer.getUnderlyingBuffers());
}
+ src = buffers_.toArray(new ByteBuffer[buffers_.size()]);
}
- return engine.wrap(src, dest.getUnderlyingBuffer());
+ final List<ByteBuffer> dstUnderlyingBuffers = dest.getUnderlyingBuffers();
+ if (dstUnderlyingBuffers.size() != 1)
+ {
+ throw new IllegalStateException("Expected a single fragment output buffer");
+ }
+ return engine.wrap(src, dstUnderlyingBuffers.get(0));
}
- public static Collection<QpidByteBuffer> inflate(Collection<QpidByteBuffer> compressedBuffers) throws IOException
+ public static QpidByteBuffer inflate(QpidByteBuffer compressedBuffer) throws IOException
{
- if (compressedBuffers == null)
+ if (compressedBuffer == null)
{
- throw new IllegalArgumentException("compressedBuffers cannot be null");
+ throw new IllegalArgumentException("compressedBuffer cannot be null");
}
- boolean isDirect = false;
- Collection<InputStream> streams = new ArrayList<>(compressedBuffers.size());
- for (QpidByteBuffer buffer : compressedBuffers)
- {
- isDirect = isDirect || buffer.isDirect();
- streams.add(buffer.asInputStream());
- }
+ boolean isDirect = compressedBuffer.isDirect();
final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
- Collection<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
- try (GZIPInputStream gzipInputStream = new GZIPInputStream(new CompositeInputStream(streams)))
+ List<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
+ try (GZIPInputStream gzipInputStream = new GZIPInputStream(compressedBuffer.asInputStream()))
{
byte[] buf = new byte[bufferSize];
int read;
while ((read = gzipInputStream.read(buf)) != -1)
{
- uncompressedBuffers.addAll(asQpidByteBuffers(buf, 0, read));
+ uncompressedBuffers.add(QpidByteBuffer.asQpidByteBuffer(buf, 0, read));
}
- return uncompressedBuffers;
+ return QpidByteBuffer.concatenate(uncompressedBuffers);
}
- catch (IOException e)
+ finally
{
- for (QpidByteBuffer uncompressedBuffer : uncompressedBuffers)
- {
- uncompressedBuffer.dispose();
- }
- throw e;
+ uncompressedBuffers.forEach(QpidByteBuffer::dispose);
}
}
- public static Collection<QpidByteBuffer> deflate(Collection<QpidByteBuffer> uncompressedBuffers) throws IOException
+ public static QpidByteBuffer deflate(QpidByteBuffer uncompressedBuffer) throws IOException
{
- if (uncompressedBuffers == null)
+ if (uncompressedBuffer == null)
{
- throw new IllegalArgumentException("uncompressedBuffers cannot be null");
+ throw new IllegalArgumentException("uncompressedBuffer cannot be null");
}
- boolean isDirect = false;
- Collection<InputStream> streams = new ArrayList<>(uncompressedBuffers.size());
- for (QpidByteBuffer buffer : uncompressedBuffers)
- {
- isDirect = isDirect || buffer.isDirect();
- streams.add(buffer.asInputStream());
- }
+ boolean isDirect = uncompressedBuffer.isDirect();
final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
- try(QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
- InputStream compressedInput = new CompositeInputStream(streams);
- GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput, bufferSize)))
+ try (QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
+ InputStream compressedInput = uncompressedBuffer.asInputStream();
+ GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput,
+ bufferSize)))
{
byte[] buf = new byte[16384];
int read;
@@ -779,25 +1203,29 @@ public class QpidByteBuffer implements AutoCloseable
}
gzipStream.finish();
gzipStream.flush();
- return compressedOutput.fetchAccumulatedBuffers();
+ return compressedOutput.fetchAccumulatedBuffer();
}
}
public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> qpidByteBuffers)
throws IOException
{
- ByteBuffer[] byteBuffers = new ByteBuffer[qpidByteBuffers.size()];
- Iterator<QpidByteBuffer> iterator = qpidByteBuffers.iterator();
- for (int i = 0; i < byteBuffers.length; i++)
+ List<ByteBuffer> byteBuffers = new ArrayList<>();
+ for (QpidByteBuffer qpidByteBuffer : qpidByteBuffers)
{
- byteBuffers[i] = iterator.next().getUnderlyingBuffer();
+ for (QpidByteBufferFragment fragment : qpidByteBuffer._fragments)
+ {
+ byteBuffers.add(fragment.getUnderlyingBuffer());
+ }
}
- return channel.write(byteBuffers);
+ return channel.write(byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
}
public static QpidByteBuffer wrap(final ByteBuffer wrap)
{
- return new QpidByteBuffer(new NonPooledByteBufferRef(wrap));
+ final QpidByteBuffer qpidByteBuffer = new QpidByteBuffer();
+ qpidByteBuffer._fragments.add(new QpidByteBufferFragment(new NonPooledByteBufferRef(wrap)));
+ return qpidByteBuffer;
}
public static QpidByteBuffer wrap(final byte[] data)
@@ -813,19 +1241,25 @@ public class QpidByteBuffer implements AutoCloseable
static void returnToPool(final ByteBuffer buffer)
{
buffer.clear();
- final ByteBuffer duplicate = _zeroed.duplicate();
- duplicate.limit(buffer.capacity());
- buffer.put(duplicate);
+ if (_isPoolInitialized)
+ {
+ final ByteBuffer duplicate = _zeroed.duplicate();
+ duplicate.limit(buffer.capacity());
+ buffer.put(duplicate);
- _bufferPool.returnBuffer(buffer);
+ _bufferPool.returnBuffer(buffer);
+ }
}
public synchronized static void initialisePool(int bufferSize, int maxPoolSize, final double sparsityFraction)
{
- if (_isPoolInitialized && (bufferSize != _pooledBufferSize || maxPoolSize != _bufferPool.getMaxSize() || sparsityFraction != _sparsityFraction))
+ if (_isPoolInitialized && (bufferSize != _pooledBufferSize
+ || maxPoolSize != _bufferPool.getMaxSize()
+ || sparsityFraction != _sparsityFraction))
{
final String errorMessage = String.format(
- "QpidByteBuffer pool has already been initialised with bufferSize=%d, maxPoolSize=%d, and sparsityFraction=%f." +
+ "QpidByteBuffer pool has already been initialised with bufferSize=%d, maxPoolSize=%d, and sparsityFraction=%f."
+ +
"Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
_pooledBufferSize,
_bufferPool.getMaxSize(),
@@ -846,7 +1280,9 @@ public class QpidByteBuffer implements AutoCloseable
_isPoolInitialized = true;
}
- /** Test use only */
+ /**
+ * Test use only
+ */
public synchronized static void deinitialisePool()
{
if (_isPoolInitialized)
@@ -856,18 +1292,15 @@ public class QpidByteBuffer implements AutoCloseable
_zeroed = null;
_isPoolInitialized = false;
_sparsityFraction = 1.0;
+ final QpidByteBufferFragment cachedBuffer = _cachedBuffer.get();
+ if (cachedBuffer != null)
+ {
+ cachedBuffer.dispose();
+ _cachedBuffer.remove();
+ }
}
}
- /**
- * Not for general use!
- * Used to clear threadlocal buffer when shutting down thread pools.
- */
- public static QpidByteBuffer getCachedThreadLocalBuffer()
- {
- return _cachedBuffer.get();
- }
-
public static int getPooledBufferSize()
{
return _pooledBufferSize;
@@ -893,23 +1326,6 @@ public class QpidByteBuffer implements AutoCloseable
return PooledByteBufferRef.getDisposalCounter();
}
- public static List<QpidByteBuffer> reallocateIfNecessary(Collection<QpidByteBuffer> data)
- {
- if (data != null)
- {
- List<QpidByteBuffer> newCopy = new ArrayList<>(data.size());
- for (QpidByteBuffer buf : data)
- {
- newCopy.add(reallocateIfNecessary(buf));
- }
- return newCopy;
- }
- else
- {
- return null;
- }
- }
-
public static QpidByteBuffer reallocateIfNecessary(final QpidByteBuffer data)
{
if (data != null && data.isDirect() && data.isSparse())
@@ -928,7 +1344,57 @@ public class QpidByteBuffer implements AutoCloseable
boolean isSparse()
{
- return _ref.isSparse(_sparsityFraction);
+ for (int i = 0, fragmentsSize = _fragments.size(); i < fragmentsSize; i++)
+ {
+ final QpidByteBufferFragment fragment = _fragments.get(i);
+ if (fragment.isSparse())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static QpidByteBuffer concatenate(final List<QpidByteBuffer> buffers)
+ {
+ final QpidByteBuffer qpidByteBuffer = new QpidByteBuffer();
+ for (QpidByteBuffer buffer : buffers)
+ {
+ for (QpidByteBufferFragment fragment : buffer._fragments)
+ {
+ qpidByteBuffer._fragments.add(fragment.slice());
+ }
+ }
+ return qpidByteBuffer;
+ }
+
+ public static QpidByteBuffer concatenate(QpidByteBuffer... buffers)
+ {
+ return concatenate(Arrays.asList(buffers));
+ }
+
+ public static QpidByteBuffer emptyQpidByteBuffer()
+ {
+ return EMPTY_QPID_BYTE_BUFFER.duplicate();
+ }
+
+ public static ThreadFactory createQpidByteBufferTrackingThreadFactory(final ThreadFactory factory)
+ {
+ return r -> factory.newThread(() -> {
+ try
+ {
+ r.run();
+ }
+ finally
+ {
+ final QpidByteBufferFragment cachedThreadLocalBuffer = _cachedBuffer.get();
+ if (cachedThreadLocalBuffer != null)
+ {
+ cachedThreadLocalBuffer.dispose();
+ _cachedBuffer.remove();
+ }
+ }
+ });
}
private static final class BufferInputStream extends InputStream
@@ -937,7 +1403,7 @@ public class QpidByteBuffer implements AutoCloseable
private BufferInputStream(final QpidByteBuffer buffer)
{
- _qpidByteBuffer = buffer;
+ _qpidByteBuffer = buffer.duplicate();
}
@Override
@@ -945,7 +1411,7 @@ public class QpidByteBuffer implements AutoCloseable
{
if (_qpidByteBuffer.hasRemaining())
{
- return _qpidByteBuffer.get() & 0xFF;
+ return _qpidByteBuffer.getUnsignedByte();
}
return -1;
}
@@ -1001,6 +1467,324 @@ public class QpidByteBuffer implements AutoCloseable
@Override
public void close()
{
+ _qpidByteBuffer.dispose();
+ }
+ }
+
+
+ static class QpidByteBufferFragment implements AutoCloseable
+ {
+
+ private static final AtomicIntegerFieldUpdater<QpidByteBufferFragment>
+ DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
+ QpidByteBufferFragment.class,
+ "_disposed");
+
+ private final int _offset;
+
+ final ByteBufferRef _ref;
+ volatile ByteBuffer _buffer;
+ @SuppressWarnings("unused")
+ private volatile int _disposed;
+
+
+ QpidByteBufferFragment(ByteBufferRef ref)
+ {
+ this(ref, ref.getBuffer(), 0);
+ }
+
+ private QpidByteBufferFragment(ByteBufferRef ref, ByteBuffer buffer, int offset)
+ {
+ _ref = ref;
+ _buffer = buffer;
+ _offset = offset;
+ _ref.incrementRef(capacity());
+ }
+
+ public final boolean isDirect()
+ {
+ return _buffer.isDirect();
+ }
+
+ @Override
+ public final void close()
+ {
+ dispose();
+ }
+
+ public final void dispose()
+ {
+ if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
+ {
+ _ref.decrementRef(capacity());
+ _buffer = null;
+ }
+ }
+
+ public final CharBuffer decode(Charset charset)
+ {
+ return charset.decode(getUnderlyingBuffer());
+ }
+
+ @Override
+ public String toString()
+ {
+ return "QpidByteBufferFragment{" +
+ "_buffer=" + _buffer +
+ ", _disposed=" + _disposed +
+ '}';
+ }
+
+ public final boolean hasRemaining()
+ {
+ return _buffer.hasRemaining();
+ }
+
+ public final QpidByteBufferFragment put(final byte b)
+ {
+ _buffer.put(b);
+ return this;
+ }
+
+ public QpidByteBufferFragment put(final int index, final byte b)
+ {
+ _buffer.put(index, b);
+ return this;
+ }
+
+ public final QpidByteBufferFragment mark()
+ {
+ _buffer.mark();
+ return this;
+ }
+
+ public final boolean hasArray()
+ {
+ return _buffer.hasArray();
+ }
+
+ public byte[] array()
+ {
+ return _buffer.array();
+ }
+
+ public final int remaining()
+ {
+ return _buffer.remaining();
+ }
+
+
+ public final QpidByteBufferFragment put(final ByteBuffer src)
+ {
+ _buffer.put(src);
+ return this;
+ }
+
+ public final QpidByteBufferFragment put(final QpidByteBufferFragment src)
+ {
+ int sourceRemaining = src.remaining();
+ if (sourceRemaining > remaining())
+ {
+ throw new BufferOverflowException();
+ }
+
+ _buffer.put(src.getUnderlyingBuffer());
+ return this;
+ }
+
+ public final QpidByteBufferFragment get(final byte[] dst, final int offset, final int length)
+ {
+ _buffer.get(dst, offset, length);
+ return this;
+ }
+
+ public QpidByteBufferFragment rewind()
+ {
+ _buffer.rewind();
+ return this;
+ }
+
+ public QpidByteBufferFragment clear()
+ {
+ _buffer.clear();
+ return this;
+ }
+
+ public QpidByteBufferFragment compact()
+ {
+ _buffer.compact();
+ return this;
+ }
+
+ public int limit()
+ {
+ return _buffer.limit();
+ }
+
+ public QpidByteBufferFragment reset()
+ {
+ _buffer.reset();
+ return this;
+ }
+
+ public QpidByteBufferFragment flip()
+ {
+ _buffer.flip();
+ return this;
+ }
+
+ public QpidByteBufferFragment limit(final int newLimit)
+ {
+ _buffer.limit(newLimit);
+ return this;
+ }
+
+ /**
+ * Method does not respect mark.
+ *
+ * @return QpidByteBufferFragment
+ */
+ public QpidByteBufferFragment duplicate()
+ {
+ ByteBuffer buffer = _ref.getBuffer();
+ if (!(_ref instanceof PooledByteBufferRef))
+ {
+ buffer = buffer.duplicate();
+ }
+
+ buffer.position(_offset);
+ buffer.limit(_offset + _buffer.capacity());
+
+ buffer = buffer.slice();
+
+ buffer.limit(_buffer.limit());
+ buffer.position(_buffer.position());
+ return new QpidByteBufferFragment(_ref, buffer, _offset);
+ }
+
+ public final QpidByteBufferFragment put(final byte[] src, final int offset, final int length)
+ {
+ _buffer.put(src, offset, length);
+ return this;
+ }
+
+
+ public int capacity()
+ {
+ return _buffer.capacity();
+ }
+
+
+ public final byte get()
+ {
+ return _buffer.get();
+ }
+
+ public byte get(final int index)
+ {
+ return _buffer.get(index);
+ }
+
+ public QpidByteBufferFragment position(final int newPosition)
+ {
+ _buffer.position(newPosition);
+ return this;
+ }
+
+ public QpidByteBufferFragment slice()
+ {
+ return view(0, _buffer.remaining());
+ }
+
+ public QpidByteBufferFragment view(int offset, int length)
+ {
+ ByteBuffer buffer = _ref.getBuffer();
+ if (!(_ref instanceof PooledByteBufferRef))
+ {
+ buffer = buffer.duplicate();
+ }
+
+ int newRemaining = Math.min(_buffer.remaining() - offset, length);
+
+ int newPosition = _offset + _buffer.position() + offset;
+ buffer.limit(newPosition + newRemaining);
+ buffer.position(newPosition);
+
+ buffer = buffer.slice();
+
+ return new QpidByteBufferFragment(_ref, buffer, newPosition);
+ }
+
+ public int position()
+ {
+ return _buffer.position();
+ }
+
+ ByteBuffer getUnderlyingBuffer()
+ {
+ return _buffer;
+ }
+
+ public static QpidByteBufferFragment allocate(boolean direct, int size)
+ {
+ return direct ? allocateDirect(size) : allocate(size);
+ }
+
+ public static QpidByteBufferFragment allocate(int size)
+ {
+ return new QpidByteBufferFragment(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
+ }
+
+ public static QpidByteBufferFragment allocateDirect(int size)
+ {
+ if (size < 0)
+ {
+ throw new IllegalArgumentException("Cannot allocate QpidByteBufferFragment with size "
+ + size
+ + " which is negative.");
+ }
+
+ final ByteBufferRef ref;
+ if (_isPoolInitialized && _pooledBufferSize >= size)
+ {
+ if (_pooledBufferSize == size)
+ {
+ ByteBuffer buf = _bufferPool.getBuffer();
+ if (buf == null)
+ {
+ buf = ByteBuffer.allocateDirect(size);
+ }
+ ref = new PooledByteBufferRef(buf);
+ }
+ else
+ {
+ QpidByteBufferFragment buf = _cachedBuffer.get();
+ if (buf == null || buf.remaining() < size)
+ {
+ if (buf != null)
+ {
+ buf.dispose();
+ }
+ buf = allocateDirect(_pooledBufferSize);
+ _cachedBuffer.set(buf);
+ }
+ QpidByteBufferFragment rVal = buf.view(0, size);
+ buf.position(buf.position() + size);
+
+ return rVal;
+ }
+ }
+ else
+ {
+ ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
+ }
+ return new QpidByteBufferFragment(ref);
+ }
+
+ boolean isSparse()
+ {
+ return _ref.isSparse(_sparsityFraction);
}
+
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java
deleted file mode 100644
index d7ce698..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferInputStream.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.server.bytebuffer;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import org.apache.qpid.server.streams.CompositeInputStream;
-
-/**
- * InputStream implementation that takes a list QpidByteBuffers.
- * The QpidByteBufferInputStream takes ownership of the buffers and disposes them on close().
- *
- * Not thread safe.
- */
-public class QpidByteBufferInputStream extends InputStream
-{
- private final CompositeInputStream _compositeInputStream;
- private final Collection<QpidByteBuffer> _buffers;
-
- public QpidByteBufferInputStream(Collection<QpidByteBuffer> buffers)
- {
- _buffers = buffers;
-
- final Collection<InputStream> streams = new ArrayList<>(buffers.size());
- for (QpidByteBuffer buffer : buffers)
- {
- streams.add(buffer.asInputStream());
- }
- _compositeInputStream = new CompositeInputStream(streams);
- }
-
- @Override
- public int read() throws IOException
- {
- return _compositeInputStream.read();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException
- {
- return _compositeInputStream.read(b, off, len);
- }
-
- @Override
- public void mark(int readlimit)
- {
- _compositeInputStream.mark(readlimit);
- }
-
- @Override
- public void reset() throws IOException
- {
- _compositeInputStream.reset();
- }
-
- @Override
- public boolean markSupported()
- {
- return _compositeInputStream.markSupported();
- }
-
- @Override
- public long skip(long n) throws IOException
- {
- return _compositeInputStream.skip(n);
- }
-
- @Override
- public int available() throws IOException
- {
- return _compositeInputStream.available();
- }
-
- @Override
- public void close() throws IOException
- {
- try
- {
- _compositeInputStream.close();
- }
- finally
- {
- for (QpidByteBuffer buffer : _buffers)
- {
- buffer.dispose();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
index faaa332..08788df 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferOutputStream.java
@@ -23,23 +23,13 @@ package org.apache.qpid.server.bytebuffer;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.LinkedList;
-/**
- * OutputStream implementation that yields a list QpidByteBuffers that contain a copy
- * of the incoming bytes. Use fetchAccumulatedBuffers to get the buffers. Caller
- * has responsibility to dispose the buffers after use.
- *
- * It will be normally be desirable to front this stream with java.io.BufferedOutputStream
- * to minimise the number of write and thus the number of buffers created.
- *
- * Not thread safe.
- */
public class QpidByteBufferOutputStream extends OutputStream
{
private final LinkedList<QpidByteBuffer> _buffers = new LinkedList<>();
+ private int _bufferPosition = 0;
+ private final byte[] _buffer;
private final boolean _isDirect;
private final int _maximumBufferSize;
private boolean _closed;
@@ -52,14 +42,13 @@ public class QpidByteBufferOutputStream extends OutputStream
}
_isDirect = isDirect;
_maximumBufferSize = maximumBufferSize;
+ _buffer = new byte[_maximumBufferSize];
}
@Override
public void write(int b) throws IOException
{
- int size = 1;
- byte[] data = new byte[] {(byte)b};
- allocateDataBuffers(data, 0, size);
+ write(new byte[] {(byte)b});
}
@Override
@@ -78,18 +67,18 @@ public class QpidByteBufferOutputStream extends OutputStream
public void close() throws IOException
{
_closed = true;
- for (QpidByteBuffer buffer : _buffers)
- {
- buffer.dispose();
- }
+ _buffers.forEach(QpidByteBuffer::dispose);
_buffers.clear();
}
- public Collection<QpidByteBuffer> fetchAccumulatedBuffers()
+ QpidByteBuffer fetchAccumulatedBuffer()
{
- Collection<QpidByteBuffer> bufs = new ArrayList<>(_buffers);
- _buffers.clear();
- return bufs;
+ if (_bufferPosition != 0)
+ {
+ addSingleQpidByteBuffer(_buffer, 0, _bufferPosition);
+ }
+ final QpidByteBuffer combined = QpidByteBuffer.concatenate(_buffers);
+ return combined;
}
private void allocateDataBuffers(byte[] data, int offset, int len) throws IOException
@@ -99,15 +88,28 @@ public class QpidByteBufferOutputStream extends OutputStream
throw new IOException("Stream is closed");
}
- int size = Math.min(_maximumBufferSize, len);
+ do
+ {
+ int bytesWeCanWrite = Math.min(_buffer.length - _bufferPosition, len);
+ System.arraycopy(data, offset, _buffer, _bufferPosition, bytesWeCanWrite);
+ offset += bytesWeCanWrite;
+ len -= bytesWeCanWrite;
+ _bufferPosition += bytesWeCanWrite;
+ if (_buffer.length == _bufferPosition)
+ {
+ addSingleQpidByteBuffer(_buffer, 0, _buffer.length);
+ }
+ } while (len != 0);
+ }
- QpidByteBuffer current = _isDirect ? QpidByteBuffer.allocateDirect(len) : QpidByteBuffer.allocate(len);
- current.put(data, offset, size);
+ private void addSingleQpidByteBuffer(final byte[] buffer, final int offset, final int length)
+ {
+ QpidByteBuffer current = _isDirect
+ ? QpidByteBuffer.allocateDirect(length)
+ : QpidByteBuffer.allocate(length);
+ current.put(buffer, offset, length);
current.flip();
_buffers.add(current);
- if (len > size)
- {
- allocateDataBuffers(data, offset + size, len - size);
- }
+ _bufferPosition = 0;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
deleted file mode 100644
index a475ef7..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.bytebuffer;
-
-import java.nio.BufferUnderflowException;
-import java.util.List;
-
-public class QpidByteBufferUtils
-{
- public static void dispose(List<QpidByteBuffer> in)
- {
- for (int i = 0, inSize = in.size(); i < inSize; i++)
- {
- final QpidByteBuffer qpidByteBuffer = in.get(i);
- qpidByteBuffer.dispose();
- }
- }
-
- public static boolean hasRemaining(List<QpidByteBuffer> in)
- {
- if (in.isEmpty())
- {
- return false;
- }
- for (int i = 0; i < in.size(); i++)
- {
- if (in.get(i).hasRemaining())
- {
- return true;
- }
- }
- return false;
- }
-
- public static long remaining(List<QpidByteBuffer> in)
- {
- long remaining = 0L;
- for (int i = 0; i < in.size(); i++)
- {
- remaining += in.get(i).remaining();
- }
- return remaining;
- }
-
- public static byte get(List<QpidByteBuffer> in)
- {
- for (int i = 0; i < in.size(); i++)
- {
- final QpidByteBuffer buffer = in.get(i);
- if (buffer.hasRemaining())
- {
- return buffer.get();
- }
- }
- throw new BufferUnderflowException();
- }
-
- public static boolean hasRemaining(final List<QpidByteBuffer> in, int len)
- {
- for (int i = 0; i < in.size(); i++)
- {
- final QpidByteBuffer buffer = in.get(i);
- int remaining = buffer.remaining();
- if (remaining >= len)
- {
- return true;
- }
- len -= remaining;
- }
-
- return false;
- }
-
- public static long getLong(final List<QpidByteBuffer> in)
- {
- boolean bytewise = false;
- int consumed = 0;
- long result = 0L;
- for (int i = 0; i < in.size(); i++)
- {
- final QpidByteBuffer buffer = in.get(i);
- int remaining = buffer.remaining();
- if (bytewise)
- {
- while (buffer.hasRemaining() && consumed < 8)
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- if (consumed == 8)
- {
- return result;
- }
- }
- else
- {
- if (remaining >= 8)
- {
- return buffer.getLong();
- }
- else if (remaining != 0)
- {
- bytewise = true;
- while (buffer.hasRemaining())
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- }
- }
- }
- throw new BufferUnderflowException();
- }
-
- public static int getInt(final List<QpidByteBuffer> in)
- {
- boolean bytewise = false;
- int consumed = 0;
- int result = 0;
- for (int i = 0; i < in.size(); i++)
- {
- final QpidByteBuffer buffer = in.get(i);
- int remaining = buffer.remaining();
- if (bytewise)
- {
- while (buffer.hasRemaining() && consumed < 4)
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- if (consumed == 4)
- {
- return result;
- }
- }
- else
- {
- if (remaining >= 4)
- {
- return buffer.getInt();
- }
- else if (remaining != 0)
- {
- bytewise = true;
- while (buffer.hasRemaining())
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- }
- }
- }
- throw new BufferUnderflowException();
- }
-
- public static float getFloat(final List<QpidByteBuffer> in)
- {
- return Float.intBitsToFloat(getInt(in));
- }
-
- public static double getDouble(final List<QpidByteBuffer> in)
- {
- return Double.longBitsToDouble(getLong(in));
- }
-
- public static Short getShort(final List<QpidByteBuffer> in)
- {
- boolean bytewise = false;
- int consumed = 0;
- short result = 0;
- for (int i = 0; i < in.size(); i++)
- {
- final QpidByteBuffer buffer = in.get(i);
- int remaining = buffer.remaining();
- if (bytewise)
- {
- while (buffer.hasRemaining() && consumed < 2)
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- if (consumed == 2)
- {
- return result;
- }
- }
- else
- {
- if (remaining >= 2)
- {
- return buffer.getShort();
- }
- else if (remaining != 0)
- {
- bytewise = true;
- while (buffer.hasRemaining())
- {
- result <<= 1;
- result |= (0xFF & buffer.get());
- consumed++;
- }
- }
- }
- }
- throw new BufferUnderflowException();
- }
-
- public static int get(final List<QpidByteBuffer> in, final byte[] data)
- {
- int copied = 0;
- int i = 0;
- while (copied < data.length && i < in.size())
- {
- QpidByteBuffer buf = in.get(i);
- if (buf.hasRemaining())
- {
- int remaining = buf.remaining();
- if (remaining >= data.length - copied)
- {
- buf.get(data, copied, data.length - copied);
- return data.length;
- }
- else
- {
- buf.get(data, copied, remaining);
- copied += remaining;
- }
- }
- i++;
- }
- return copied;
- }
-
- public static void skip(final List<QpidByteBuffer> in, int length)
- {
- int skipped = 0;
- int i = 0;
- while (skipped < length && i < in.size())
- {
- QpidByteBuffer buf = in.get(i);
- if (buf.hasRemaining())
- {
- int remaining = buf.remaining();
- if (remaining >= length - skipped)
- {
- buf.position(buf.position() + length - skipped);
- return;
- }
- else
- {
- buf.position(buf.position() + remaining);
- skipped += remaining;
- }
- }
- i++;
- }
-
- if (skipped != length)
- {
- throw new BufferUnderflowException();
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org