You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/09/29 16:36:36 UTC

svn commit: r1762812 [1/2] - in /qpid/java/trunk: broker-core/src/test/java/org/apache/qpid/server/transport/ common/ common/src/main/java/org/apache/qpid/bytebuffer/ common/src/main/java/org/apache/qpid/framing/ common/src/test/java/org/apache/qpid/by...

Author: orudyy
Date: Thu Sep 29 16:36:36 2016
New Revision: 1762812

URL: http://svn.apache.org/viewvc?rev=1762812&view=rev
Log:
QPID-6803: Avoid chains of DirectByteBuffers

Added:
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferImpl.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/SlicedQpidByteBuffer.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/SlicedQpidByteBufferTest.java
Modified:
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
    qpid/java/trunk/common/pom.xml
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Thu Sep 29 16:36:36 2016
@@ -44,6 +44,7 @@ import com.fasterxml.jackson.databind.Ob
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
@@ -157,6 +158,9 @@ public class TCPandSSLTransportTest exte
         List<String> blackList = mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_BLACK_LIST, type);
         when(port.getTlsProtocolBlackList()).thenReturn(blackList);
         when(port.getTlsProtocolWhiteList()).thenReturn(whiteList);
+        final Broker broker = mock(Broker.class);
+        when(broker.getEventLogger()).thenReturn(mock(EventLogger.class));
+        when(port.getParent(Broker.class)).thenReturn(broker);
 
         TCPandSSLTransport transport = new TCPandSSLTransport(new HashSet<>(Arrays.asList(transports)),
                                                               port,

Modified: qpid/java/trunk/common/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/pom.xml?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/common/pom.xml (original)
+++ qpid/java/trunk/common/pom.xml Thu Sep 29 16:36:36 2016
@@ -54,6 +54,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava-version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
    
   <build>

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Thu Sep 29 16:36:36 2016
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +17,7 @@
  * under the License.
  *
  */
+
 package org.apache.qpid.bytebuffer;
 
 import java.io.BufferedOutputStream;
@@ -31,7 +31,6 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.zip.GZIPInputStream;
@@ -41,428 +40,273 @@ import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.qpid.streams.CompositeInputStream;
 
-public final class QpidByteBuffer
+public abstract class QpidByteBuffer
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(QpidByteBuffer.class);
-
-    private static final AtomicIntegerFieldUpdater<QpidByteBuffer> DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
+    private static final AtomicIntegerFieldUpdater<QpidByteBuffer>
+            DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
             QpidByteBuffer.class,
             "_disposed");
-
     private static final ThreadLocal<QpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
-    private volatile ByteBuffer _buffer;
-    private final ByteBufferRef _ref;
-
-    @SuppressWarnings("unused")
-    private volatile int _disposed;
-
     private volatile static boolean _isPoolInitialized;
     private volatile static BufferPool _bufferPool;
     private volatile static int _pooledBufferSize;
     private volatile static ByteBuffer _zeroed;
+    final ByteBufferRef _ref;
+    volatile ByteBuffer _buffer;
+    @SuppressWarnings("unused")
+    private volatile int _disposed;
 
-    QpidByteBuffer(ByteBufferRef ref)
-    {
-        this(ref.getBuffer(), ref);
-    }
-
-    private QpidByteBuffer(ByteBuffer buf, ByteBufferRef ref)
+    QpidByteBuffer(ByteBufferRef ref, ByteBuffer buffer)
     {
-        _buffer = buf;
         _ref = ref;
-        ref.incrementRef();
+        _buffer = buffer;
     }
 
-
-    public boolean hasRemaining()
+    public final boolean isDirect()
     {
-        return _buffer.hasRemaining();
+        return _buffer.isDirect();
     }
 
-    public QpidByteBuffer putInt(final int index, final int value)
+    public final short getUnsignedByte()
     {
-        _buffer.putInt(index, value);
-        return this;
+        return (short) (((short) get()) & 0xFF);
     }
 
-    public boolean isDirect()
+    public final int getUnsignedShort()
     {
-        return _buffer.isDirect();
+        return ((int) getShort()) & 0xffff;
     }
 
-    public QpidByteBuffer putShort(final int index, final short value)
+    public final long getUnsignedInt()
     {
-        _buffer.putShort(index, value);
-        return this;
+        return ((long) getInt()) & 0xffffffffL;
     }
 
-    public QpidByteBuffer putChar(final int index, final char value)
+    public final QpidByteBuffer putUnsignedByte(final short s)
     {
-        _buffer.putChar(index, value);
+        put((byte) s);
         return this;
-
     }
 
-    public QpidByteBuffer put(final byte b)
+    public final QpidByteBuffer putUnsignedShort(final int i)
     {
-        _buffer.put(b);
+        putShort((short) i);
         return this;
     }
 
-    public QpidByteBuffer put(final int index, final byte b)
+    public final QpidByteBuffer putUnsignedInt(final long value)
     {
-        _buffer.put(index, b);
+        putInt((int) value);
         return this;
     }
 
-    public short getShort(final int index)
+    public final void dispose()
     {
-        return _buffer.getShort(index);
+        if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
+        {
+            _ref.decrementRef();
+            _buffer = null;
+        }
     }
 
-
-    public QpidByteBuffer mark()
+    public final InputStream asInputStream()
     {
-        _buffer.mark();
-        return this;
+        return new BufferInputStream(this);
     }
 
-    public long getLong()
+    public final ByteBuffer asByteBuffer()
     {
-        return _buffer.getLong();
+        try
+        {
+            return getUnderlyingBuffer();
+        }
+        finally
+        {
+            dispose();
+        }
     }
 
-    public QpidByteBuffer putFloat(final int index, final float value)
+    public final CharBuffer decode(Charset charset)
     {
-        _buffer.putFloat(index, value);
-        return this;
+        ByteBuffer underlyingBuffer = getUnderlyingBuffer();
+        try
+        {
+            return charset.decode(underlyingBuffer);
+        }
+        finally
+        {
+            updateFromLastUnderlying();
+        }
     }
 
-    public double getDouble(final int index)
+    public final int read(ReadableByteChannel channel) throws IOException
     {
-        return _buffer.getDouble(index);
+        ByteBuffer underlyingBuffer = getUnderlyingBuffer();
+        try
+        {
+            return channel.read(underlyingBuffer);
+        }
+        finally
+        {
+            updateFromLastUnderlying();
+        }
     }
 
-    public boolean hasArray()
+    public final SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dest) throws SSLException
     {
-        return _buffer.hasArray();
+        ByteBuffer underlyingBuffer = getUnderlyingBuffer();
+        ByteBuffer destUnderlyingBuffer = dest.getUnderlyingBuffer();
+        try
+        {
+            return engine.unwrap(underlyingBuffer, destUnderlyingBuffer);
+        }
+        finally
+        {
+            updateFromLastUnderlying();
+            dest.updateFromLastUnderlying();
+        }
     }
 
