You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/09/29 16:36:36 UTC
svn commit: r1762812 [1/2] - in /qpid/java/trunk:
broker-core/src/test/java/org/apache/qpid/server/transport/ common/
common/src/main/java/org/apache/qpid/bytebuffer/
common/src/main/java/org/apache/qpid/framing/
common/src/test/java/org/apache/qpid/by...
Author: orudyy
Date: Thu Sep 29 16:36:36 2016
New Revision: 1762812
URL: http://svn.apache.org/viewvc?rev=1762812&view=rev
Log:
QPID-6803: Avoid chains of DirectByteBuffers
Added:
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferImpl.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/SlicedQpidByteBuffer.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/SlicedQpidByteBufferTest.java
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
qpid/java/trunk/common/pom.xml
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Thu Sep 29 16:36:36 2016
@@ -44,6 +44,7 @@ import com.fasterxml.jackson.databind.Ob
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -157,6 +158,9 @@ public class TCPandSSLTransportTest exte
List<String> blackList = mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_BLACK_LIST, type);
when(port.getTlsProtocolBlackList()).thenReturn(blackList);
when(port.getTlsProtocolWhiteList()).thenReturn(whiteList);
+ final Broker broker = mock(Broker.class);
+ when(broker.getEventLogger()).thenReturn(mock(EventLogger.class));
+ when(port.getParent(Broker.class)).thenReturn(broker);
TCPandSSLTransport transport = new TCPandSSLTransport(new HashSet<>(Arrays.asList(transports)),
port,
Modified: qpid/java/trunk/common/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/pom.xml?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/common/pom.xml (original)
+++ qpid/java/trunk/common/pom.xml Thu Sep 29 16:36:36 2016
@@ -54,6 +54,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava-version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Thu Sep 29 16:36:36 2016
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +17,7 @@
* under the License.
*
*/
+
package org.apache.qpid.bytebuffer;
import java.io.BufferedOutputStream;
@@ -31,7 +31,6 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.zip.GZIPInputStream;
@@ -41,428 +40,273 @@ import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.qpid.streams.CompositeInputStream;
-public final class QpidByteBuffer
+public abstract class QpidByteBuffer
{
- private static final Logger LOGGER = LoggerFactory.getLogger(QpidByteBuffer.class);
-
- private static final AtomicIntegerFieldUpdater<QpidByteBuffer> DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
+ private static final AtomicIntegerFieldUpdater<QpidByteBuffer>
+ DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
QpidByteBuffer.class,
"_disposed");
-
private static final ThreadLocal<QpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
- private volatile ByteBuffer _buffer;
- private final ByteBufferRef _ref;
-
- @SuppressWarnings("unused")
- private volatile int _disposed;
-
private volatile static boolean _isPoolInitialized;
private volatile static BufferPool _bufferPool;
private volatile static int _pooledBufferSize;
private volatile static ByteBuffer _zeroed;
+ final ByteBufferRef _ref;
+ volatile ByteBuffer _buffer;
+ @SuppressWarnings("unused")
+ private volatile int _disposed;
- QpidByteBuffer(ByteBufferRef ref)
- {
- this(ref.getBuffer(), ref);
- }
-
- private QpidByteBuffer(ByteBuffer buf, ByteBufferRef ref)
+ QpidByteBuffer(ByteBufferRef ref, ByteBuffer buffer)
{
- _buffer = buf;
_ref = ref;
- ref.incrementRef();
+ _buffer = buffer;
}
-
- public boolean hasRemaining()
+ public final boolean isDirect()
{
- return _buffer.hasRemaining();
+ return _buffer.isDirect();
}
- public QpidByteBuffer putInt(final int index, final int value)
+ public final short getUnsignedByte()
{
- _buffer.putInt(index, value);
- return this;
+ return (short) (((short) get()) & 0xFF);
}
- public boolean isDirect()
+ public final int getUnsignedShort()
{
- return _buffer.isDirect();
+ return ((int) getShort()) & 0xffff;
}
- public QpidByteBuffer putShort(final int index, final short value)
+ public final long getUnsignedInt()
{
- _buffer.putShort(index, value);
- return this;
+ return ((long) getInt()) & 0xffffffffL;
}
- public QpidByteBuffer putChar(final int index, final char value)
+ public final QpidByteBuffer putUnsignedByte(final short s)
{
- _buffer.putChar(index, value);
+ put((byte) s);
return this;
-
}
- public QpidByteBuffer put(final byte b)
+ public final QpidByteBuffer putUnsignedShort(final int i)
{
- _buffer.put(b);
+ putShort((short) i);
return this;
}
- public QpidByteBuffer put(final int index, final byte b)
+ public final QpidByteBuffer putUnsignedInt(final long value)
{
- _buffer.put(index, b);
+ putInt((int) value);
return this;
}
- public short getShort(final int index)
+ public final void dispose()
{
- return _buffer.getShort(index);
+ if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
+ {
+ _ref.decrementRef();
+ _buffer = null;
+ }
}
-
- public QpidByteBuffer mark()
+ public final InputStream asInputStream()
{
- _buffer.mark();
- return this;
+ return new BufferInputStream(this);
}
- public long getLong()
+ public final ByteBuffer asByteBuffer()
{
- return _buffer.getLong();
+ try
+ {
+ return getUnderlyingBuffer();
+ }
+ finally
+ {
+ dispose();
+ }
}
- public QpidByteBuffer putFloat(final int index, final float value)
+ public final CharBuffer decode(Charset charset)
{
- _buffer.putFloat(index, value);
- return this;
+ ByteBuffer underlyingBuffer = getUnderlyingBuffer();
+ try
+ {
+ return charset.decode(underlyingBuffer);
+ }
+ finally
+ {
+ updateFromLastUnderlying();
+ }
}
- public double getDouble(final int index)
+ public final int read(ReadableByteChannel channel) throws IOException
{
- return _buffer.getDouble(index);
+ ByteBuffer underlyingBuffer = getUnderlyingBuffer();
+ try
+ {
+ return channel.read(underlyingBuffer);
+ }
+ finally
+ {
+ updateFromLastUnderlying();
+ }
}
- public boolean hasArray()
+ public final SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dest) throws SSLException
{
- return _buffer.hasArray();
+ ByteBuffer underlyingBuffer = getUnderlyingBuffer();
+ ByteBuffer destUnderlyingBuffer = dest.getUnderlyingBuffer();
+ try
+ {
+ return engine.unwrap(underlyingBuffer, destUnderlyingBuffer);
+ }
+ finally
+ {
+ updateFromLastUnderlying();
+ dest.updateFromLastUnderlying();
+ }
}
- public QpidByteBuffer asReadOnlyBuffer()
+ @Override
+ public String toString()
{
- return new QpidByteBuffer(_buffer.asReadOnlyBuffer(), _ref);
+ return "QpidByteBuffer{" +
+ "_buffer=" + _buffer +
+ ", _disposed=" + _disposed +
+ '}';
}
- public double getDouble()
- {
- return _buffer.getDouble();
- }
+ public abstract boolean hasRemaining();
- public QpidByteBuffer putFloat(final float value)
- {
- _buffer.putFloat(value);
- return this;
- }
+ public abstract QpidByteBuffer putInt(int index, int value);
- public QpidByteBuffer putInt(final int value)
- {
- _buffer.putInt(value);
- return this;
- }
+ public abstract QpidByteBuffer putShort(int index, short value);
- public byte[] array()
- {
- return _buffer.array();
- }
+ public abstract QpidByteBuffer putChar(int index, char value);
- public QpidByteBuffer putShort(final short value)
- {
- _buffer.putShort(value);
- return this;
- }
+ public abstract QpidByteBuffer put(byte b);
- public int getInt(final int index)
- {
- return _buffer.getInt(index);
- }
+ public abstract QpidByteBuffer put(int index, byte b);
- public int remaining()
- {
- return _buffer.remaining();
- }
+ public abstract short getShort(int index);
- public QpidByteBuffer put(final byte[] src)
- {
- _buffer.put(src);
- return this;
- }
+ public abstract QpidByteBuffer mark();
- public QpidByteBuffer put(final ByteBuffer src)
- {
- _buffer.put(src);
- return this;
- }
+ public abstract long getLong();
- public QpidByteBuffer put(final QpidByteBuffer src)
- {
- _buffer.put(src._buffer);
- return this;
- }
+ public abstract QpidByteBuffer putFloat(int index, float value);
+ public abstract double getDouble(int index);
+ public abstract boolean hasArray();
- public QpidByteBuffer get(final byte[] dst, final int offset, final int length)
- {
- _buffer.get(dst, offset, length);
- return this;
- }
+ public abstract double getDouble();
- public QpidByteBuffer get(final ByteBuffer dst)
- {
- dst.put(_buffer);
- return this;
- }
+ public abstract QpidByteBuffer putFloat(float value);
- public void copyTo(final ByteBuffer dst)
- {
- dst.put(_buffer.duplicate());
- }
+ public abstract QpidByteBuffer putInt(int value);
- public void putCopyOf(final QpidByteBuffer buf)
- {
- _buffer.put(buf._buffer.duplicate());
- }
+ public abstract byte[] array();
- public QpidByteBuffer rewind()
- {
- _buffer.rewind();
- return this;
- }
+ public abstract QpidByteBuffer putShort(short value);
- public QpidByteBuffer clear()
- {
- _buffer.clear();
- return this;
- }
+ public abstract int getInt(int index);
- public QpidByteBuffer putLong(final int index, final long value)
- {
- _buffer.putLong(index, value);
- return this;
- }
- public QpidByteBuffer compact()
- {
- _buffer.compact();
- return this;
- }
+ public abstract int remaining();
- public QpidByteBuffer putDouble(final double value)
- {
- _buffer.putDouble(value);
- return this;
- }
+ public abstract QpidByteBuffer put(byte[] src);
- public int limit()
- {
- return _buffer.limit();
- }
+ public abstract QpidByteBuffer put(ByteBuffer src);
- public QpidByteBuffer reset()
- {
- _buffer.reset();
- return this;
- }
+ public abstract QpidByteBuffer put(QpidByteBuffer src);
- public QpidByteBuffer flip()
- {
- _buffer.flip();
- return this;
- }
+ public abstract QpidByteBuffer get(byte[] dst, int offset, int length);
- public short getShort()
- {
- return _buffer.getShort();
- }
+ public abstract QpidByteBuffer get(ByteBuffer dst);
- public float getFloat()
- {
- return _buffer.getFloat();
- }
+ public abstract void copyTo(ByteBuffer dst);
- public QpidByteBuffer limit(final int newLimit)
- {
- _buffer.limit(newLimit);
- return this;
- }
+ public abstract void putCopyOf(QpidByteBuffer buf);
- public QpidByteBuffer duplicate()
- {
- return new QpidByteBuffer(_buffer.duplicate(), _ref);
- }
+ public abstract QpidByteBuffer rewind();
- public QpidByteBuffer put(final byte[] src, final int offset, final int length)
- {
- _buffer.put(src, offset, length);
- return this;
- }
+ public abstract QpidByteBuffer clear();
- public long getLong(final int index)
- {
- return _buffer.getLong(index);
- }
+ public abstract QpidByteBuffer putLong(int index, long value);
- public int capacity()
- {
- return _buffer.capacity();
- }
+ public abstract QpidByteBuffer compact();
- public boolean isReadOnly()
- {
- return _buffer.isReadOnly();
- }
+ public abstract QpidByteBuffer putDouble(double value);
- public char getChar(final int index)
- {
- return _buffer.getChar(index);
- }
+ public abstract int limit();
- public byte get()
- {
- return _buffer.get();
- }
+ public abstract QpidByteBuffer reset();
- public byte get(final int index)
- {
- return _buffer.get(index);
- }
+ public abstract QpidByteBuffer flip();
- public QpidByteBuffer get(final byte[] dst)
- {
- _buffer.get(dst);
- return this;
- }
+ public abstract short getShort();
+ public abstract float getFloat();
- public void copyTo(final byte[] dst)
- {
- _buffer.duplicate().get(dst);
- }
+ public abstract QpidByteBuffer limit(int newLimit);
- public QpidByteBuffer putChar(final char value)
- {
- _buffer.putChar(value);
- return this;
- }
+ public abstract QpidByteBuffer duplicate();
- public QpidByteBuffer position(final int newPosition)
- {
- _buffer.position(newPosition);
- return this;
- }
+ public abstract QpidByteBuffer put(byte[] src, int offset, int length);
- public int arrayOffset()
- {
- return _buffer.arrayOffset();
- }
+ public abstract long getLong(int index);
- public char getChar()
- {
- return _buffer.getChar();
- }
-
- public int getInt()
- {
- return _buffer.getInt();
- }
+ public abstract int capacity();
- public QpidByteBuffer putLong(final long value)
- {
- _buffer.putLong(value);
- return this;
- }
+ public abstract char getChar(int index);
- public float getFloat(final int index)
- {
- return _buffer.getFloat(index);
- }
+ public abstract byte get();
- public int getUnsignedByte()
- {
- return ((int)get()) & 0xFF;
- }
+ public abstract byte get(int index);
- public int getUnsignedShort()
- {
- return ((int) getShort()) & 0xffff;
- }
+ public abstract QpidByteBuffer get(byte[] dst);
- public long getUnsignedInt()
- {
- return ((long) getInt()) & 0xffffffffL;
- }
+ public abstract void copyTo(byte[] dst);
- public void putUnsignedByte(final short s)
- {
- put((byte)s);
- }
+ public abstract QpidByteBuffer putChar(char value);
+ public abstract QpidByteBuffer position(int newPosition);
- public void putUnsignedShort(final int i)
- {
- putShort((short)i);
- }
+ public abstract int arrayOffset();
- public void putUnsignedInt(final long l)
- {
- putInt((int)l);
- }
+ public abstract char getChar();
+ public abstract int getInt();
- public QpidByteBuffer slice()
- {
- return new QpidByteBuffer(_buffer.slice(), _ref);
- }
+ public abstract QpidByteBuffer putLong(long value);
- public QpidByteBuffer view(int offset, int length)
- {
- ByteBuffer buf = _buffer.slice();
- buf.position(offset);
- buf.limit(offset+Math.min(length, buf.remaining()));
- buf = buf.slice();
+ public abstract float getFloat(int index);
- return new QpidByteBuffer(buf, _ref);
- }
+ public abstract QpidByteBuffer slice();
- public int position()
- {
- return _buffer.position();
- }
+ public abstract QpidByteBuffer view(int offset, int length);
- public QpidByteBuffer putDouble(final int index, final double value)
- {
- _buffer.putDouble(index, value);
- return this;
- }
+ public abstract int position();
- public void dispose()
- {
- if(DISPOSED_UPDATER.compareAndSet(this,0,1))
- {
- _ref.decrementRef();
- _buffer = null;
- }
- }
+ public abstract QpidByteBuffer putDouble(int index, double value);
- public InputStream asInputStream()
- {
- return new BufferInputStream();
- }
+ /**
+ * Returns an underlying byte buffer for update operations.
+ * <p></p>
+ * Method {@link #updateFromLastUnderlying()} needs to be invoked to update the state of {@link QpidByteBuffer}
+ *
+ * @return ByteBuffer
+ */
+ abstract ByteBuffer getUnderlyingBuffer();
+ /**
+ * Used to update the state of {@link QpidByteBuffer} after underlying byte buffer is modified.
+ *
+ * @throws IllegalStateException when method is invoked without previous call to {@link #getUnderlyingBuffer()}
+ */
+ abstract void updateFromLastUnderlying();
public static QpidByteBuffer allocate(boolean direct, int size)
{
@@ -471,14 +315,16 @@ public final class QpidByteBuffer
public static QpidByteBuffer allocate(int size)
{
- return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
+ return new QpidByteBufferImpl(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
}
public static QpidByteBuffer allocateDirect(int size)
{
if (size < 0)
{
- throw new IllegalArgumentException("Cannot allocate QpidByteBuffer with size " + size + " which is negative.");
+ throw new IllegalArgumentException("Cannot allocate QpidByteBuffer with size "
+ + size
+ + " which is negative.");
}
final ByteBufferRef ref;
@@ -515,29 +361,29 @@ public final class QpidByteBuffer
{
ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
}
- return new QpidByteBuffer(ref);
+ return new QpidByteBufferImpl(ref);
}
public static Collection<QpidByteBuffer> allocateDirectCollection(int size)
{
- if(_pooledBufferSize == 0)
+ if (_pooledBufferSize == 0)
{
return Collections.singleton(allocateDirect(size));
}
else
{
- List<QpidByteBuffer> buffers = new ArrayList<>((size / _pooledBufferSize)+2);
+ List<QpidByteBuffer> buffers = new ArrayList<>((size / _pooledBufferSize) + 2);
int remaining = size;
QpidByteBuffer buf = _cachedBuffer.get();
- if(buf == null)
+ if (buf == null)
{
buf = allocateDirect(_pooledBufferSize);
}
- while(remaining > buf.remaining())
+ while (remaining > buf.remaining())
{
int bufRemaining = buf.remaining();
- if (buf == _cachedBuffer.get())
+ if (buf == _cachedBuffer.get())
{
buffers.add(buf.view(0, bufRemaining));
buf.dispose();
@@ -565,40 +411,29 @@ public final class QpidByteBuffer
}
}
- public ByteBuffer asByteBuffer()
- {
- _ref.removeFromPool();
- return _buffer;
- }
-
- public CharBuffer decode(Charset charset)
- {
- return charset.decode(_buffer);
- }
-
- public int read(ReadableByteChannel channel) throws IOException
- {
- return channel.read(_buffer);
- }
-
-
- public SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dest) throws SSLException
- {
- return engine.unwrap(_buffer, dest._buffer);
- }
-
-
public static SSLEngineResult encryptSSL(SSLEngine engine,
final Collection<QpidByteBuffer> buffers,
QpidByteBuffer dest) throws SSLException
{
+ List<QpidByteBuffer> qpidBuffers = new ArrayList<>(buffers);
final ByteBuffer[] src = new ByteBuffer[buffers.size()];
- Iterator<QpidByteBuffer> iter = buffers.iterator();
- for(int i = 0; i<src.length; i++)
+ for (int i = 0; i < src.length; i++)
+ {
+ src[i] = qpidBuffers.get(i).getUnderlyingBuffer();
+ }
+ ByteBuffer destinationUnderlyingBuffer = dest.getUnderlyingBuffer();
+ try
+ {
+ return engine.wrap(src, destinationUnderlyingBuffer);
+ }
+ finally
{
- src[i] = iter.next()._buffer;
+ for (QpidByteBuffer qpidByteBuffer : qpidBuffers)
+ {
+ qpidByteBuffer.updateFromLastUnderlying();
+ }
+ dest.updateFromLastUnderlying();
}
- return engine.wrap(src, dest._buffer);
}
public static Collection<QpidByteBuffer> inflate(Collection<QpidByteBuffer> compressedBuffers) throws IOException
@@ -673,20 +508,31 @@ public final class QpidByteBuffer
}
}
- public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> buffers) throws IOException
+ public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> qpidByteBuffers)
+ throws IOException
{
- ByteBuffer[] bufs = new ByteBuffer[buffers.size()];
- Iterator<QpidByteBuffer> bufIter = buffers.iterator();
- for(int i = 0; i < bufs.length; i++)
+ List<QpidByteBuffer> qpidBuffers = new ArrayList<>(qpidByteBuffers);
+ ByteBuffer[] byteBuffers = new ByteBuffer[qpidBuffers.size()];
+ for (int i = 0; i < byteBuffers.length; i++)
+ {
+ byteBuffers[i] = qpidBuffers.get(i).getUnderlyingBuffer();
+ }
+ try
{
- bufs[i] = bufIter.next()._buffer;
+ return channel.write(byteBuffers);
+ }
+ finally
+ {
+ for (QpidByteBuffer qbb : qpidBuffers)
+ {
+ qbb.updateFromLastUnderlying();
+ }
}
- return channel.write(bufs);
}
public static QpidByteBuffer wrap(final ByteBuffer wrap)
{
- return new QpidByteBuffer(new NonPooledByteBufferRef(wrap));
+ return new QpidByteBufferImpl(new NonPooledByteBufferRef(wrap));
}
public static QpidByteBuffer wrap(final byte[] data)
@@ -713,9 +559,13 @@ public final class QpidByteBuffer
{
if (_isPoolInitialized && (bufferSize != _pooledBufferSize || maxPoolSize != _bufferPool.getMaxSize()))
{
- final String errorMessage = String.format("QpidByteBuffer pool has already been initialised with bufferSize=%d and maxPoolSize=%d." +
- "Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
- _pooledBufferSize, _bufferPool.getMaxSize(), bufferSize, maxPoolSize);
+ final String errorMessage = String.format(
+ "QpidByteBuffer pool has already been initialised with bufferSize=%d and maxPoolSize=%d." +
+ "Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
+ _pooledBufferSize,
+ _bufferPool.getMaxSize(),
+ bufferSize,
+ maxPoolSize);
throw new IllegalStateException(errorMessage);
}
if (bufferSize <= 0)
@@ -723,22 +573,27 @@ public final class QpidByteBuffer
throw new IllegalArgumentException("Negative or zero bufferSize illegal : " + bufferSize);
}
-
_bufferPool = new BufferPool(maxPoolSize);
_pooledBufferSize = bufferSize;
_zeroed = ByteBuffer.allocateDirect(_pooledBufferSize);
_isPoolInitialized = true;
}
- private final class BufferInputStream extends InputStream
+ private static final class BufferInputStream extends InputStream
{
+ private final QpidByteBuffer _qpidByteBuffer;
+
+ private BufferInputStream(final QpidByteBuffer buffer)
+ {
+ _qpidByteBuffer = buffer;
+ }
@Override
public int read() throws IOException
{
- if (_buffer.hasRemaining())
+ if (_qpidByteBuffer.hasRemaining())
{
- return _buffer.get() & 0xFF;
+ return _qpidByteBuffer.get() & 0xFF;
}
return -1;
}
@@ -747,15 +602,15 @@ public final class QpidByteBuffer
@Override
public int read(byte[] b, int off, int len) throws IOException
{
- if (!_buffer.hasRemaining())
+ if (!_qpidByteBuffer.hasRemaining())
{
return -1;
}
- if(_buffer.remaining() < len)
+ if (_qpidByteBuffer.remaining() < len)
{
- len = _buffer.remaining();
+ len = _qpidByteBuffer.remaining();
}
- _buffer.get(b, off, len);
+ _qpidByteBuffer.get(b, off, len);
return len;
}
@@ -763,13 +618,13 @@ public final class QpidByteBuffer
@Override
public void mark(int readlimit)
{
- _buffer.mark();
+ _qpidByteBuffer.mark();
}
@Override
public void reset() throws IOException
{
- _buffer.reset();
+ _qpidByteBuffer.reset();
}
@Override
@@ -781,14 +636,14 @@ public final class QpidByteBuffer
@Override
public long skip(long n) throws IOException
{
- _buffer.position(_buffer.position()+(int)n);
+ _qpidByteBuffer.position(_qpidByteBuffer.position() + (int) n);
return n;
}
@Override
public int available() throws IOException
{
- return _buffer.remaining();
+ return _qpidByteBuffer.remaining();
}
@Override
@@ -796,5 +651,4 @@ public final class QpidByteBuffer
{
}
}
-
}
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferImpl.java?rev=1762812&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferImpl.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferImpl.java Thu Sep 29 16:36:36 2016
@@ -0,0 +1,454 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.bytebuffer;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+final class QpidByteBufferImpl extends QpidByteBuffer
+{
+
+ QpidByteBufferImpl(ByteBufferRef ref)
+ {
+ this(ref.getBuffer(), ref);
+ }
+
+ private QpidByteBufferImpl(ByteBuffer buf, ByteBufferRef ref)
+ {
+ super(ref, buf);
+ ref.incrementRef();
+ }
+
+ @Override
+ public boolean hasRemaining()
+ {
+ return _buffer.hasRemaining();
+ }
+
+ @Override
+ public QpidByteBuffer putInt(final int index, final int value)
+ {
+ _buffer.putInt(index, value);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putShort(final int index, final short value)
+ {
+ _buffer.putShort(index, value);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putChar(final int index, final char value)
+ {
+ _buffer.putChar(index, value);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer put(final byte b)
+ {
+ _buffer.put(b);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer put(final int index, final byte b)
+ {
+ _buffer.put(index, b);
+ return this;
+ }
+
+ @Override
+ public short getShort(final int index)
+ {
+ return _buffer.getShort(index);
+ }
+
+
+ @Override
+ public QpidByteBuffer mark()
+ {
+ _buffer.mark();
+ return this;
+ }
+
+ @Override
+ public long getLong()
+ {
+ return _buffer.getLong();
+ }
+
+ @Override
+ public QpidByteBuffer putFloat(final int index, final float value)
+ {
+ _buffer.putFloat(index, value);
+ return this;
+ }
+
+ @Override
+ public double getDouble(final int index)
+ {
+ return _buffer.getDouble(index);
+ }
+
+ @Override
+ public boolean hasArray()
+ {
+ return _buffer.hasArray();
+ }
+
+ @Override
+ public double getDouble()
+ {
+ return _buffer.getDouble();
+ }
+
+ @Override
+ public QpidByteBuffer putFloat(final float value)
+ {
+ _buffer.putFloat(value);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putInt(final int value)
+ {
+ _buffer.putInt(value);
+ return this;
+ }
+
+ @Override
+ public byte[] array()
+ {
+ return _buffer.array();
+ }
+
+ @Override
+ public QpidByteBuffer putShort(final short value)
+ {
+ _buffer.putShort(value);
+ return this;
+ }
+
+ @Override
+ public int getInt(final int index)
+ {
+ return _buffer.getInt(index);
+ }
+
+ @Override
+ public int remaining()
+ {
+ return _buffer.remaining();
+ }
+
+ @Override
+ public QpidByteBuffer put(final byte[] src)
+ {
+ _buffer.put(src);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer put(final ByteBuffer src)
+ {
+ _buffer.put(src);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer put(final QpidByteBuffer src)
+ {
+ ByteBuffer underlyingBuffer = src.getUnderlyingBuffer();
+ _buffer.put(underlyingBuffer);
+ src.updateFromLastUnderlying();
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer get(final byte[] dst, final int offset, final int length)
+ {
+ _buffer.get(dst, offset, length);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer get(final ByteBuffer dst)
+ {
+ int destinationRemaining = dst.remaining();
+ int remaining = remaining();
+ if (destinationRemaining < remaining)
+ {
+ throw new BufferUnderflowException();
+ }
+ dst.put(_buffer);
+ return this;
+ }
+
+ @Override
+ public void copyTo(final ByteBuffer dst)
+ {
+ dst.put(_buffer.duplicate());
+ }
+
+ @Override
+ public void putCopyOf(final QpidByteBuffer buf)
+ {
+ _buffer.put(buf.getUnderlyingBuffer().duplicate());
+ if (buf instanceof SlicedQpidByteBuffer)
+ {
+ ((SlicedQpidByteBuffer)buf).clearLastUnderlyingBuffer();
+ }
+ }
+
+ @Override
+ public QpidByteBuffer rewind()
+ {
+ _buffer.rewind();
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer clear()
+ {
+ _buffer.clear();
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putLong(final int index, final long value)
+ {
+ _buffer.putLong(index, value);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer compact()
+ {
+ _buffer.compact();
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putDouble(final double value)
+ {
+ _buffer.putDouble(value);
+ return this;
+ }
+
+ @Override
+ public int limit()
+ {
+ return _buffer.limit();
+ }
+
+ @Override
+ public QpidByteBuffer reset()
+ {
+ _buffer.reset();
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer flip()
+ {
+ _buffer.flip();
+ return this;
+ }
+
+ @Override
+ public short getShort()
+ {
+ return _buffer.getShort();
+ }
+
+ @Override
+ public float getFloat()
+ {
+ return _buffer.getFloat();
+ }
+
+ @Override
+ public QpidByteBuffer limit(final int newLimit)
+ {
+ _buffer.limit(newLimit);
+ return this;
+ }
+
+ @Override
+ public QpidByteBufferImpl duplicate()
+ {
+ return new QpidByteBufferImpl(_buffer.duplicate(), _ref);
+ }
+
+ @Override
+ public QpidByteBuffer put(final byte[] src, final int offset, final int length)
+ {
+ _buffer.put(src, offset, length);
+ return this;
+ }
+
+ @Override
+ public long getLong(final int index)
+ {
+ return _buffer.getLong(index);
+ }
+
+ @Override
+ public int capacity()
+ {
+ return _buffer.capacity();
+ }
+
+ @Override
+ public char getChar(final int index)
+ {
+ return _buffer.getChar(index);
+ }
+
+ @Override
+ public byte get()
+ {
+ return _buffer.get();
+ }
+
+ @Override
+ public byte get(final int index)
+ {
+ return _buffer.get(index);
+ }
+
+ @Override
+ public QpidByteBuffer get(final byte[] dst)
+ {
+ _buffer.get(dst);
+ return this;
+ }
+
+ @Override
+ public void copyTo(final byte[] dst)
+ {
+ _buffer.duplicate().get(dst);
+ }
+
+ @Override
+ public QpidByteBuffer putChar(final char value)
+ {
+ _buffer.putChar(value);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer position(final int newPosition)
+ {
+ _buffer.position(newPosition);
+ return this;
+ }
+
+ @Override
+ public int arrayOffset()
+ {
+ return _buffer.arrayOffset();
+ }
+
+ @Override
+ public char getChar()
+ {
+ return _buffer.getChar();
+ }
+
+ @Override
+ public int getInt()
+ {
+ return _buffer.getInt();
+ }
+
+ @Override
+ public QpidByteBuffer putLong(final long value)
+ {
+ _buffer.putLong(value);
+ return this;
+ }
+
+ @Override
+ public float getFloat(final int index)
+ {
+ return _buffer.getFloat(index);
+ }
+
+
+ @Override
+ public QpidByteBuffer slice()
+ {
+ if (isDirect())
+ {
+ return new SlicedQpidByteBuffer(0, remaining(), remaining(), position(), _ref);
+ }
+ else
+ {
+ return new QpidByteBufferImpl(_buffer.slice(), _ref);
+ }
+ }
+
+ @Override
+ public QpidByteBuffer view(int offset, int length)
+ {
+ if (isDirect())
+ {
+ int capacity = Math.min(_buffer.remaining() - offset, length);
+ return new SlicedQpidByteBuffer(0, capacity, capacity, offset + position(), _ref);
+ }
+ else
+ {
+ ByteBuffer buf = _buffer.slice();
+ buf.position(offset);
+ buf.limit(offset + Math.min(length, buf.remaining()));
+ buf = buf.slice();
+ return new QpidByteBufferImpl(buf, _ref);
+ }
+ }
+
+ @Override
+ public int position()
+ {
+ return _buffer.position();
+ }
+
+ @Override
+ public QpidByteBuffer putDouble(final int index, final double value)
+ {
+ _buffer.putDouble(index, value);
+ return this;
+ }
+
+ ByteBuffer getUnderlyingBuffer()
+ {
+ return _buffer;
+ }
+
+ @Override
+ void updateFromLastUnderlying()
+ {
+ // noop
+ }
+}
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/SlicedQpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/SlicedQpidByteBuffer.java?rev=1762812&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/SlicedQpidByteBuffer.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/SlicedQpidByteBuffer.java Thu Sep 29 16:36:36 2016
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.bytebuffer;
+
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
+
+final class SlicedQpidByteBuffer extends QpidByteBuffer
+{
+ private static final int SIZE_DOUBLE = 8;
+ private static final int SIZE_FLOAT = 4;
+ private static final int SIZE_LONG = 8;
+ private static final int SIZE_INT = 4;
+ private static final int SIZE_CHAR = 2;
+ private static final int SIZE_SHORT = 2;
+ private static final int SIZE_BYTE = 1;
+
+ private final int _capacity;
+ private final int _offset;
+
+ private int _mark = -1;
+ private int _position = 0;
+ private int _limit;
+ private ByteBuffer _lastUnderlyingBuffer;
+
+ SlicedQpidByteBuffer(final int position,
+ final int limit,
+ final int capacity,
+ final int offset,
+ final ByteBufferRef ref)
+ {
+ super(ref, ref.getBuffer());
+
+ if (capacity < 0)
+ {
+ throw new IllegalArgumentException("Capacity cannot be negative");
+ }
+
+ if (limit > capacity || limit < 0)
+ {
+ throw new IllegalArgumentException("Limit cannot be greater than capacity or negative");
+ }
+
+ if (position > limit || position < 0)
+ {
+ throw new IllegalArgumentException("Position cannot be greater than limit or negative");
+ }
+
+ if (offset < 0)
+ {
+ throw new IllegalArgumentException("Offset cannot be negative");
+ }
+
+ _capacity = capacity;
+ _position = position;
+ _limit = limit;
+ _offset = offset;
+ _ref.incrementRef();
+ }
+
+ @Override
+ public boolean hasRemaining()
+ {
+ return _position < _limit;
+ }
+
+ @Override
+ public QpidByteBuffer putInt(final int index, final int value)
+ {
+ checkIndexBounds(index, SIZE_INT);
+ _buffer.putInt(_offset + index, value);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putShort(final int index, final short value)
+ {
+ checkIndexBounds(index, SIZE_SHORT);
+ _buffer.putShort(_offset + index, value);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putChar(final int index, final char value)
+ {
+ checkIndexBounds(index, SIZE_CHAR);
+ _buffer.putChar(_offset + index, value);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer put(final int index, final byte b)
+ {
+ checkIndexBounds(index, SIZE_BYTE);
+ _buffer.put(_offset + index, b);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer put(final byte b)
+ {
+ checkOverflow(SIZE_BYTE);
+ put(_position, b);
+ _position++;
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putDouble(final int index, final double value)
+ {
+ checkIndexBounds(index, SIZE_DOUBLE);
+
+ _buffer.putDouble(_offset + index, value);
+ return this;
+ }
+
+
+ @Override
+ public short getShort(final int index)
+ {
+ checkIndexBounds(index, SIZE_SHORT);
+ return _buffer.getShort(index + _offset);
+ }
+
+ @Override
+ public QpidByteBuffer mark()
+ {
+ _mark = _position;
+ return this;
+ }
+
+ @Override
+ public long getLong()
+ {
+ checkUnderflow(SIZE_LONG);
+
+ long value = getLong(_position);
+ _position += SIZE_LONG;
+ return value;
+ }
+
+ @Override
+ public QpidByteBuffer putFloat(final int index, final float value)
+ {
+ checkIndexBounds(index, SIZE_FLOAT);
+
+ _buffer.putFloat(_offset + index, value);
+ return this;
+ }
+
+ @Override
+ public double getDouble(final int index)
+ {
+ checkIndexBounds(index, SIZE_DOUBLE);
+ return _buffer.getDouble(index + _offset);
+ }
+
+ @Override
+ public boolean hasArray()
+ {
+ return _buffer.hasArray();
+ }
+
+ @Override
+ public double getDouble()
+ {
+ checkUnderflow(SIZE_DOUBLE);
+
+ double value = getDouble(_position);
+ _position += SIZE_DOUBLE;
+ return value;
+ }
+
+ @Override
+ public QpidByteBuffer putFloat(final float value)
+ {
+ checkOverflow(SIZE_FLOAT);
+
+ putFloat(position(), value);
+ _position += SIZE_FLOAT;
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putInt(final int value)
+ {
+ checkOverflow(SIZE_INT);
+
+ putInt(position(), value);
+ _position += SIZE_INT;
+ return this;
+ }
+
+ @Override
+ public byte[] array()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public QpidByteBuffer putShort(final short value)
+ {
+ checkOverflow(SIZE_SHORT);
+
+ putShort(position(), value);
+ _position += SIZE_SHORT;
+ return this;
+ }
+
+ @Override
+ public int getInt(final int index)
+ {
+ checkIndexBounds(index, SIZE_INT);
+ return _buffer.getInt(index + _offset);
+ }
+
+ @Override
+ public int remaining()
+ {
+ return _limit - _position;
+ }
+
+ @Override
+ public QpidByteBuffer put(final byte[] src)
+ {
+ return put(src, 0, src.length);
+ }
+
+ @Override
+ public QpidByteBuffer put(final ByteBuffer src)
+ {
+ int sourceRemaining = src.remaining();
+ if (sourceRemaining > remaining())
+ {
+ throw new BufferOverflowException();
+ }
+
+ for (int i = 0; i < sourceRemaining; i++)
+ {
+ put(src.get());
+ }
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer put(final QpidByteBuffer src)
+ {
+ if (src == this)
+ {
+ throw new IllegalArgumentException();
+ }
+
+ int sourceRemaining = src.remaining();
+ if (sourceRemaining > remaining())
+ {
+ throw new BufferOverflowException();
+ }
+
+ for (int i = 0; i < sourceRemaining; i++)
+ {
+ put(src.get());
+ }
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer get(final byte[] dst, final int offset, final int length)
+ {
+ checkBounds(dst, offset, length);
+
+ if (length > remaining())
+ {
+ throw new BufferUnderflowException();
+ }
+
+ // TODO consider using a slice of the underlying BB, followed by a bulk method
+ for (int i = offset; i < offset + length; i++)
+ {
+ dst[i] = get();
+ }
+
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer get(final ByteBuffer dst)
+ {
+ int destinationRemaining = dst.remaining();
+ int remaining = remaining();
+ if (destinationRemaining < remaining)
+ {
+ throw new BufferUnderflowException();
+ }
+
+ for (int i = 0; i < remaining; i++)
+ {
+ dst.put(get());
+ }
+ return this;
+ }
+
+ @Override
+ public void copyTo(final ByteBuffer dst)
+ {
+ int destinationRemaining = dst.remaining();
+ int remaining = remaining();
+ if (destinationRemaining < remaining)
+ {
+ throw new BufferUnderflowException();
+ }
+
+ for (int i = 0; i < remaining; i++)
+ {
+ dst.put(get(_position + i));
+ }
+ }
+
+ @Override
+ public void putCopyOf(final QpidByteBuffer source)
+ {
+ int remaining = remaining();
+ int sourceRemaining = source.remaining();
+ if (sourceRemaining > remaining)
+ {
+ throw new BufferOverflowException();
+ }
+
+ for (int i = 0; i < sourceRemaining; i++)
+ {
+ put(source.get(i));
+ }
+ }
+
+ @Override
+ public QpidByteBuffer rewind()
+ {
+ _position = 0;
+ _mark = -1;
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer clear()
+ {
+ _position = 0;
+ _limit = _capacity;
+ _mark = -1;
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putLong(final int index, final long value)
+ {
+ checkIndexBounds(index, SIZE_LONG);
+
+ _buffer.putLong(_offset + index, value);
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer compact()
+ {
+ int remaining = remaining();
+ if (_position > 0 && _position < _limit)
+ {
+ for (int i = 0; i < remaining; i++)
+ {
+ put(i, get(_position + i));
+ }
+ }
+ _position = remaining;
+ _limit = _capacity;
+ _mark = -1;
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer putDouble(final double value)
+ {
+ checkOverflow(SIZE_DOUBLE);
+
+ putDouble(position(), value);
+ _position += SIZE_DOUBLE;
+ return this;
+ }
+
+ @Override
+ public int limit()
+ {
+ return _limit;
+ }
+
+ @Override
+ public QpidByteBuffer reset()
+ {
+ if (_mark < 0)
+ {
+ throw new InvalidMarkException();
+ }
+ _position = _mark;
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer flip()
+ {
+ _limit = _position;
+ _position = 0;
+ _mark = -1;
+ return this;
+ }
+
+ @Override
+ public short getShort()
+ {
+ checkUnderflow(SIZE_SHORT);
+
+ short value = getShort(_position);
+ _position += SIZE_SHORT;
+ return value;
+ }
+
+ @Override
+ public float getFloat()
+ {
+ checkUnderflow(SIZE_FLOAT);
+
+ float value = getFloat(_position);
+ _position += SIZE_FLOAT;
+ return value;
+ }
+
+ @Override
+ public QpidByteBuffer limit(final int newLimit)
+ {
+ if (newLimit > _capacity || newLimit < 0)
+ {
+ throw new IllegalArgumentException();
+ }
+ _limit = newLimit;
+ if (_position > _limit)
+ {
+ _position = _limit;
+ }
+ if (_mark > _limit)
+ {
+ _mark = -1;
+ }
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer duplicate()
+ {
+ SlicedQpidByteBuffer duplicate = new SlicedQpidByteBuffer(_position, _limit, _capacity, _offset, _ref);
+ duplicate._mark = _mark;
+ return duplicate;
+ }
+
+ @Override
+ public QpidByteBuffer put(final byte[] src, final int offset, final int length)
+ {
+ checkBounds(src, offset, length);
+
+ if (length > remaining())
+ {
+ throw new BufferOverflowException();
+ }
+
+ for (int i = offset; i < offset + length; i++)
+ {
+ put(src[i]);
+ }
+ return this;
+ }
+
+ @Override
+ public long getLong(final int index)
+ {
+ checkIndexBounds(index, SIZE_LONG);
+ return _buffer.getLong(index + _offset);
+ }
+
+ @Override
+ public int capacity()
+ {
+ return _capacity;
+ }
+
+ @Override
+ public char getChar(final int index)
+ {
+ checkIndexBounds(index, SIZE_CHAR);
+ return _buffer.getChar(index + _offset);
+ }
+
+ @Override
+ public byte get()
+ {
+ checkUnderflow(SIZE_BYTE);
+
+ byte value = get(_position);
+ _position += SIZE_BYTE;
+ return value;
+ }
+
+ @Override
+ public byte get(final int index)
+ {
+ checkIndexBounds(index, SIZE_BYTE);
+ return _buffer.get(index + _offset);
+ }
+
+ @Override
+ public QpidByteBuffer get(final byte[] dst)
+ {
+ return get(dst, 0, dst.length);
+ }
+
+ @Override
+ public void copyTo(final byte[] dst)
+ {
+ if (remaining() < dst.length)
+ {
+ throw new BufferUnderflowException();
+ }
+
+ for (int i = 0; i < dst.length; i++)
+ {
+ dst[i] = get(_position + i);
+ }
+ }
+
+ @Override
+ public QpidByteBuffer putChar(final char value)
+ {
+ checkOverflow(SIZE_CHAR);
+
+ putChar(position(), value);
+ _position += SIZE_CHAR;
+ return this;
+ }
+
+ @Override
+ public QpidByteBuffer position(final int newPosition)
+ {
+ if (newPosition > _limit || newPosition < 0)
+ {
+ throw new IllegalArgumentException();
+ }
+ _position = newPosition;
+ if (_mark > _position)
+ {
+ _mark = -1;
+ }
+ return this;
+ }
+
+ @Override
+ public int arrayOffset()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public char getChar()
+ {
+ checkUnderflow(SIZE_CHAR);
+
+ char value = getChar(_position);
+ _position += SIZE_CHAR;
+ return value;
+ }
+
+ @Override
+ public int getInt()
+ {
+ checkUnderflow(SIZE_INT);
+
+ int value = getInt(_position);
+ _position += SIZE_INT;
+ return value;
+ }
+
+ @Override
+ public QpidByteBuffer putLong(final long value)
+ {
+ checkOverflow(SIZE_LONG);
+
+ putLong(position(), value);
+ _position += SIZE_LONG;
+ return this;
+ }
+
+ @Override
+ public float getFloat(final int index)
+ {
+ checkIndexBounds(index, SIZE_FLOAT);
+ return _buffer.getFloat(index + _offset);
+ }
+
+
+ @Override
+ public QpidByteBuffer slice()
+ {
+ return new SlicedQpidByteBuffer(0, remaining(), remaining(), _offset + _position, _ref);
+ }
+
+ @Override
+ public QpidByteBuffer view(final int offset, final int length)
+ {
+ int newCapacity = Math.min(length, remaining() - offset);
+ return new SlicedQpidByteBuffer(0, newCapacity, newCapacity, _offset + _position + offset, _ref);
+ }
+
+ @Override
+ public int position()
+ {
+ return _position;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SlicedQpidByteBuffer{" +
+ "_capacity=" + _capacity +
+ ", _offset=" + _offset +
+ ", _mark=" + _mark +
+ ", _position=" + _position +
+ ", _limit=" + _limit +
+ '}';
+ }
+
+ @Override
+ ByteBuffer getUnderlyingBuffer()
+ {
+ ByteBuffer buffer = _buffer.duplicate();
+ buffer.position(_offset);
+ buffer.limit(_offset + _capacity);
+
+ buffer = buffer.slice();
+ buffer.position(_position);
+ buffer.limit(_limit);
+ _lastUnderlyingBuffer = buffer;
+ return buffer;
+ }
+
+ @Override
+ void updateFromLastUnderlying()
+ {
+ if (_lastUnderlyingBuffer == null)
+ {
+ throw new IllegalStateException("No last underlying ByteBuffer recorded for " + this);
+ }
+ _position = _lastUnderlyingBuffer.position();
+ _limit = _lastUnderlyingBuffer.limit();
+ _lastUnderlyingBuffer = null;
+ }
+
+ void clearLastUnderlyingBuffer()
+ {
+ _lastUnderlyingBuffer = null;
+ }
+
+ private void checkBounds(final byte[] array, final int offset, final int length)
+ {
+ if (offset < 0 || (offset > 0 && offset > array.length - 1) || length < 0 || length > array.length)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ private void checkIndexBounds(int index, int size)
+ {
+ if (index < 0 || size > _limit - index)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ private void checkOverflow(final int size)
+ {
+ if (_limit - _position < size)
+ {
+ throw new BufferOverflowException();
+ }
+ }
+
+ private void checkUnderflow(final int size)
+ {
+ if (_limit - _position < size)
+ {
+ throw new BufferUnderflowException();
+ }
+ }
+
+}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java Thu Sep 29 16:36:36 2016
@@ -136,8 +136,8 @@ public class ConnectionStartBody extends
public static void process(final QpidByteBuffer in, final ClientMethodProcessor dispatcher)
throws AMQFrameDecodingException
{
- short versionMajor = (short) in.getUnsignedByte();
- short versionMinor = (short) in.getUnsignedByte();
+ short versionMajor = in.getUnsignedByte();
+ short versionMinor = in.getUnsignedByte();
FieldTable serverProperties = EncodingUtils.readFieldTable(in);
byte[] mechanisms = EncodingUtils.readBytes(in);
byte[] locales = EncodingUtils.readBytes(in);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Thu Sep 29 16:36:36 2016
@@ -320,7 +320,7 @@ public class EncodingUtils
public static long readLongAsShortString(QpidByteBuffer buffer)
{
- short length = (short) buffer.getUnsignedByte();
+ short length = buffer.getUnsignedByte();
short pos = 0;
if (length == 0)
{
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java Thu Sep 29 16:36:36 2016
@@ -150,6 +150,67 @@ public class QpidByteBufferTest extends
}
}
+ public void testSlice() throws Exception
+ {
+ QpidByteBuffer directBuffer = QpidByteBuffer.allocate(true, 6);
+ directBuffer.position(2);
+ directBuffer.limit(5);
+ QpidByteBuffer directSlice = directBuffer.slice();
+
+ assertTrue("Direct slice should be direct too", directSlice.isDirect());
+ assertTrue("Direct slice should be special", directSlice instanceof SlicedQpidByteBuffer);
+ assertEquals("Unexpected capacity", 3, directSlice.capacity());
+ assertEquals("Unexpected limit", 3, directSlice.limit());
+ assertEquals("Unexpected position", 0, directSlice.position());
+
+ directBuffer.dispose();
+ directSlice.dispose();
+
+ final QpidByteBuffer heapBuffer = QpidByteBuffer.allocate(false, 6);
+ final QpidByteBuffer heapSlice = heapBuffer.slice();
+ assertFalse("Heap slice should not be special", heapSlice instanceof SlicedQpidByteBuffer);
+ heapBuffer.dispose();
+ heapSlice.dispose();
+ }
+
+ public void testView() throws Exception
+ {
+ doTestView(true);
+ doTestView(false);
+ }
+
+ private void doTestView(final boolean direct)
+ {
+ byte[] content = "ABCDEF".getBytes();
+ QpidByteBuffer buffer = QpidByteBuffer.allocate(direct, content.length);
+ buffer.put(content);
+ buffer.position(2);
+ buffer.limit(5);
+
+ QpidByteBuffer view = buffer.view(0, buffer.remaining());
+
+ assertEquals("Unexpected view direct", direct, view.isDirect());
+
+ assertEquals("Unexpected capacity", 3, view.capacity());
+ assertEquals("Unexpected limit", 3, view.limit());
+ assertEquals("Unexpected position", 0, view.position());
+
+ byte[] destination = new byte[view.remaining()];
+ view.get(destination);
+
+ Assert.assertArrayEquals("CDE".getBytes(), destination);
+
+ QpidByteBuffer viewWithOffset = buffer.view(1, 1);
+ destination = new byte[viewWithOffset.remaining()];
+ viewWithOffset.get(destination);
+
+ Assert.assertArrayEquals("D".getBytes(), destination);
+
+ buffer.dispose();
+ view.dispose();
+ viewWithOffset.dispose();
+ }
+
private void doDeflateInflate(byte[] input,
Collection<QpidByteBuffer> inputBufs,
boolean direct) throws IOException
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org