You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/07/28 17:48:31 UTC

svn commit: r1693123 [2/2] - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgra...

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Tue Jul 28 15:48:30 2015
@@ -28,7 +28,9 @@ import java.nio.ByteBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferDataOutput;
 import org.apache.qpid.util.BytesDataOutput;
 
 public class BasicContentHeaderProperties
@@ -84,7 +86,7 @@ public class BasicContentHeaderPropertie
     private static final int USER_ID_MASK = 1 << 4;
     private static final int APPLICATION_ID_MASK = 1 << 3;
     private static final int CLUSTER_ID_MASK = 1 << 2;
-    private byte[] _encodedForm;
+    private ByteBuffer _encodedForm;
 
 
     public BasicContentHeaderProperties(BasicContentHeaderProperties other)
@@ -134,7 +136,7 @@ public class BasicContentHeaderPropertie
     {
         if(useEncodedForm())
         {
-            return _encodedForm.length;
+            return _encodedForm.remaining();
         }
         else
         {
@@ -235,7 +237,21 @@ public class BasicContentHeaderPropertie
     {
         if(useEncodedForm())
         {
-            buffer.write(_encodedForm);
+            int offset;
+            int length = _encodedForm.remaining();;
+            byte[] array;
+            if(_encodedForm.hasArray())
+            {
+                array = _encodedForm.array();
+                offset = _encodedForm.arrayOffset() + _encodedForm.position();
+            }
+            else
+            {
+                array = new byte[length];
+                _encodedForm.duplicate().get(array);
+                offset = 0;
+            }
+            buffer.write(array, offset, length);
         }
         else
         {
@@ -318,7 +334,7 @@ public class BasicContentHeaderPropertie
         }
     }
 
-    public int read(DataInput input) throws IOException
+    public int read(MarkableDataInput input) throws IOException
     {
 
         _propertyFlags = input.readUnsignedShort();
@@ -347,7 +363,7 @@ public class BasicContentHeaderPropertie
         {
             int fieldTableLength = input.readInt();
 
-            _headers = new FieldTable(input, fieldTableLength);
+            _headers = new FieldTable(input.readAsByteBuffer(fieldTableLength));
 
             length += 4;
             length += fieldTableLength;
@@ -460,22 +476,23 @@ public class BasicContentHeaderPropertie
     {
         if(useEncodedForm())
         {
-            sender.send(ByteBuffer.wrap(_encodedForm));
-            return _encodedForm.length;
+            sender.send(_encodedForm.duplicate());
+            return _encodedForm.remaining();
         }
         else
         {
             int propertyListSize = getPropertyListSize();
-            byte[] data = new byte[propertyListSize];
-            BytesDataOutput out = new BytesDataOutput(data);
+            ByteBuffer buf = ByteBuffer.allocateDirect(propertyListSize);
+            ByteBufferDataOutput out = new ByteBufferDataOutput(buf);
             writePropertyListPayload(out);
-            sender.send(ByteBuffer.wrap(data));
+            buf.flip();
+            sender.send(buf);
             return propertyListSize;
         }
 
     }
 
-    public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
+    public void populatePropertiesFromBuffer(MarkableDataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
     {
         _propertyFlags = propertyFlags;
 
@@ -484,16 +501,15 @@ public class BasicContentHeaderPropertie
             _logger.debug("Property flags: " + _propertyFlags);
         }
 
-        _encodedForm = new byte[size];
-        buffer.readFully(_encodedForm);
+        _encodedForm = buffer.readAsByteBuffer(size);
 
-        ByteArrayDataInput input = new ByteArrayDataInput(_encodedForm);
+        ByteBufferDataInput input = new ByteBufferDataInput(_encodedForm);
 
         decode(input);
 
     }
 
-    private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException
+    private void decode(MarkableDataInput buffer) throws IOException, AMQFrameDecodingException
     {
         int headersOffset = 0;
 
@@ -513,7 +529,11 @@ public class BasicContentHeaderPropertie
         {
             long length = EncodingUtils.readUnsignedInteger(buffer);
 
-            _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length);
+            ByteBuffer buf = _encodedForm.slice();
+            buf.position(headersOffset+4);
+            buf = buf.slice();
+            buf.limit((int)length);
+            _headers = new FieldTable(buf);
 
             buffer.skipBytes((int)length);
         }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java Tue Jul 28 15:48:30 2015
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.framing;
 
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.codec.MarkableDataInput;
 
 public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput
@@ -164,6 +166,14 @@ public class ByteArrayDataInput implemen
         return b.length;
     }
 
+    @Override
+    public ByteBuffer readAsByteBuffer(final int len)
+    {
+        byte[] data = new byte[len];
+        readFully(data);
+        return ByteBuffer.wrap(data);
+    }
+
     public int position()
     {
         return _offset - _origin;

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java?rev=1693123&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java Tue Jul 28 15:48:30 2015
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.codec.MarkableDataInput;
+
+public class ByteBufferDataInput implements ExtendedDataInput, MarkableDataInput
+{
+    private final ByteBuffer _underlying;
+    private int _mark;
+
+    public ByteBufferDataInput(ByteBuffer underlying)
+    {
+        _underlying = underlying.slice();
+    }
+
+    public void readFully(byte[] b)
+    {
+        _underlying.get(b);
+    }
+
+    public void readFully(byte[] b, int off, int len)
+    {
+        _underlying.get(b,0, len);
+    }
+
+    public ByteBuffer readAsByteBuffer(int len)
+    {
+        ByteBuffer buf = _underlying.slice();
+        buf.limit(len);
+        skipBytes(len);
+        return buf;
+    }
+
+    public int skipBytes(int n)
+    {
+        _underlying.position(_underlying.position()+n);
+        return _underlying.position();
+    }
+
+    public boolean readBoolean()
+    {
+        return _underlying.get() != 0;
+    }
+
+    public byte readByte()
+    {
+        return _underlying.get();
+    }
+
+    public int readUnsignedByte()
+    {
+        return ((int)_underlying.get()) & 0xFF;
+    }
+
+    public short readShort()
+    {
+        return _underlying.getShort();
+    }
+
+    public int readUnsignedShort()
+    {
+        return ((int)_underlying.getShort()) & 0xffff;
+    }
+
+    public char readChar()
+    {
+        return (char) _underlying.getChar();
+    }
+
+    public int readInt()
+    {
+        return _underlying.getInt();
+    }
+
+    public long readLong()
+    {
+        return _underlying.getLong();
+    }
+
+    public float readFloat()
+    {
+        return _underlying.getFloat();
+    }
+
+    public double readDouble()
+    {
+        return _underlying.getDouble();
+    }
+
+    public AMQShortString readAMQShortString()
+    {
+        return AMQShortString.readAMQShortString(_underlying);
+    }
+
+    public String readLine()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public String readUTF()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int available()
+    {
+        return _underlying.remaining();
+    }
+
+
+    public long skip(long i)
+    {
+        _underlying.position(_underlying.position()+(int)i);
+        return i;
+    }
+
+    public int read(byte[] b)
+    {
+        readFully(b);
+        return b.length;
+    }
+
+    public int position()
+    {
+        return _underlying.position();
+    }
+
+    public void position(int position)
+    {
+        _underlying.position(position);
+    }
+
+    public int length()
+    {
+        return _underlying.limit();
+    }
+
+
+    public void mark(int readAhead)
+    {
+        _mark = position();
+    }
+
+    public void reset()
+    {
+        _underlying.position(_mark);
+    }
+}

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java?rev=1693123&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java Tue Jul 28 15:48:30 2015
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.qpid.codec.MarkableDataInput;
+
+public class ByteBufferListDataInput implements ExtendedDataInput, MarkableDataInput
+{
+    private final List<ByteBuffer> _underlying;
+    private int _bufferIndex;
+    private int _mark;
+
+    public ByteBufferListDataInput(List<ByteBuffer> underlying)
+    {
+        _underlying = underlying;
+    }
+
+    public void readFully(byte[] b)
+    {
+        final ByteBuffer currentBuffer = getCurrentBuffer();
+        if(currentBuffer.remaining()>b.length)
+        {
+            currentBuffer.get(b);
+        }
+        else
+        {
+            ByteBuffer buf = readAsByteBuffer(b.length);
+            buf.get(b);
+        }
+    }
+
+    public void readFully(byte[] b, int off, int len)
+    {
+        final ByteBuffer currentBuffer = getCurrentBuffer();
+        if(currentBuffer.remaining()>len)
+        {
+            currentBuffer.get(b, off, len);
+        }
+        else
+        {
+            ByteBuffer buf = readAsByteBuffer(len);
+            buf.get(b, off, len);
+        }
+    }
+
+    @Override
+    public ByteBuffer readAsByteBuffer(int len)
+    {
+        ByteBuffer currentBuffer = getCurrentBuffer();
+        if(currentBuffer.remaining()>=len)
+        {
+            ByteBuffer buf = currentBuffer.slice();
+            buf.limit(len);
+            currentBuffer.position(currentBuffer.position()+len);
+            return buf;
+        }
+        else
+        {
+            ByteBuffer dest = currentBuffer.isDirect() ? ByteBuffer.allocateDirect(len) : ByteBuffer.allocate(len);
+            while(dest.hasRemaining() && available()>0)
+            {
+                advanceIfNecessary();
+                currentBuffer = getCurrentBuffer();
+                final int remaining = dest.remaining();
+                if(currentBuffer.remaining()>= remaining)
+                {
+                    ByteBuffer buf = currentBuffer.slice();
+                    buf.limit(remaining);
+                    currentBuffer.position(currentBuffer.position()+remaining);
+                    dest.put(buf);
+                }
+                else
+                {
+                    dest.put(currentBuffer);
+                }
+            }
+
+            dest.flip();
+            return dest;
+        }
+    }
+
+    public int skipBytes(int n)
+    {
+        final ByteBuffer currentBuffer = getCurrentBuffer();
+        if(currentBuffer.remaining()>n)
+        {
+            currentBuffer.position(currentBuffer.position()+n);
+        }
+        else
+        {
+            n -= currentBuffer.remaining();
+            currentBuffer.position(currentBuffer.limit());
+            if(_bufferIndex != _underlying.size()-1)
+            {
+                _bufferIndex++;
+                skipBytes(n);
+            }
+        }
+        return position();
+    }
+
+    private ByteBuffer getCurrentBuffer()
+    {
+        return _underlying.get(_bufferIndex);
+    }
+
+    public boolean readBoolean()
+    {
+        advanceIfNecessary();
+        return getCurrentBuffer().get() != 0;
+    }
+
+    private void advanceIfNecessary()
+    {
+        while(!getCurrentBuffer().hasRemaining() && _bufferIndex != _underlying.size()-1)
+        {
+            _bufferIndex++;
+        }
+    }
+
+    public byte readByte()
+    {
+        advanceIfNecessary();
+        return getCurrentBuffer().get();
+    }
+
+    public int readUnsignedByte()
+    {
+        advanceIfNecessary();
+        return ((int)getCurrentBuffer().get()) & 0xFF;
+    }
+
+    public short readShort()
+    {
+        return getBuffer(2).getShort();
+    }
+
+    private ByteBuffer getBuffer(int size)
+    {
+        advanceIfNecessary();
+        final ByteBuffer currentBuffer = getCurrentBuffer();
+        if(currentBuffer.remaining()>= size)
+        {
+            return currentBuffer;
+        }
+        else
+        {
+            return readAsByteBuffer(size);
+        }
+    }
+
+    public int readUnsignedShort()
+    {
+        return ((int)getBuffer(2).getShort()) & 0xffff;
+    }
+
+    public char readChar()
+    {
+        return (char) getBuffer(2).getChar();
+    }
+
+    public int readInt()
+    {
+        return getBuffer(4).getInt();
+    }
+
+    public long readLong()
+    {
+        return getBuffer(8).getLong();
+    }
+
+    public float readFloat()
+    {
+        return getBuffer(4).getFloat();
+    }
+
+    public double readDouble()
+    {
+        return getBuffer(8).getDouble();
+    }
+
+    public AMQShortString readAMQShortString()
+    {
+        advanceIfNecessary();
+        final ByteBuffer currentBuffer = getCurrentBuffer();
+        int size = ((int) currentBuffer.get(currentBuffer.position())) & 0xff;
+        return AMQShortString.readAMQShortString(getBuffer(size + 1));
+    }
+
+    public String readLine()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public String readUTF()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int available()
+    {
+        int remaining = 0;
+        for(int i = _bufferIndex; i < _underlying.size(); i++)
+        {
+            remaining += _underlying.get(i).remaining();
+        }
+        return remaining;
+    }
+
+
+    public long skip(long i)
+    {
+        skipBytes((int)i);
+        return i;
+    }
+
+    public int read(byte[] b)
+    {
+        readFully(b);
+        return b.length;
+    }
+
+    public int position()
+    {
+        int position = 0;
+        for(int i = 0; i < _bufferIndex; i++)
+        {
+            position += _underlying.get(i).limit();
+        }
+        position += getCurrentBuffer().position();
+        return position;
+    }
+
+    public void position(int position)
+    {
+        int offset = 0;
+        boolean beforePos = true;
+        for(int i = 0; i < _underlying.size(); i++)
+        {
+            final ByteBuffer buffer = _underlying.get(i);
+            if(beforePos)
+            {
+                if (position - offset <= buffer.limit())
+                {
+                    buffer.position(position - offset);
+                    _bufferIndex = i;
+                    beforePos = false;
+                }
+                else
+                {
+                    offset += buffer.limit();
+                }
+            }
+            else
+            {
+                buffer.position(0);
+            }
+        }
+    }
+
+    public int length()
+    {
+        int length = 0;
+        for(ByteBuffer buf : _underlying)
+        {
+            length+= buf.limit();
+        }
+        return length;
+    }
+
+
+    public void mark(int readAhead)
+    {
+        _mark = position();
+    }
+
+    public void reset()
+    {
+        position(_mark);
+    }
+}

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java Tue Jul 28 15:48:30 2015
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.framing;
 
+import java.nio.ByteBuffer;
+
 public interface ChannelMethodProcessor
 {
     void receiveChannelFlow(boolean active);
@@ -30,7 +32,7 @@ public interface ChannelMethodProcessor
 
     void receiveChannelCloseOk();
 
-    void receiveMessageContent(byte[] data);
+    void receiveMessageContent(ByteBuffer data);
 
     void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize);
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java Tue Jul 28 15:48:30 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -34,20 +33,14 @@ public class ContentBody implements AMQB
 {
     public static final byte TYPE = 3;
 
-    private byte[] _payload;
+    private ByteBuffer _payload;
 
     public ContentBody()
     {
     }
 
-    public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
-    {
-        _payload = new byte[(int)size];
-        buffer.readFully(getPayload());
-    }
 
-
-    public ContentBody(byte[] payload)
+    public ContentBody(ByteBuffer payload)
     {
         _payload = payload;
     }
@@ -59,12 +52,14 @@ public class ContentBody implements AMQB
 
     public int getSize()
     {
-        return getPayload() == null ? 0 : getPayload().length;
+        return _payload == null ? 0 : _payload.remaining();
     }
 
     public void writePayload(DataOutput buffer) throws IOException
     {
-        buffer.write(getPayload());
+        byte[] data = new byte[_payload.remaining()];
+        _payload.duplicate().get(data);
+        buffer.write(data);
     }
 
     public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
@@ -78,8 +73,8 @@ public class ContentBody implements AMQB
     {
         if(_payload != null)
         {
-            sender.send(ByteBuffer.wrap(_payload));
-            return _payload.length;
+            sender.send(_payload.duplicate());
+            return _payload.remaining();
         }
         else
         {
@@ -87,7 +82,7 @@ public class ContentBody implements AMQB
         }
     }
 
-    public byte[] getPayload()
+    public ByteBuffer getPayload()
     {
         return _payload;
     }
@@ -97,8 +92,7 @@ public class ContentBody implements AMQB
             throws IOException
     {
 
-        byte[] payload = new byte[(int)bodySize];
-        in.readFully(payload);
+        ByteBuffer payload = in.readAsByteBuffer((int)bodySize);
 
         if(!methodProcessor.ignoreAllButCloseOk())
         {
@@ -106,77 +100,6 @@ public class ContentBody implements AMQB
         }
     }
 
-    private static class BufferContentBody implements AMQBody
-    {
-        private final int _length;
-        private final int _offset;
-        private final ByteBuffer _buf;
-
-        private BufferContentBody( ByteBuffer buf, int offset, int length)
-        {
-            _length = length;
-            _offset = offset;
-            _buf = buf;
-        }
-
-        public byte getFrameType()
-        {
-            return TYPE;
-        }
-
-
-        public int getSize()
-        {
-            return _length;
-        }
-
-        public void writePayload(DataOutput buffer) throws IOException
-        {
-            if(_buf.hasArray())
-            {
-                buffer.write(_buf.array(), _buf.arrayOffset() +  _offset, _length);
-            }
-            else
-            {
-                byte[] data = new byte[_length];
-                ByteBuffer buf = _buf.duplicate();
-
-                buf.position(_offset);
-                buf.limit(_offset+_length);
-                buf.get(data);
-                buffer.write(data);
-            }
-        }
-
-        @Override
-        public long writePayload(final ByteBufferSender sender) throws IOException
-        {
-            if(_buf.hasArray())
-            {
-                sender.send(ByteBuffer.wrap(_buf.array(), _buf.arrayOffset() + _offset, _length));
-            }
-            else
-            {
-                ByteBuffer buf = _buf.duplicate();
-
-                buf.position(_offset);
-                buf.limit(_offset+_length);
-                sender.send(buf);
-            }
-            return _length;
-        }
-
-        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws QpidException
-        {
-            throw new RuntimeException("Buffered Body only to be used for outgoing data");
-        }
-    }
-
-    public static AMQFrame createAMQFrame(int channelId, ByteBuffer buf, int offset, int length)
-    {
-        return new AMQFrame(channelId, new BufferContentBody(buf, offset, length));
-    }
-
     public static AMQFrame createAMQFrame(int channelId, ContentBody body)
     {
         return new AMQFrame(channelId, body);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Tue Jul 28 15:48:30 2015
@@ -43,7 +43,7 @@ public class ContentHeaderBody implement
     /** must never be null */
     private BasicContentHeaderProperties _properties;
 
-    public ContentHeaderBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
+    public ContentHeaderBody(MarkableDataInput buffer, long size) throws AMQFrameDecodingException, IOException
     {
         buffer.readUnsignedShort();
         buffer.readUnsignedShort();
@@ -80,7 +80,7 @@ public class ContentHeaderBody implement
      * @throws AMQProtocolVersionException if there is a version issue
      * @throws IOException if there is an IO issue
      */
-    public static ContentHeaderBody createFromBuffer(DataInputStream buffer, long size)
+    public static ContentHeaderBody createFromBuffer(MarkableDataInput buffer, long size)
         throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
     {
         ContentHeaderBody body = new ContentHeaderBody(buffer, size);
@@ -105,13 +105,13 @@ public class ContentHeaderBody implement
     @Override
     public long writePayload(final ByteBufferSender sender) throws IOException
     {
-        byte[] data = new byte[14];
-        BytesDataOutput buffer = new BytesDataOutput(data);
-        EncodingUtils.writeUnsignedShort(buffer, CLASS_ID);
-        EncodingUtils.writeUnsignedShort(buffer, 0);
-        buffer.writeLong(_bodySize);
-        EncodingUtils.writeUnsignedShort(buffer, _properties.getPropertyFlags());
-        sender.send(ByteBuffer.wrap(data));
+        ByteBuffer data = ByteBuffer.allocateDirect(14);
+        EncodingUtils.writeUnsignedShort(data, CLASS_ID);
+        EncodingUtils.writeUnsignedShort(data, 0);
+        data.putLong(_bodySize);
+        EncodingUtils.writeUnsignedShort(data, _properties.getPropertyFlags());
+        data.flip();
+        sender.send(data);
         return 14 + _properties.writePropertyListPayload(sender);
     }
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Tue Jul 28 15:48:30 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.framing;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQConstant;
 
 public class ContentHeaderPropertiesFactory
@@ -39,7 +40,7 @@ public class ContentHeaderPropertiesFact
     }
 
     public BasicContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
-                                                                 DataInput buffer, int size)
+                                                                 MarkableDataInput buffer, int size)
              throws AMQFrameDecodingException, IOException
     {
         BasicContentHeaderProperties properties;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Tue Jul 28 15:48:30 2015
@@ -30,6 +30,8 @@ import java.nio.charset.StandardCharsets
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.codec.MarkableDataInput;
+
 public class EncodingUtils
 {
     private static final Logger _logger = LoggerFactory.getLogger(EncodingUtils.class);
@@ -278,6 +280,22 @@ public class EncodingUtils
         }
     }
 
+    public static void writeUnsignedShort(ByteBuffer buffer, int s) throws IOException
+    {
+        // TODO: Is this comparison safe? Do I need to cast RHS to long?
+        if (s < Short.MAX_VALUE)
+        {
+            buffer.putShort((short) s);
+        }
+        else
+        {
+            short sv = (short) s;
+            buffer.put((byte) (0xFF & (sv >> 8)));
+            buffer.put((byte) (0xFF & sv));
+        }
+    }
+
+
     public static int unsignedIntegerLength()
     {
         return 4;
@@ -303,6 +321,27 @@ public class EncodingUtils
         }
     }
 
+    public static void writeUnsignedInteger(ByteBuffer buffer, long l) throws IOException
+    {
+        // TODO: Is this comparison safe? Do I need to cast RHS to long?
+        if (l < Integer.MAX_VALUE)
+        {
+            buffer.putInt((int) l);
+        }
+        else
+        {
+            int iv = (int) l;
+
+            // FIXME: This *may* go faster if we build this into a local 4-byte array and then
+            // put the array in a single call.
+            buffer.put((byte) (0xFF & (iv >> 24)));
+            buffer.put((byte) (0xFF & (iv >> 16)));
+            buffer.put((byte) (0xFF & (iv >> 8)));
+            buffer.put((byte) (0xFF & iv));
+        }
+    }
+
+
     public static void writeFieldTableBytes(DataOutput buffer, FieldTable table) throws IOException
     {
         if (table != null)
@@ -579,7 +618,7 @@ public class EncodingUtils
         return result;
     }
 
-    public static FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException
+    public static FieldTable readFieldTable(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
     {
         long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
         if (length == 0)
@@ -588,10 +627,11 @@ public class EncodingUtils
         }
         else
         {
-            return FieldTableFactory.newFieldTable(buffer, length);
+            return new FieldTable(buffer.readAsByteBuffer((int)length));
         }
     }
 
+
     public static AMQShortString readAMQShortString(DataInput buffer) throws IOException
     {
         return AMQShortString.readFromBuffer(buffer);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java Tue Jul 28 15:48:30 2015
@@ -21,11 +21,11 @@
 package org.apache.qpid.framing;
 
 import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -49,8 +49,7 @@ public class FieldTable
     private static final String STRICT_AMQP_NAME = "STRICT_AMQP";
     private static final boolean STRICT_AMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false"));
 
-    private byte[] _encodedForm;
-    private int _encodedFormOffset;
+    private ByteBuffer _encodedForm;
     private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null;
     private long _encodedSize;
     private static final int INITIAL_HASHMAP_CAPACITY = 16;
@@ -69,30 +68,18 @@ public class FieldTable
         _strictAMQP = strictAMQP;
     }
 
-    /**
-     * Construct a new field table.
-     *
-     * @param buffer the buffer from which to read data. The length byte must be read already
-     * @param length the length of the field table. Must be great than 0.
-     * @throws IOException if there is an issue reading the buffer
-     */
-    public FieldTable(DataInput buffer, long length) throws IOException
+    public FieldTable(byte[] encodedForm, int offset, int length)
     {
-        this();
-        _encodedForm = new byte[(int) length];
-        buffer.readFully(_encodedForm);
-        _encodedSize = length;
+        this(ByteBuffer.wrap(encodedForm,offset,length));
     }
 
-    public FieldTable(byte[] encodedForm, int offset, int length)
+    public FieldTable(ByteBuffer buffer)
     {
         this();
-        _encodedForm = encodedForm;
-        _encodedFormOffset = offset;
-        _encodedSize = length;
+        _encodedForm = buffer;
+        _encodedSize = buffer.remaining();
     }
 
-
     public boolean isClean()
     {
         return _encodedForm != null;
@@ -858,14 +845,10 @@ public class FieldTable
             }
 
         }
-        else if(_encodedFormOffset == 0 && _encodedSize == _encodedForm.length)
-        {
-            return _encodedForm.clone();
-        }
         else
         {
-            byte[] encodedCopy = new byte[(int) _encodedSize];
-            System.arraycopy(_encodedForm,_encodedFormOffset,encodedCopy,0,(int)_encodedSize);
+            byte[] encodedCopy = new byte[_encodedForm.remaining()];
+            _encodedForm.duplicate().get(encodedCopy);
             return encodedCopy;
         }
 
@@ -1077,10 +1060,12 @@ public class FieldTable
 
     private void putDataInBuffer(DataOutput buffer) throws IOException
     {
-
         if (_encodedForm != null)
         {
-            buffer.write(_encodedForm,_encodedFormOffset,(int)_encodedSize);
+            byte[] encodedCopy = new byte[_encodedForm.remaining()];
+            _encodedForm.duplicate().get(encodedCopy);
+
+            buffer.write(encodedCopy);
         }
         else if (_properties != null)
         {
@@ -1109,7 +1094,7 @@ public class FieldTable
     private void setFromBuffer() throws AMQFrameDecodingException, IOException
     {
 
-        ByteArrayDataInput baid = new ByteArrayDataInput(_encodedForm, _encodedFormOffset, (int)_encodedSize);
+        ByteBufferDataInput dataInput = new ByteBufferDataInput(_encodedForm.duplicate());
 
         if (_encodedSize > 0)
         {
@@ -1120,12 +1105,12 @@ public class FieldTable
             do
             {
 
-                final AMQShortString key = baid.readAMQShortString();
-                AMQTypedValue value = AMQTypedValue.readFromBuffer(baid);
+                final AMQShortString key = dataInput.readAMQShortString();
+                AMQTypedValue value = AMQTypedValue.readFromBuffer(dataInput);
                 _properties.put(key, value);
 
             }
-            while (baid.available() > 0);
+            while (dataInput.available() > 0);
 
         }
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java Tue Jul 28 15:48:30 2015
@@ -34,10 +34,5 @@ public class FieldTableFactory
         return new FieldTable();
     }
 
-    public static FieldTable newFieldTable(DataInput byteBuffer, long length) throws AMQFrameDecodingException, IOException
-    {
-        return new FieldTable(byteBuffer, length);
-    }
-
 
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Tue Jul 28 15:48:30 2015
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.framing;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -603,7 +604,7 @@ public class FrameCreatingMethodProcesso
         }
 
         @Override
-        public void receiveMessageContent(final byte[] data)
+        public void receiveMessageContent(ByteBuffer data)
         {
             _processedMethods.add(new AMQFrame(_channelId, new ContentBody(data)));
         }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java Tue Jul 28 15:48:30 2015
@@ -227,7 +227,7 @@ public class Connection extends Connecti
             securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
 
             IoNetworkTransport transport = new IoNetworkTransport();
-            final InputHandler inputHandler = new InputHandler(new Assembler(this));
+            final InputHandler inputHandler = new InputHandler(new Assembler(this), false);
             addFrameSizeObserver(inputHandler);
             ExceptionHandlingByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
             if(secureReceiver instanceof ConnectionListener)

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java Tue Jul 28 15:48:30 2015
@@ -93,9 +93,9 @@ public final class ProtocolHeader implem
         return false;
     }
 
-    public ByteBuffer toByteBuffer()
+    public ByteBuffer toByteBuffer(final boolean useDirect)
     {
-        ByteBuffer buf = ByteBuffer.allocate(8);
+        ByteBuffer buf = useDirect ? ByteBuffer.allocateDirect(8) : ByteBuffer.allocate(8);
         buf.put(AMQP);
         buf.put(protoClass);
         buf.put(instance);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Tue Jul 28 15:48:30 2015
@@ -155,7 +155,7 @@ public class Assembler implements Networ
                 {
                     size += f.getSize();
                 }
-                segment = ByteBuffer.allocate(size);
+                segment = allocateByteBuffer(size);
                 for (Frame f : frames)
                 {
                     segment.put(f.getBody());
@@ -167,6 +167,11 @@ public class Assembler implements Networ
 
     }
 
+    protected ByteBuffer allocateByteBuffer(final int size)
+    {
+        return ByteBuffer.allocate(size);
+    }
+
     private void assemble(Frame frame, ByteBuffer segment)
     {
         BBDecoder dec = _decoder.get();

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Tue Jul 28 15:48:30 2015
@@ -159,7 +159,7 @@ public final class Disassembler implemen
     {
         synchronized (sendlock)
         {
-            sender.send(header.toByteBuffer());
+            sender.send(header.toByteBuffer(false));
             sender.flush();
         }
     }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java Tue Jul 28 15:48:30 2015
@@ -29,6 +29,9 @@ import static org.apache.qpid.transport.
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.transport.Constant;
 import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.FrameSizeObserver;
@@ -46,18 +49,21 @@ import org.apache.qpid.transport.Segment
 
 public class InputHandler implements ExceptionHandlingByteBufferReceiver, FrameSizeObserver
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
 
     private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
 
+
     public enum State
     {
         PROTO_HDR,
         FRAME_HDR,
         FRAME_BODY,
-        ERROR
+        ERROR;
     }
-
     private final NetworkEventReceiver receiver;
+
+    private final boolean _useDirect;
     private State state;
     private ByteBuffer input = null;
     private int needed;
@@ -67,27 +73,24 @@ public class InputHandler implements Exc
     private byte track;
     private int channel;
 
-    public InputHandler(NetworkEventReceiver receiver, State state)
+
+    public InputHandler(NetworkEventReceiver receiver, final boolean useDirect)
     {
         this.receiver = receiver;
-        this.state = state;
+        this.state = PROTO_HDR;
+        _useDirect = useDirect;
 
         switch (state)
         {
-        case PROTO_HDR:
-            needed = 8;
-            break;
-        case FRAME_HDR:
-            needed = Frame.HEADER_SIZE;
-            break;
+            case PROTO_HDR:
+                needed = 8;
+                break;
+            case FRAME_HDR:
+                needed = Frame.HEADER_SIZE;
+                break;
         }
     }
 
-    public InputHandler(NetworkEventReceiver receiver)
-    {
-        this(receiver, PROTO_HDR);
-    }
-
     public void setMaxFrameSize(final int maxFrameSize)
     {
         _maxFrameSize = maxFrameSize;
@@ -98,6 +101,7 @@ public class InputHandler implements Exc
         receiver.received(new ProtocolError(Frame.L1, fmt, args));
     }
 
+    @Override
     public void received(ByteBuffer buf)
     {
         int limit = buf.limit();
@@ -132,7 +136,7 @@ public class InputHandler implements Exc
             {
                 if (input == null)
                 {
-                    input = ByteBuffer.allocate(needed);
+                    input = _useDirect ? ByteBuffer.allocateDirect(needed) : ByteBuffer.allocate(needed);
                 }
                 input.put(buf);
                 needed -= remaining;
@@ -185,7 +189,7 @@ public class InputHandler implements Exc
             channel = (0xFFFF & input.getShort(pos + 6));
             if (size == 0)
             {
-                Frame frame = new Frame(flags, type, track, channel, ByteBuffer.allocate(0));
+                Frame frame = new Frame(flags, type, track, channel, _useDirect ? ByteBuffer.allocateDirect(0) : ByteBuffer.allocate(0));
                 receiver.received(frame);
                 needed = Frame.HEADER_SIZE;
                 return FRAME_HDR;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java Tue Jul 28 15:48:30 2015
@@ -109,6 +109,10 @@ public final class Functions
     {
         return hex(bytes, limit, "");
     }
+    public static String hex(ByteBuffer bytes, int limit)
+    {
+        return hex(bytes, limit, "");
+    }
 
     public static String hex(byte[] bytes, int limit, CharSequence separator)
     {
@@ -127,6 +131,26 @@ public final class Functions
         {
             sb.append("...");
         }
+        return sb.toString();
+    }
+
+    public static String hex(ByteBuffer bytes, int limit, CharSequence separator)
+    {
+        limit = Math.min(limit, bytes == null ? 0 : bytes.remaining());
+        StringBuilder sb = new StringBuilder(3 + limit*2);
+        for(int i = 0; i < limit; i++)
+        {
+            sb.append(HEX_CHARACTERS[(((int)(bytes.get(bytes.position()+i))) & 0xf0)>>4]);
+            sb.append(HEX_CHARACTERS[(((int)bytes.get(bytes.position()+i)) & 0x0f)]);
+            if(i != bytes.remaining() - 1)
+            {
+                sb.append(separator);
+            }
+        }
+        if(bytes != null && bytes.remaining()>limit)
+        {
+            sb.append("...");
+        }
         return sb.toString();
     }
 

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java?rev=1693123&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java Tue Jul 28 15:48:30 2015
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+public class ByteBufferDataOutput implements DataOutput
+{
+    private final ByteBuffer _buf;
+
+    public ByteBufferDataOutput(ByteBuffer buf)
+    {
+        _buf = buf;
+    }
+
+    public void write(int b)
+    {
+        _buf.put((byte)b);
+    }
+
+    public void write(byte[] b)
+    {
+       _buf.put(b);
+    }
+
+
+    public void write(byte[] b, int off, int len)
+    {
+        _buf.put(b, off, len);
+
+    }
+
+    public void writeBoolean(boolean v)
+    {
+        _buf.put( v ? (byte) 1 : (byte) 0);
+    }
+
+    public void writeByte(int v)
+    {
+        _buf.put((byte) v);
+    }
+
+    public void writeShort(int v)
+    {
+        _buf.putShort((short)v);
+    }
+
+    public void writeChar(int v)
+    {
+        _buf.put((byte) (v >>> 8));
+        _buf.put((byte) v);
+    }
+
+    public void writeInt(int v)
+    {
+        _buf.putInt(v);
+    }
+
+    public void writeLong(long v)
+    {
+        _buf.putLong(v);
+    }
+
+    public void writeFloat(float v)
+    {
+        writeInt(Float.floatToIntBits(v));
+    }
+
+    public void writeDouble(double v)
+    {
+        writeLong(Double.doubleToLongBits(v));
+    }
+
+    public void writeBytes(String s)
+    {
+        throw new UnsupportedOperationException("writeBytes(String s) not supported");
+    }
+
+    public void writeChars(String s)
+    {
+        int len = s.length();
+        for (int i = 0 ; i < len ; i++)
+        {
+            int v = s.charAt(i);
+            _buf.put((byte) (v >>> 8));
+            _buf.put((byte) v);
+        }
+    }
+
+    public void writeUTF(String s)
+    {
+        int strlen = s.length();
+
+        int pos = _buf.position();
+        _buf.position(pos+2);
+
+
+        for (int i = 0; i < strlen; i++)
+        {
+            int c = s.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F))
+            {
+                c = s.charAt(i);
+                _buf.put((byte) c);
+
+            }
+            else if (c > 0x07FF)
+            {
+                _buf.put((byte) (0xE0 | ((c >> 12) & 0x0F)));
+                _buf.put((byte) (0x80 | ((c >>  6) & 0x3F)));
+                _buf.put((byte) (0x80 | (c & 0x3F)));
+            }
+            else
+            {
+                _buf.put((byte) (0xC0 | ((c >>  6) & 0x1F)));
+                _buf.put((byte) (0x80 | (c & 0x3F)));
+            }
+        }
+
+        int len = _buf.position() - (pos + 2);
+
+        _buf.put(pos++, (byte) (len >>> 8));
+        _buf.put(pos, (byte) len);
+    }
+
+}

Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Tue Jul 28 15:48:30 2015
@@ -83,7 +83,7 @@ public class AMQDecoderTest extends Qpid
         Random random = new Random();
         final byte[] payload = new byte[2048];
         random.nextBytes(payload);
-        final AMQBody body = new ContentBody(payload);
+        final AMQBody body = new ContentBody(ByteBuffer.wrap(payload));
         AMQFrame frame = new AMQFrame(1, body);
         byte[] outputBuf = new byte[4096];
         BytesDataOutput dataOutput = new BytesDataOutput(outputBuf);
@@ -91,14 +91,16 @@ public class AMQDecoderTest extends Qpid
         for(int i = 0 ; i < dataOutput.length(); i++)
         {
             _decoder.decodeBuffer(ByteBuffer.wrap(outputBuf, i, 1));
-
         }
         List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
         if (frames.get(0) instanceof AMQFrame)
         {
             assertEquals(ContentBody.TYPE, ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
             ContentBody decodedBody = (ContentBody) ((AMQFrame) frames.get(0)).getBodyFrame();
-            assertTrue("Body was corrupted", Arrays.equals(payload, decodedBody.getPayload()));
+            final ByteBuffer byteBuffer = decodedBody.getPayload().duplicate();
+            byte[] bodyBytes = new byte[byteBuffer.remaining()];
+            byteBuffer.get(bodyBytes);
+            assertTrue("Body was corrupted", Arrays.equals(payload, bodyBytes));
         }
         else
         {

Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java Tue Jul 28 15:48:30 2015
@@ -87,7 +87,7 @@ public class BasicContentHeaderPropertie
 
     public void testPopulatePropertiesFromBuffer() throws Exception
     {
-        DataInputStream buf = new DataInputStream(new ByteArrayInputStream(new byte[300]));
+        ByteArrayDataInput buf = new ByteArrayDataInput(new byte[300]);
         _testProperties.populatePropertiesFromBuffer(buf, 99, 99);
     }
 

Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java Tue Jul 28 15:48:30 2015
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -477,7 +478,7 @@ public class FieldTableTest extends Qpid
         // Extract the table back from the buffer again.
         try
         {
-            FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new DataInputStream(new ByteArrayInputStream(data)));
+            FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new ByteArrayDataInput(data));
 
             FieldTable extractedTable = extractedOuterTable.getFieldTable("innerTable");
 
@@ -600,13 +601,14 @@ public class FieldTableTest extends Qpid
         ByteArrayOutputStream baos = new ByteArrayOutputStream((int) table.getEncodedSize() + 4);
         table.writeToBuffer(new DataOutputStream(baos));
 
-        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-        DataInputStream dis = new DataInputStream(bais);
+        ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
 
+        long length = buf.getInt() & 0xFFFFFFFFL;
+        buf = buf.slice();
+        buf.limit((int)length);
 
-        long length = dis.readInt() & 0xFFFFFFFFL;
 
-        FieldTable table2 = new FieldTable(dis, length);
+        FieldTable table2 = new FieldTable(buf);
 
         Assert.assertEquals((Boolean) true, table2.getBoolean("bool"));
         Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte"));
@@ -918,7 +920,7 @@ public class FieldTableTest extends Qpid
         assertEquals("unexpected data length", 24, length);
 
         //Create a second FieldTable from the encoded bytes
-        FieldTable tableFromBytes = new FieldTable(new DataInputStream(new ByteArrayInputStream(data)), length);
+        FieldTable tableFromBytes = new FieldTable(ByteBuffer.wrap(data));
 
         //Create a final FieldTable and addAll() from the table created with encoded bytes
         FieldTable destinationTable = new FieldTable();

Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java Tue Jul 28 15:48:30 2015
@@ -83,7 +83,7 @@ public abstract class ConnectionBinding
 
     public ExceptionHandlingByteBufferReceiver receiver(Connection conn)
     {
-        final InputHandler inputHandler = new InputHandler(new Assembler(conn));
+        final InputHandler inputHandler = new InputHandler(new Assembler(conn), false);
         conn.addFrameSizeObserver(inputHandler);
         if (conn.getConnectionSettings() != null &&
             conn.getConnectionSettings().isUseSASLEncryption())

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java Tue Jul 28 15:48:30 2015
@@ -41,6 +41,7 @@ import javax.jms.MessageProducer;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -132,7 +133,7 @@ public class FieldTableMessageTest exten
             final long bodyLength = bytesMessage.getBodyLength();
             byte[] data = new byte[(int) bodyLength];
             bytesMessage.readBytes(data);
-            FieldTable actual = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(data)), bodyLength);
+            FieldTable actual = new FieldTable(ByteBuffer.wrap(data));
             for (String key : _expected.keys())
             {
                 assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key));



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org