-    public QpidByteBuffer asReadOnlyBuffer()
+    @Override
+    public String toString()
     {
-        return new QpidByteBuffer(_buffer.asReadOnlyBuffer(), _ref);
+        return "QpidByteBuffer{" +
+               "_buffer=" + _buffer +
+               ", _disposed=" + _disposed +
+               '}';
     }
 
-    public double getDouble()
-    {
-        return _buffer.getDouble();
-    }
+    public abstract boolean hasRemaining();
 
-    public QpidByteBuffer putFloat(final float value)
-    {
-        _buffer.putFloat(value);
-        return this;
-    }
+    public abstract QpidByteBuffer putInt(int index, int value);
 
-    public QpidByteBuffer putInt(final int value)
-    {
-        _buffer.putInt(value);
-        return this;
-    }
+    public abstract QpidByteBuffer putShort(int index, short value);
 
-    public byte[] array()
-    {
-        return _buffer.array();
-    }
+    public abstract QpidByteBuffer putChar(int index, char value);
 
-    public QpidByteBuffer putShort(final short value)
-    {
-        _buffer.putShort(value);
-        return this;
-    }
+    public abstract QpidByteBuffer put(byte b);
 
-    public int getInt(final int index)
-    {
-        return _buffer.getInt(index);
-    }
+    public abstract QpidByteBuffer put(int index, byte b);
 
-    public int remaining()
-    {
-        return _buffer.remaining();
-    }
+    public abstract short getShort(int index);
 
-    public QpidByteBuffer put(final byte[] src)
-    {
-        _buffer.put(src);
-        return this;
-    }
+    public abstract QpidByteBuffer mark();
 
-    public QpidByteBuffer put(final ByteBuffer src)
-    {
-        _buffer.put(src);
-        return this;
-    }
+    public abstract long getLong();
 
-    public QpidByteBuffer put(final QpidByteBuffer src)
-    {
-        _buffer.put(src._buffer);
-        return this;
-    }
+    public abstract QpidByteBuffer putFloat(int index, float value);
 
+    public abstract double getDouble(int index);
 
+    public abstract boolean hasArray();
 
-    public QpidByteBuffer get(final byte[] dst, final int offset, final int length)
-    {
-        _buffer.get(dst, offset, length);
-        return this;
-    }
+    public abstract double getDouble();
 
-    public QpidByteBuffer get(final ByteBuffer dst)
-    {
-        dst.put(_buffer);
-        return this;
-    }
+    public abstract QpidByteBuffer putFloat(float value);
 
-    public void copyTo(final ByteBuffer dst)
-    {
-        dst.put(_buffer.duplicate());
-    }
+    public abstract QpidByteBuffer putInt(int value);
 
-    public void putCopyOf(final QpidByteBuffer buf)
-    {
-        _buffer.put(buf._buffer.duplicate());
-    }
+    public abstract byte[] array();
 
-    public QpidByteBuffer rewind()
-    {
-        _buffer.rewind();
-        return this;
-    }
+    public abstract QpidByteBuffer putShort(short value);
 
-    public QpidByteBuffer clear()
-    {
-        _buffer.clear();
-        return this;
-    }
+    public abstract int getInt(int index);
 
-    public QpidByteBuffer putLong(final int index, final long value)
-    {
-        _buffer.putLong(index, value);
-        return this;
-    }
-    public QpidByteBuffer compact()
-    {
-        _buffer.compact();
-        return this;
-    }
+    public abstract int remaining();
 
-    public QpidByteBuffer putDouble(final double value)
-    {
-        _buffer.putDouble(value);
-        return this;
-    }
+    public abstract QpidByteBuffer put(byte[] src);
 
-    public int limit()
-    {
-        return _buffer.limit();
-    }
+    public abstract QpidByteBuffer put(ByteBuffer src);
 
-    public QpidByteBuffer reset()
-    {
-        _buffer.reset();
-        return this;
-    }
+    public abstract QpidByteBuffer put(QpidByteBuffer src);
 
-    public QpidByteBuffer flip()
-    {
-        _buffer.flip();
-        return this;
-    }
+    public abstract QpidByteBuffer get(byte[] dst, int offset, int length);
 
-    public short getShort()
-    {
-        return _buffer.getShort();
-    }
+    public abstract QpidByteBuffer get(ByteBuffer dst);
 
-    public float getFloat()
-    {
-        return _buffer.getFloat();
-    }
+    public abstract void copyTo(ByteBuffer dst);
 
-    public QpidByteBuffer limit(final int newLimit)
-    {
-        _buffer.limit(newLimit);
-        return this;
-    }
+    public abstract void putCopyOf(QpidByteBuffer buf);
 
-    public QpidByteBuffer duplicate()
-    {
-        return new QpidByteBuffer(_buffer.duplicate(), _ref);
-    }
+    public abstract QpidByteBuffer rewind();
 
-    public QpidByteBuffer put(final byte[] src, final int offset, final int length)
-    {
-        _buffer.put(src, offset, length);
-        return this;
-    }
+    public abstract QpidByteBuffer clear();
 
-    public long getLong(final int index)
-    {
-        return _buffer.getLong(index);
-    }
+    public abstract QpidByteBuffer putLong(int index, long value);
 
-    public int capacity()
-    {
-        return _buffer.capacity();
-    }
+    public abstract QpidByteBuffer compact();
 
-    public boolean isReadOnly()
-    {
-        return _buffer.isReadOnly();
-    }
+    public abstract QpidByteBuffer putDouble(double value);
 
-    public char getChar(final int index)
-    {
-        return _buffer.getChar(index);
-    }
+    public abstract int limit();
 
-    public byte get()
-    {
-        return _buffer.get();
-    }
+    public abstract QpidByteBuffer reset();
 
-    public byte get(final int index)
-    {
-        return _buffer.get(index);
-    }
+    public abstract QpidByteBuffer flip();
 
-    public QpidByteBuffer get(final byte[] dst)
-    {
-        _buffer.get(dst);
-        return this;
-    }
+    public abstract short getShort();
 
+    public abstract float getFloat();
 
-    public void copyTo(final byte[] dst)
-    {
-        _buffer.duplicate().get(dst);
-    }
+    public abstract QpidByteBuffer limit(int newLimit);
 
-    public QpidByteBuffer putChar(final char value)
-    {
-        _buffer.putChar(value);
-        return this;
-    }
+    public abstract QpidByteBuffer duplicate();
 
-    public QpidByteBuffer position(final int newPosition)
-    {
-        _buffer.position(newPosition);
-        return this;
-    }
+    public abstract QpidByteBuffer put(byte[] src, int offset, int length);
 
-    public int arrayOffset()
-    {
-        return _buffer.arrayOffset();
-    }
+    public abstract long getLong(int index);
 
-    public char getChar()
-    {
-        return _buffer.getChar();
-    }
-
-    public int getInt()
-    {
-        return _buffer.getInt();
-    }
+    public abstract int capacity();
 
-    public QpidByteBuffer putLong(final long value)
-    {
-        _buffer.putLong(value);
-        return this;
-    }
+    public abstract char getChar(int index);
 
-    public float getFloat(final int index)
-    {
-        return _buffer.getFloat(index);
-    }
+    public abstract byte get();
 
-    public int getUnsignedByte()
-    {
-        return ((int)get()) & 0xFF;
-    }
+    public abstract byte get(int index);
 
-    public int getUnsignedShort()
-    {
-        return ((int) getShort()) & 0xffff;
-    }
+    public abstract QpidByteBuffer get(byte[] dst);
 
-    public long getUnsignedInt()
-    {
-        return ((long) getInt()) & 0xffffffffL;
-    }
+    public abstract void copyTo(byte[] dst);
 
-    public void putUnsignedByte(final short s)
-    {
-        put((byte)s);
-    }
+    public abstract QpidByteBuffer putChar(char value);
 
+    public abstract QpidByteBuffer position(int newPosition);
 
-    public void putUnsignedShort(final int i)
-    {
-        putShort((short)i);
-    }
+    public abstract int arrayOffset();
 
-    public void putUnsignedInt(final long l)
-    {
-        putInt((int)l);
-    }
+    public abstract char getChar();
 
+    public abstract int getInt();
 
-    public QpidByteBuffer slice()
-    {
-        return new QpidByteBuffer(_buffer.slice(), _ref);
-    }
+    public abstract QpidByteBuffer putLong(long value);
 
-    public QpidByteBuffer view(int offset, int length)
-    {
-        ByteBuffer buf = _buffer.slice();
-        buf.position(offset);
-        buf.limit(offset+Math.min(length, buf.remaining()));
-        buf = buf.slice();
+    public abstract float getFloat(int index);
 
-        return new QpidByteBuffer(buf, _ref);
-    }
+    public abstract QpidByteBuffer slice();
 
-    public int position()
-    {
-        return _buffer.position();
-    }
+    public abstract QpidByteBuffer view(int offset, int length);
 
-    public QpidByteBuffer putDouble(final int index, final double value)
-    {
-        _buffer.putDouble(index, value);
-        return this;
-    }
+    public abstract int position();
 
-    public void dispose()
-    {
-        if(DISPOSED_UPDATER.compareAndSet(this,0,1))
-        {
-            _ref.decrementRef();
-            _buffer = null;
-        }
-    }
+    public abstract QpidByteBuffer putDouble(int index, double value);
 
-    public InputStream asInputStream()
-    {
-        return new BufferInputStream();
-    }
+    /**
+     * Returns an underlying byte buffer for update operations.
+     * <p></p>
+     * Method {@link #updateFromLastUnderlying()} needs to be invoked to update the state of {@link QpidByteBuffer}
+     *
+     * @return ByteBuffer
+     */
+    abstract ByteBuffer getUnderlyingBuffer();
 
+    /**
+     * Used to update the state of {@link QpidByteBuffer} after underlying byte buffer is modified.
+     *
+     * @throws IllegalStateException when method is invoked without previous call to {@link #getUnderlyingBuffer()}
+     */
+    abstract void updateFromLastUnderlying();
 
     public static QpidByteBuffer allocate(boolean direct, int size)
     {
@@ -471,14 +315,16 @@ public final class QpidByteBuffer
 
     public static QpidByteBuffer allocate(int size)
     {
-        return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
+        return new QpidByteBufferImpl(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
     }
 
     public static QpidByteBuffer allocateDirect(int size)
     {
         if (size < 0)
         {
-            throw new IllegalArgumentException("Cannot allocate QpidByteBuffer with size " + size + " which is negative.");
+            throw new IllegalArgumentException("Cannot allocate QpidByteBuffer with size "
+                                               + size
+                                               + " which is negative.");
         }
 
         final ByteBufferRef ref;
@@ -515,29 +361,29 @@ public final class QpidByteBuffer
         {
             ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
         }
-        return new QpidByteBuffer(ref);
+        return new QpidByteBufferImpl(ref);
     }
 
     public static Collection<QpidByteBuffer> allocateDirectCollection(int size)
     {
-        if(_pooledBufferSize == 0)
+        if (_pooledBufferSize == 0)
         {
             return Collections.singleton(allocateDirect(size));
         }
         else
         {
-            List<QpidByteBuffer> buffers = new ArrayList<>((size / _pooledBufferSize)+2);
+            List<QpidByteBuffer> buffers = new ArrayList<>((size / _pooledBufferSize) + 2);
             int remaining = size;
 
             QpidByteBuffer buf = _cachedBuffer.get();
-            if(buf == null)
+            if (buf == null)
             {
                 buf = allocateDirect(_pooledBufferSize);
             }
-            while(remaining > buf.remaining())
+            while (remaining > buf.remaining())
             {
                 int bufRemaining = buf.remaining();
-                if (buf  == _cachedBuffer.get())
+                if (buf == _cachedBuffer.get())
                 {
                     buffers.add(buf.view(0, bufRemaining));
                     buf.dispose();
@@ -565,40 +411,29 @@ public final class QpidByteBuffer
         }
     }
 
-    public ByteBuffer asByteBuffer()
-    {
-        _ref.removeFromPool();
-        return _buffer;
-    }
-
-    public CharBuffer decode(Charset charset)
-    {
-        return charset.decode(_buffer);
-    }
-
-    public int read(ReadableByteChannel channel) throws IOException
-    {
-        return channel.read(_buffer);
-    }
-
-
-    public SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dest) throws SSLException
-    {
-        return engine.unwrap(_buffer, dest._buffer);
-    }
-
-
     public static SSLEngineResult encryptSSL(SSLEngine engine,
                                              final Collection<QpidByteBuffer> buffers,
                                              QpidByteBuffer dest) throws SSLException
     {
+        List<QpidByteBuffer> qpidBuffers = new ArrayList<>(buffers);
         final ByteBuffer[] src = new ByteBuffer[buffers.size()];
-        Iterator<QpidByteBuffer> iter = buffers.iterator();
-        for(int i = 0; i<src.length; i++)
+        for (int i = 0; i < src.length; i++)
+        {
+            src[i] = qpidBuffers.get(i).getUnderlyingBuffer();
+        }
+        ByteBuffer destinationUnderlyingBuffer = dest.getUnderlyingBuffer();
+        try
+        {
+            return engine.wrap(src, destinationUnderlyingBuffer);
+        }
+        finally
         {
-            src[i] = iter.next()._buffer;
+            for (QpidByteBuffer qpidByteBuffer : qpidBuffers)
+            {
+                qpidByteBuffer.updateFromLastUnderlying();
+            }
+            dest.updateFromLastUnderlying();
         }
-        return engine.wrap(src, dest._buffer);
     }
 
     public static Collection<QpidByteBuffer> inflate(Collection<QpidByteBuffer> compressedBuffers) throws IOException
@@ -673,20 +508,31 @@ public final class QpidByteBuffer
         }
     }
 
-    public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> buffers) throws IOException
+    public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> qpidByteBuffers)
+            throws IOException
     {
-        ByteBuffer[] bufs = new ByteBuffer[buffers.size()];
-        Iterator<QpidByteBuffer> bufIter = buffers.iterator();
-        for(int i = 0; i < bufs.length; i++)
+        List<QpidByteBuffer> qpidBuffers = new ArrayList<>(qpidByteBuffers);
+        ByteBuffer[] byteBuffers = new ByteBuffer[qpidBuffers.size()];
+        for (int i = 0; i < byteBuffers.length; i++)
+        {
+            byteBuffers[i] = qpidBuffers.get(i).getUnderlyingBuffer();
+        }
+        try
         {
-            bufs[i] = bufIter.next()._buffer;
+            return channel.write(byteBuffers);
+        }
+        finally
+        {
+            for (QpidByteBuffer qbb : qpidBuffers)
+            {
+                qbb.updateFromLastUnderlying();
+            }
         }
-        return channel.write(bufs);
     }
 
     public static QpidByteBuffer wrap(final ByteBuffer wrap)
     {
-        return new QpidByteBuffer(new NonPooledByteBufferRef(wrap));
+        return new QpidByteBufferImpl(new NonPooledByteBufferRef(wrap));
     }
 
     public static QpidByteBuffer wrap(final byte[] data)
@@ -713,9 +559,13 @@ public final class QpidByteBuffer
     {
         if (_isPoolInitialized && (bufferSize != _pooledBufferSize || maxPoolSize != _bufferPool.getMaxSize()))
         {
-            final String errorMessage = String.format("QpidByteBuffer pool has already been initialised with bufferSize=%d and maxPoolSize=%d." +
-                            "Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
-                            _pooledBufferSize, _bufferPool.getMaxSize(), bufferSize, maxPoolSize);
+            final String errorMessage = String.format(
+                    "QpidByteBuffer pool has already been initialised with bufferSize=%d and maxPoolSize=%d." +
+                    "Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
+                    _pooledBufferSize,
+                    _bufferPool.getMaxSize(),
+                    bufferSize,
+                    maxPoolSize);
             throw new IllegalStateException(errorMessage);
         }
         if (bufferSize <= 0)
@@ -723,22 +573,27 @@ public final class QpidByteBuffer
             throw new IllegalArgumentException("Negative or zero bufferSize illegal : " + bufferSize);
         }
 
-
         _bufferPool = new BufferPool(maxPoolSize);
         _pooledBufferSize = bufferSize;
         _zeroed = ByteBuffer.allocateDirect(_pooledBufferSize);
         _isPoolInitialized = true;
     }
 
-    private final class BufferInputStream extends InputStream
+    private static final class BufferInputStream extends InputStream
     {
+        private final QpidByteBuffer _qpidByteBuffer;
+
+        private BufferInputStream(final QpidByteBuffer buffer)
+        {
+            _qpidByteBuffer = buffer;
+        }
 
         @Override
         public int read() throws IOException
         {
-            if (_buffer.hasRemaining())
+            if (_qpidByteBuffer.hasRemaining())
             {
-                return _buffer.get() & 0xFF;
+                return _qpidByteBuffer.get() & 0xFF;
             }
             return -1;
         }
@@ -747,15 +602,15 @@ public final class QpidByteBuffer
         @Override
         public int read(byte[] b, int off, int len) throws IOException
         {
-            if (!_buffer.hasRemaining())
+            if (!_qpidByteBuffer.hasRemaining())
             {
                 return -1;
             }
-            if(_buffer.remaining() < len)
+            if (_qpidByteBuffer.remaining() < len)
             {
-                len = _buffer.remaining();
+                len = _qpidByteBuffer.remaining();
             }
-            _buffer.get(b, off, len);
+            _qpidByteBuffer.get(b, off, len);
 
             return len;
         }
@@ -763,13 +618,13 @@ public final class QpidByteBuffer
         @Override
         public void mark(int readlimit)
         {
-            _buffer.mark();
+            _qpidByteBuffer.mark();
         }
 
         @Override
         public void reset() throws IOException
         {
-            _buffer.reset();
+            _qpidByteBuffer.reset();
         }
 
         @Override
@@ -781,14 +636,14 @@ public final class QpidByteBuffer
         @Override
         public long skip(long n) throws IOException
         {
-            _buffer.position(_buffer.position()+(int)n);
+            _qpidByteBuffer.position(_qpidByteBuffer.position() + (int) n);
             return n;
         }
 
         @Override
         public int available() throws IOException
         {
-            return _buffer.remaining();
+            return _qpidByteBuffer.remaining();
         }
 
         @Override
@@ -796,5 +651,4 @@ public final class QpidByteBuffer
         {
         }
     }
-
 }

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferImpl.java?rev=1762812&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferImpl.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferImpl.java Thu Sep 29 16:36:36 2016
@@ -0,0 +1,454 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.bytebuffer;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+final class QpidByteBufferImpl extends QpidByteBuffer
+{
+
+    QpidByteBufferImpl(ByteBufferRef ref)
+    {
+        this(ref.getBuffer(), ref);
+    }
+
+    private QpidByteBufferImpl(ByteBuffer buf, ByteBufferRef ref)
+    {
+        super(ref, buf);
+        ref.incrementRef();
+    }
+
+    @Override
+    public boolean hasRemaining()
+    {
+        return _buffer.hasRemaining();
+    }
+
+    @Override
+    public QpidByteBuffer putInt(final int index, final int value)
+    {
+        _buffer.putInt(index, value);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putShort(final int index, final short value)
+    {
+        _buffer.putShort(index, value);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putChar(final int index, final char value)
+    {
+        _buffer.putChar(index, value);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer put(final byte b)
+    {
+        _buffer.put(b);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer put(final int index, final byte b)
+    {
+        _buffer.put(index, b);
+        return this;
+    }
+
+    @Override
+    public short getShort(final int index)
+    {
+        return _buffer.getShort(index);
+    }
+
+
+    @Override
+    public QpidByteBuffer mark()
+    {
+        _buffer.mark();
+        return this;
+    }
+
+    @Override
+    public long getLong()
+    {
+        return _buffer.getLong();
+    }
+
+    @Override
+    public QpidByteBuffer putFloat(final int index, final float value)
+    {
+        _buffer.putFloat(index, value);
+        return this;
+    }
+
+    @Override
+    public double getDouble(final int index)
+    {
+        return _buffer.getDouble(index);
+    }
+
+    @Override
+    public boolean hasArray()
+    {
+        return _buffer.hasArray();
+    }
+
+    @Override
+    public double getDouble()
+    {
+        return _buffer.getDouble();
+    }
+
+    @Override
+    public QpidByteBuffer putFloat(final float value)
+    {
+        _buffer.putFloat(value);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putInt(final int value)
+    {
+        _buffer.putInt(value);
+        return this;
+    }
+
+    @Override
+    public byte[] array()
+    {
+        return _buffer.array();
+    }
+
+    @Override
+    public QpidByteBuffer putShort(final short value)
+    {
+        _buffer.putShort(value);
+        return this;
+    }
+
+    @Override
+    public int getInt(final int index)
+    {
+        return _buffer.getInt(index);
+    }
+
+    @Override
+    public int remaining()
+    {
+        return _buffer.remaining();
+    }
+
+    @Override
+    public QpidByteBuffer put(final byte[] src)
+    {
+        _buffer.put(src);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer put(final ByteBuffer src)
+    {
+        _buffer.put(src);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer put(final QpidByteBuffer src)
+    {
+        ByteBuffer underlyingBuffer = src.getUnderlyingBuffer(); // for a SlicedQpidByteBuffer source this is a freshly built view that the source records
+        _buffer.put(underlyingBuffer); // relative bulk put: consumes the remaining bytes of underlyingBuffer
+        src.updateFromLastUnderlying(); // a sliced source reads its position/limit back from the recorded view; no-op for QpidByteBufferImpl
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer get(final byte[] dst, final int offset, final int length)
+    {
+        _buffer.get(dst, offset, length);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer get(final ByteBuffer dst)
+    {
+        int destinationRemaining = dst.remaining();
+        int remaining = remaining();
+        if (destinationRemaining < remaining)
+        {
+            throw new BufferUnderflowException();
+        }
+        dst.put(_buffer);
+        return this;
+    }
+
+    @Override
+    public void copyTo(final ByteBuffer dst)
+    {
+        dst.put(_buffer.duplicate());
+    }
+
+    @Override
+    public void putCopyOf(final QpidByteBuffer buf)
+    {
+        _buffer.put(buf.getUnderlyingBuffer().duplicate()); // reading through a duplicate leaves the source's own position where it was
+        if (buf instanceof SlicedQpidByteBuffer)
+        {
+            ((SlicedQpidByteBuffer)buf).clearLastUnderlyingBuffer(); // getUnderlyingBuffer() recorded a view on the sliced source; drop it since no position update follows
+        }
+    }
+
+    @Override
+    public QpidByteBuffer rewind()
+    {
+        _buffer.rewind();
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer clear()
+    {
+        _buffer.clear();
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putLong(final int index, final long value)
+    {
+        _buffer.putLong(index, value);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer compact()
+    {
+        _buffer.compact();
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putDouble(final double value)
+    {
+        _buffer.putDouble(value);
+        return this;
+    }
+
+    @Override
+    public int limit()
+    {
+        return _buffer.limit();
+    }
+
+    @Override
+    public QpidByteBuffer reset()
+    {
+        _buffer.reset();
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer flip()
+    {
+        _buffer.flip();
+        return this;
+    }
+
+    @Override
+    public short getShort()
+    {
+        return _buffer.getShort();
+    }
+
+    @Override
+    public float getFloat()
+    {
+        return _buffer.getFloat();
+    }
+
+    @Override
+    public QpidByteBuffer limit(final int newLimit)
+    {
+        _buffer.limit(newLimit);
+        return this;
+    }
+
+    @Override
+    public QpidByteBufferImpl duplicate()
+    {
+        return new QpidByteBufferImpl(_buffer.duplicate(), _ref);
+    }
+
+    @Override
+    public QpidByteBuffer put(final byte[] src, final int offset, final int length)
+    {
+        _buffer.put(src, offset, length);
+        return this;
+    }
+
+    @Override
+    public long getLong(final int index)
+    {
+        return _buffer.getLong(index);
+    }
+
+    @Override
+    public int capacity()
+    {
+        return _buffer.capacity();
+    }
+
+    @Override
+    public char getChar(final int index)
+    {
+        return _buffer.getChar(index);
+    }
+
+    @Override
+    public byte get()
+    {
+        return _buffer.get();
+    }
+
+    @Override
+    public byte get(final int index)
+    {
+        return _buffer.get(index);
+    }
+
+    @Override
+    public QpidByteBuffer get(final byte[] dst)
+    {
+        _buffer.get(dst);
+        return this;
+    }
+
+    @Override
+    public void copyTo(final byte[] dst)
+    {
+        _buffer.duplicate().get(dst);
+    }
+
+    @Override
+    public QpidByteBuffer putChar(final char value)
+    {
+        _buffer.putChar(value);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer position(final int newPosition)
+    {
+        _buffer.position(newPosition);
+        return this;
+    }
+
+    @Override
+    public int arrayOffset()
+    {
+        return _buffer.arrayOffset();
+    }
+
+    @Override
+    public char getChar()
+    {
+        return _buffer.getChar();
+    }
+
+    @Override
+    public int getInt()
+    {
+        return _buffer.getInt();
+    }
+
+    @Override
+    public QpidByteBuffer putLong(final long value)
+    {
+        _buffer.putLong(value);
+        return this;
+    }
+
+    @Override
+    public float getFloat(final int index)
+    {
+        return _buffer.getFloat(index);
+    }
+
+
+    @Override
+    public QpidByteBuffer slice()
+    {
+        if (isDirect()) // direct buffers get an index-based wrapper instead of a new ByteBuffer from ByteBuffer.slice()
+        {
+            return new SlicedQpidByteBuffer(0, remaining(), remaining(), position(), _ref); // position 0, limit = capacity = remaining(), offset = current position
+        }
+        else
+        {
+            return new QpidByteBufferImpl(_buffer.slice(), _ref); // shares _ref; the private constructor increments its reference count
+        }
+    }
+
+    @Override
+    public QpidByteBuffer view(int offset, int length)
+    {
+        if (isDirect())
+        {
+            int capacity = Math.min(_buffer.remaining() - offset, length); // never more than what remains beyond offset
+            return new SlicedQpidByteBuffer(0, capacity, capacity, offset + position(), _ref); // offset is interpreted relative to the current position
+        }
+        else
+        {
+            ByteBuffer buf = _buffer.slice(); // index 0 of buf is the current position of _buffer
+            buf.position(offset);
+            buf.limit(offset + Math.min(length, buf.remaining())); // buf.remaining() already excludes the first offset bytes
+            buf = buf.slice(); // second slice makes the window start at index 0
+            return new QpidByteBufferImpl(buf, _ref);
+        }
+    }
+
+    @Override
+    public int position()
+    {
+        return _buffer.position();
+    }
+
+    @Override
+    public QpidByteBuffer putDouble(final int index, final double value)
+    {
+        _buffer.putDouble(index, value);
+        return this;
+    }
+
+    ByteBuffer getUnderlyingBuffer()
+    {
+        return _buffer;
+    }
+
+    @Override
+    void updateFromLastUnderlying()
+    {
+        // noop
+    }
+}

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/SlicedQpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/SlicedQpidByteBuffer.java?rev=1762812&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/SlicedQpidByteBuffer.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/SlicedQpidByteBuffer.java Thu Sep 29 16:36:36 2016
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.bytebuffer;
+
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
+
+final class SlicedQpidByteBuffer extends QpidByteBuffer
+{
+    private static final int SIZE_DOUBLE = 8;
+    private static final int SIZE_FLOAT = 4;
+    private static final int SIZE_LONG = 8;
+    private static final int SIZE_INT = 4;
+    private static final int SIZE_CHAR = 2;
+    private static final int SIZE_SHORT = 2;
+    private static final int SIZE_BYTE = 1;
+
+    private final int _capacity;
+    private final int _offset;
+
+    private int _mark = -1;
+    private int _position = 0;
+    private int _limit;
+    private ByteBuffer _lastUnderlyingBuffer;
+
+    SlicedQpidByteBuffer(final int position,
+                         final int limit,
+                         final int capacity,
+                         final int offset,
+                         final ByteBufferRef ref)
+    {
+        super(ref, ref.getBuffer());
+
+        if (capacity < 0)
+        {
+            throw new IllegalArgumentException("Capacity cannot be negative");
+        }
+
+        if (limit > capacity || limit < 0)
+        {
+            throw new IllegalArgumentException("Limit cannot be greater than capacity or negative");
+        }
+
+        if (position > limit || position < 0)
+        {
+            throw new IllegalArgumentException("Position cannot be greater than limit or negative");
+        }
+
+        if (offset < 0)
+        {
+            throw new IllegalArgumentException("Offset cannot be negative");
+        }
+
+        _capacity = capacity;
+        _position = position;
+        _limit = limit;
+        _offset = offset;
+        _ref.incrementRef();
+    }
+
+    @Override
+    public boolean hasRemaining()
+    {
+        return _position < _limit;
+    }
+
+    @Override
+    public QpidByteBuffer putInt(final int index, final int value)
+    {
+        checkIndexBounds(index, SIZE_INT);
+        _buffer.putInt(_offset + index, value);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putShort(final int index, final short value)
+    {
+        checkIndexBounds(index, SIZE_SHORT);
+        _buffer.putShort(_offset + index, value);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putChar(final int index, final char value)
+    {
+        checkIndexBounds(index, SIZE_CHAR);
+        _buffer.putChar(_offset + index, value);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer put(final int index, final byte b)
+    {
+        checkIndexBounds(index, SIZE_BYTE);
+        _buffer.put(_offset + index, b);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer put(final byte b)
+    {
+        checkOverflow(SIZE_BYTE);
+        put(_position, b);
+        _position++;
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putDouble(final int index, final double value)
+    {
+        checkIndexBounds(index, SIZE_DOUBLE);
+
+        _buffer.putDouble(_offset + index, value);
+        return this;
+    }
+
+
+    @Override
+    public short getShort(final int index)
+    {
+        checkIndexBounds(index, SIZE_SHORT);
+        return _buffer.getShort(index + _offset);
+    }
+
+    @Override
+    public QpidByteBuffer mark()
+    {
+        _mark = _position;
+        return this;
+    }
+
+    @Override
+    public long getLong()
+    {
+        checkUnderflow(SIZE_LONG);
+
+        long value = getLong(_position);
+        _position += SIZE_LONG;
+        return value;
+    }
+
+    @Override
+    public QpidByteBuffer putFloat(final int index, final float value)
+    {
+        checkIndexBounds(index, SIZE_FLOAT);
+
+        _buffer.putFloat(_offset + index, value);
+        return this;
+    }
+
+    @Override
+    public double getDouble(final int index)
+    {
+        checkIndexBounds(index, SIZE_DOUBLE);
+        return _buffer.getDouble(index + _offset);
+    }
+
+    /**
+     * Always reports {@code false}: {@link #array()} and {@link #arrayOffset()} unconditionally
+     * throw {@link UnsupportedOperationException} in this class, so advertising a backing array
+     * would invite callers to invoke methods that cannot succeed.
+     *
+     * @return {@code false}
+     */
+    @Override
+    public boolean hasArray()
+    {
+        return false;
+    }
+
+    @Override
+    public double getDouble()
+    {
+        checkUnderflow(SIZE_DOUBLE);
+
+        double value = getDouble(_position);
+        _position += SIZE_DOUBLE;
+        return value;
+    }
+
+    @Override
+    public QpidByteBuffer putFloat(final float value)
+    {
+        checkOverflow(SIZE_FLOAT);
+
+        putFloat(position(), value);
+        _position += SIZE_FLOAT;
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putInt(final int value)
+    {
+        checkOverflow(SIZE_INT);
+
+        putInt(position(), value);
+        _position += SIZE_INT;
+        return this;
+    }
+
+    @Override
+    public byte[] array()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public QpidByteBuffer putShort(final short value)
+    {
+        checkOverflow(SIZE_SHORT);
+
+        putShort(position(), value);
+        _position += SIZE_SHORT;
+        return this;
+    }
+
+    @Override
+    public int getInt(final int index)
+    {
+        checkIndexBounds(index, SIZE_INT);
+        return _buffer.getInt(index + _offset);
+    }
+
+    @Override
+    public int remaining()
+    {
+        return _limit - _position;
+    }
+
+    @Override
+    public QpidByteBuffer put(final byte[] src)
+    {
+        return put(src, 0, src.length);
+    }
+
+    @Override
+    public QpidByteBuffer put(final ByteBuffer src)
+    {
+        int sourceRemaining = src.remaining();
+        if (sourceRemaining > remaining())
+        {
+            throw new BufferOverflowException();
+        }
+
+        for (int i = 0; i < sourceRemaining; i++)
+        {
+            put(src.get());
+        }
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer put(final QpidByteBuffer src)
+    {
+        if (src == this)
+        {
+            throw new IllegalArgumentException();
+        }
+
+        int sourceRemaining = src.remaining();
+        if (sourceRemaining > remaining())
+        {
+            throw new BufferOverflowException();
+        }
+
+        for (int i = 0; i < sourceRemaining; i++)
+        {
+            put(src.get());
+        }
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer get(final byte[] dst, final int offset, final int length)
+    {
+        checkBounds(dst, offset, length);
+
+        if (length > remaining())
+        {
+            throw new BufferUnderflowException();
+        }
+
+        // TODO consider using a slice of the underlying BB, followed by a bulk method
+        for (int i = offset; i < offset + length; i++)
+        {
+            dst[i] = get();
+        }
+
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer get(final ByteBuffer dst)
+    {
+        int destinationRemaining = dst.remaining();
+        int remaining = remaining();
+        if (destinationRemaining < remaining)
+        {
+            throw new BufferUnderflowException();
+        }
+
+        for (int i = 0; i < remaining; i++)
+        {
+            dst.put(get());
+        }
+        return this;
+    }
+
+    @Override
+    public void copyTo(final ByteBuffer dst)
+    {
+        int destinationRemaining = dst.remaining();
+        int remaining = remaining();
+        if (destinationRemaining < remaining)
+        {
+            throw new BufferUnderflowException();
+        }
+
+        for (int i = 0; i < remaining; i++)
+        {
+            dst.put(get(_position + i));
+        }
+    }
+
+    /**
+     * Copies the remaining bytes of {@code source} (those between its position and its limit)
+     * into this buffer at the current position, advancing this buffer's position by the number
+     * of bytes copied. The source is read with absolute gets, so its position is not advanced
+     * by this method.
+     *
+     * @param source buffer whose remaining bytes are copied
+     * @throws BufferOverflowException if {@code source} has more bytes remaining than this buffer
+     */
+    @Override
+    public void putCopyOf(final QpidByteBuffer source)
+    {
+        int remaining = remaining();
+        int sourceRemaining = source.remaining();
+        if (sourceRemaining > remaining)
+        {
+            throw new BufferOverflowException();
+        }
+
+        // get(int) is an absolute read, so the index has to start at the source's current
+        // position; starting at 0 would copy bytes that lie before the source's position.
+        final int sourcePosition = source.position();
+        for (int i = 0; i < sourceRemaining; i++)
+        {
+            put(source.get(sourcePosition + i));
+        }
+    }
+
+    @Override
+    public QpidByteBuffer rewind()
+    {
+        _position = 0;
+        _mark = -1;
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer clear()
+    {
+        _position = 0;
+        _limit = _capacity;
+        _mark = -1;
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putLong(final int index, final long value)
+    {
+        checkIndexBounds(index, SIZE_LONG);
+
+        _buffer.putLong(_offset + index, value);
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer compact()
+    {
+        int remaining = remaining(); // captured before position and limit are rewritten below
+        if (_position > 0 && _position < _limit) // nothing to move when already at index 0 or when no bytes remain
+        {
+            for (int i = 0; i < remaining; i++)
+            {
+                put(i, get(_position + i)); // absolute copy towards index 0; the destination index is always below the source index
+            }
+        }
+        _position = remaining; // the next relative write lands right after the retained bytes
+        _limit = _capacity;
+        _mark = -1; // the mark is discarded
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer putDouble(final double value)
+    {
+        checkOverflow(SIZE_DOUBLE);
+
+        putDouble(position(), value);
+        _position += SIZE_DOUBLE;
+        return this;
+    }
+
+    @Override
+    public int limit()
+    {
+        return _limit;
+    }
+
+    @Override
+    public QpidByteBuffer reset()
+    {
+        if (_mark < 0)
+        {
+            throw new InvalidMarkException();
+        }
+        _position = _mark;
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer flip()
+    {
+        _limit = _position;
+        _position = 0;
+        _mark = -1;
+        return this;
+    }
+
+    @Override
+    public short getShort()
+    {
+        checkUnderflow(SIZE_SHORT);
+
+        short value = getShort(_position);
+        _position += SIZE_SHORT;
+        return value;
+    }
+
+    @Override
+    public float getFloat()
+    {
+        checkUnderflow(SIZE_FLOAT);
+
+        float value = getFloat(_position);
+        _position += SIZE_FLOAT;
+        return value;
+    }
+
+    @Override
+    public QpidByteBuffer limit(final int newLimit)
+    {
+        if (newLimit > _capacity || newLimit < 0)
+        {
+            throw new IllegalArgumentException();
+        }
+        _limit = newLimit;
+        if (_position > _limit)
+        {
+            _position = _limit;
+        }
+        if (_mark > _limit)
+        {
+            _mark = -1;
+        }
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer duplicate()
+    {
+        SlicedQpidByteBuffer duplicate = new SlicedQpidByteBuffer(_position, _limit, _capacity, _offset, _ref);
+        duplicate._mark = _mark;
+        return duplicate;
+    }
+
+    @Override
+    public QpidByteBuffer put(final byte[] src, final int offset, final int length)
+    {
+        checkBounds(src, offset, length);
+
+        if (length > remaining())
+        {
+            throw new BufferOverflowException();
+        }
+
+        for (int i = offset; i < offset + length; i++)
+        {
+            put(src[i]);
+        }
+        return this;
+    }
+
+    @Override
+    public long getLong(final int index)
+    {
+        checkIndexBounds(index, SIZE_LONG);
+        return _buffer.getLong(index + _offset);
+    }
+
+    @Override
+    public int capacity()
+    {
+        return _capacity;
+    }
+
+    @Override
+    public char getChar(final int index)
+    {
+        checkIndexBounds(index, SIZE_CHAR);
+        return _buffer.getChar(index + _offset);
+    }
+
+    @Override
+    public byte get()
+    {
+        checkUnderflow(SIZE_BYTE);
+
+        byte value = get(_position);
+        _position += SIZE_BYTE;
+        return value;
+    }
+
+    @Override
+    public byte get(final int index)
+    {
+        checkIndexBounds(index, SIZE_BYTE);
+        return _buffer.get(index + _offset);
+    }
+
+    @Override
+    public QpidByteBuffer get(final byte[] dst)
+    {
+        return get(dst, 0, dst.length);
+    }
+
+    @Override
+    public void copyTo(final byte[] dst)
+    {
+        if (remaining() < dst.length)
+        {
+            throw new BufferUnderflowException();
+        }
+
+        for (int i = 0; i < dst.length; i++)
+        {
+            dst[i] = get(_position + i);
+        }
+    }
+
+    @Override
+    public QpidByteBuffer putChar(final char value)
+    {
+        checkOverflow(SIZE_CHAR);
+
+        putChar(position(), value);
+        _position += SIZE_CHAR;
+        return this;
+    }
+
+    @Override
+    public QpidByteBuffer position(final int newPosition)
+    {
+        if (newPosition > _limit || newPosition < 0)
+        {
+            throw new IllegalArgumentException();
+        }
+        _position = newPosition;
+        if (_mark > _position)
+        {
+            _mark = -1;
+        }
+        return this;
+    }
+
+    @Override
+    public int arrayOffset()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public char getChar()
+    {
+        checkUnderflow(SIZE_CHAR);
+
+        char value = getChar(_position);
+        _position += SIZE_CHAR;
+        return value;
+    }
+
+    @Override
+    public int getInt()
+    {
+        checkUnderflow(SIZE_INT);
+
+        int value = getInt(_position);
+        _position += SIZE_INT;
+        return value;
+    }
+
+    @Override
+    public QpidByteBuffer putLong(final long value)
+    {
+        checkOverflow(SIZE_LONG);
+
+        putLong(position(), value);
+        _position += SIZE_LONG;
+        return this;
+    }
+
+    @Override
+    public float getFloat(final int index)
+    {
+        checkIndexBounds(index, SIZE_FLOAT);
+        return _buffer.getFloat(index + _offset);
+    }
+
+
+    @Override
+    public QpidByteBuffer slice()
+    {
+        return new SlicedQpidByteBuffer(0, remaining(), remaining(), _offset + _position, _ref);
+    }
+
+    @Override
+    public QpidByteBuffer view(final int offset, final int length)
+    {
+        int newCapacity = Math.min(length, remaining() - offset); // never more than what remains beyond offset
+        return new SlicedQpidByteBuffer(0, newCapacity, newCapacity, _offset + _position + offset, _ref); // new offset is expressed against _ref's buffer, not against this slice
+    }
+
+    @Override
+    public int position()
+    {
+        return _position;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SlicedQpidByteBuffer{" +
+               "_capacity=" + _capacity +
+               ", _offset=" + _offset +
+               ", _mark=" + _mark +
+               ", _position=" + _position +
+               ", _limit=" + _limit +
+               '}';
+    }
+
+    @Override
+    ByteBuffer getUnderlyingBuffer()
+    {
+        ByteBuffer buffer = _buffer.duplicate(); // work on a duplicate so the position/limit of _buffer itself stay untouched
+        buffer.position(_offset);
+        buffer.limit(_offset + _capacity);
+
+        buffer = buffer.slice(); // index 0 of the slice now corresponds to _offset in _buffer
+        buffer.position(_position);
+        buffer.limit(_limit);
+        _lastUnderlyingBuffer = buffer; // recorded so updateFromLastUnderlying() can read position/limit back
+        return buffer;
+    }
+
+    @Override
+    void updateFromLastUnderlying()
+    {
+        if (_lastUnderlyingBuffer == null) // no view is recorded: none was requested, or it was already consumed or cleared
+        {
+            throw new IllegalStateException("No last underlying ByteBuffer recorded for " + this);
+        }
+        _position = _lastUnderlyingBuffer.position(); // adopt whatever was done to the view handed out by getUnderlyingBuffer()
+        _limit = _lastUnderlyingBuffer.limit();
+        _lastUnderlyingBuffer = null; // one-shot: a further update needs a new getUnderlyingBuffer() call
+    }
+
+    void clearLastUnderlyingBuffer()
+    {
+        _lastUnderlyingBuffer = null;
+    }
+
+    /**
+     * Validates that the region {@code [offset, offset + length)} lies within {@code array}
+     * before a bulk transfer starts, so that an invalid request fails without moving any byte.
+     *
+     * @param array  array that is about to be read from or written to
+     * @param offset index of the first element of the region
+     * @param length number of elements in the region
+     * @throws IndexOutOfBoundsException if offset or length is negative, if a non-zero offset
+     *                                   is not a valid index of the array, or if the region
+     *                                   extends past the end of the array
+     */
+    private void checkBounds(final byte[] array, final int offset, final int length)
+    {
+        // "length > array.length - offset" rejects regions that run past the end of the array.
+        // It is written as a subtraction because offset + length could overflow; offset is
+        // already known to be non-negative when that term is evaluated.
+        if (offset < 0
+            || length < 0
+            || (offset > 0 && offset > array.length - 1)
+            || length > array.length - offset)
+        {
+            throw new IndexOutOfBoundsException();
+        }
+    }
+
+    private void checkIndexBounds(int index, int size)
+    {
+        if (index < 0 || size > _limit - index) // bounded by _limit, not _capacity; the subtraction cannot overflow once index >= 0
+        {
+            throw new IndexOutOfBoundsException();
+        }
+    }
+
+    private void checkOverflow(final int size)
+    {
+        if (_limit - _position < size)
+        {
+            throw new BufferOverflowException();
+        }
+    }
+
+    private void checkUnderflow(final int size)
+    {
+        if (_limit - _position < size)
+        {
+            throw new BufferUnderflowException();
+        }
+    }
+
+}

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java Thu Sep 29 16:36:36 2016
@@ -136,8 +136,8 @@ public class ConnectionStartBody extends
     public static void process(final QpidByteBuffer in, final ClientMethodProcessor dispatcher)
             throws AMQFrameDecodingException
     {
-        short versionMajor = (short) in.getUnsignedByte();
-        short versionMinor = (short) in.getUnsignedByte();
+        short versionMajor = in.getUnsignedByte();
+        short versionMinor = in.getUnsignedByte();
         FieldTable serverProperties = EncodingUtils.readFieldTable(in);
         byte[] mechanisms = EncodingUtils.readBytes(in);
         byte[] locales = EncodingUtils.readBytes(in);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Thu Sep 29 16:36:36 2016
@@ -320,7 +320,7 @@ public class EncodingUtils
 
     public static long readLongAsShortString(QpidByteBuffer buffer)
     {
-        short length = (short) buffer.getUnsignedByte();
+        short length = buffer.getUnsignedByte();
         short pos = 0;
         if (length == 0)
         {

Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java?rev=1762812&r1=1762811&r2=1762812&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java Thu Sep 29 16:36:36 2016
@@ -150,6 +150,67 @@ public class QpidByteBufferTest extends
         }
     }
 
+    public void testSlice() throws Exception // checks that slicing a direct buffer yields a SlicedQpidByteBuffer while slicing a heap buffer does not
+    {
+        QpidByteBuffer directBuffer = QpidByteBuffer.allocate(true, 6); // presumably true requests a direct buffer (the isDirect() assertion below relies on it) - confirm against allocate()
+        directBuffer.position(2);
+        directBuffer.limit(5); // window is now [2, 5), i.e. three bytes remaining
+        QpidByteBuffer directSlice = directBuffer.slice();
+
+        assertTrue("Direct slice should be direct too", directSlice.isDirect());
+        assertTrue("Direct slice should be special", directSlice instanceof SlicedQpidByteBuffer);
+        assertEquals("Unexpected capacity", 3, directSlice.capacity()); // 3 == limit(5) - position(2) of the parent
+        assertEquals("Unexpected limit", 3, directSlice.limit());
+        assertEquals("Unexpected position", 0, directSlice.position()); // the slice starts at its own position 0
+
+        directBuffer.dispose(); // parent and slice are each disposed explicitly
+        directSlice.dispose();
+
+        final QpidByteBuffer heapBuffer = QpidByteBuffer.allocate(false, 6); // heap counterpart, sliced without changing position/limit
+        final QpidByteBuffer heapSlice = heapBuffer.slice();
+        assertFalse("Heap slice should not be special", heapSlice instanceof SlicedQpidByteBuffer);
+        heapBuffer.dispose();
+        heapSlice.dispose();
+    }
+
+    public void testView() throws Exception // runs the shared view() checks once per value of the 'direct' flag
+    {
+        doTestView(true); // direct == true
+        doTestView(false); // direct == false
+    }
+
+    private void doTestView(final boolean direct) // verifies view(offset, length) on a buffer allocated with the given 'direct' flag
+    {
+        byte[] content = "ABCDEF".getBytes(); // NOTE(review): no charset given, so the platform default is used; the expected values below are produced the same way
+        QpidByteBuffer buffer = QpidByteBuffer.allocate(direct, content.length);
+        buffer.put(content);
+        buffer.position(2);
+        buffer.limit(5); // remaining window is [2, 5), which holds "CDE"
+
+        QpidByteBuffer view = buffer.view(0, buffer.remaining()); // offset 0, length 3; the assertions below expect the offset to count from the buffer's position
+
+        assertEquals("Unexpected view direct", direct, view.isDirect());
+
+        assertEquals("Unexpected capacity", 3, view.capacity());
+        assertEquals("Unexpected limit", 3, view.limit());
+        assertEquals("Unexpected position", 0, view.position());
+
+        byte[] destination = new byte[view.remaining()];
+        view.get(destination); // copies the whole view out for comparison
+
+        Assert.assertArrayEquals("CDE".getBytes(), destination);
+
+        QpidByteBuffer viewWithOffset = buffer.view(1, 1); // one byte, one past the buffer's position (absolute index 3)
+        destination = new byte[viewWithOffset.remaining()];
+        viewWithOffset.get(destination);
+
+        Assert.assertArrayEquals("D".getBytes(), destination);
+
+        buffer.dispose(); // the source buffer and both views are each disposed explicitly
+        view.dispose();
+        viewWithOffset.dispose();
+    }
+
     private void doDeflateInflate(byte[] input,
                                   Collection<QpidByteBuffer> inputBufs,
                                   boolean direct) throws IOException



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org