You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/07/28 17:48:31 UTC
svn commit: r1693123 [2/2] - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgra...
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Tue Jul 28 15:48:30 2015
@@ -28,7 +28,9 @@ import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferDataOutput;
import org.apache.qpid.util.BytesDataOutput;
public class BasicContentHeaderProperties
@@ -84,7 +86,7 @@ public class BasicContentHeaderPropertie
private static final int USER_ID_MASK = 1 << 4;
private static final int APPLICATION_ID_MASK = 1 << 3;
private static final int CLUSTER_ID_MASK = 1 << 2;
- private byte[] _encodedForm;
+ private ByteBuffer _encodedForm;
public BasicContentHeaderProperties(BasicContentHeaderProperties other)
@@ -134,7 +136,7 @@ public class BasicContentHeaderPropertie
{
if(useEncodedForm())
{
- return _encodedForm.length;
+ return _encodedForm.remaining();
}
else
{
@@ -235,7 +237,21 @@ public class BasicContentHeaderPropertie
{
if(useEncodedForm())
{
- buffer.write(_encodedForm);
+ int offset;
+ int length = _encodedForm.remaining();;
+ byte[] array;
+ if(_encodedForm.hasArray())
+ {
+ array = _encodedForm.array();
+ offset = _encodedForm.arrayOffset() + _encodedForm.position();
+ }
+ else
+ {
+ array = new byte[length];
+ _encodedForm.duplicate().get(array);
+ offset = 0;
+ }
+ buffer.write(array, offset, length);
}
else
{
@@ -318,7 +334,7 @@ public class BasicContentHeaderPropertie
}
}
- public int read(DataInput input) throws IOException
+ public int read(MarkableDataInput input) throws IOException
{
_propertyFlags = input.readUnsignedShort();
@@ -347,7 +363,7 @@ public class BasicContentHeaderPropertie
{
int fieldTableLength = input.readInt();
- _headers = new FieldTable(input, fieldTableLength);
+ _headers = new FieldTable(input.readAsByteBuffer(fieldTableLength));
length += 4;
length += fieldTableLength;
@@ -460,22 +476,23 @@ public class BasicContentHeaderPropertie
{
if(useEncodedForm())
{
- sender.send(ByteBuffer.wrap(_encodedForm));
- return _encodedForm.length;
+ sender.send(_encodedForm.duplicate());
+ return _encodedForm.remaining();
}
else
{
int propertyListSize = getPropertyListSize();
- byte[] data = new byte[propertyListSize];
- BytesDataOutput out = new BytesDataOutput(data);
+ ByteBuffer buf = ByteBuffer.allocateDirect(propertyListSize);
+ ByteBufferDataOutput out = new ByteBufferDataOutput(buf);
writePropertyListPayload(out);
- sender.send(ByteBuffer.wrap(data));
+ buf.flip();
+ sender.send(buf);
return propertyListSize;
}
}
- public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
+ public void populatePropertiesFromBuffer(MarkableDataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
{
_propertyFlags = propertyFlags;
@@ -484,16 +501,15 @@ public class BasicContentHeaderPropertie
_logger.debug("Property flags: " + _propertyFlags);
}
- _encodedForm = new byte[size];
- buffer.readFully(_encodedForm);
+ _encodedForm = buffer.readAsByteBuffer(size);
- ByteArrayDataInput input = new ByteArrayDataInput(_encodedForm);
+ ByteBufferDataInput input = new ByteBufferDataInput(_encodedForm);
decode(input);
}
- private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException
+ private void decode(MarkableDataInput buffer) throws IOException, AMQFrameDecodingException
{
int headersOffset = 0;
@@ -513,7 +529,11 @@ public class BasicContentHeaderPropertie
{
long length = EncodingUtils.readUnsignedInteger(buffer);
- _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length);
+ ByteBuffer buf = _encodedForm.slice();
+ buf.position(headersOffset+4);
+ buf = buf.slice();
+ buf.limit((int)length);
+ _headers = new FieldTable(buf);
buffer.skipBytes((int)length);
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java Tue Jul 28 15:48:30 2015
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.codec.MarkableDataInput;
public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput
@@ -164,6 +166,14 @@ public class ByteArrayDataInput implemen
return b.length;
}
+ @Override
+ public ByteBuffer readAsByteBuffer(final int len)
+ {
+ byte[] data = new byte[len];
+ readFully(data);
+ return ByteBuffer.wrap(data);
+ }
+
public int position()
{
return _offset - _origin;
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java?rev=1693123&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java Tue Jul 28 15:48:30 2015
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.codec.MarkableDataInput;
+
+public class ByteBufferDataInput implements ExtendedDataInput, MarkableDataInput
+{
+ private final ByteBuffer _underlying;
+ private int _mark;
+
+ public ByteBufferDataInput(ByteBuffer underlying)
+ {
+ _underlying = underlying.slice();
+ }
+
+ public void readFully(byte[] b)
+ {
+ _underlying.get(b);
+ }
+
+ public void readFully(byte[] b, int off, int len)
+ {
+ _underlying.get(b,0, len);
+ }
+
+ public ByteBuffer readAsByteBuffer(int len)
+ {
+ ByteBuffer buf = _underlying.slice();
+ buf.limit(len);
+ skipBytes(len);
+ return buf;
+ }
+
+ public int skipBytes(int n)
+ {
+ _underlying.position(_underlying.position()+n);
+ return _underlying.position();
+ }
+
+ public boolean readBoolean()
+ {
+ return _underlying.get() != 0;
+ }
+
+ public byte readByte()
+ {
+ return _underlying.get();
+ }
+
+ public int readUnsignedByte()
+ {
+ return ((int)_underlying.get()) & 0xFF;
+ }
+
+ public short readShort()
+ {
+ return _underlying.getShort();
+ }
+
+ public int readUnsignedShort()
+ {
+ return ((int)_underlying.getShort()) & 0xffff;
+ }
+
+ public char readChar()
+ {
+ return (char) _underlying.getChar();
+ }
+
+ public int readInt()
+ {
+ return _underlying.getInt();
+ }
+
+ public long readLong()
+ {
+ return _underlying.getLong();
+ }
+
+ public float readFloat()
+ {
+ return _underlying.getFloat();
+ }
+
+ public double readDouble()
+ {
+ return _underlying.getDouble();
+ }
+
+ public AMQShortString readAMQShortString()
+ {
+ return AMQShortString.readAMQShortString(_underlying);
+ }
+
+ public String readLine()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public String readUTF()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int available()
+ {
+ return _underlying.remaining();
+ }
+
+
+ public long skip(long i)
+ {
+ _underlying.position(_underlying.position()+(int)i);
+ return i;
+ }
+
+ public int read(byte[] b)
+ {
+ readFully(b);
+ return b.length;
+ }
+
+ public int position()
+ {
+ return _underlying.position();
+ }
+
+ public void position(int position)
+ {
+ _underlying.position(position);
+ }
+
+ public int length()
+ {
+ return _underlying.limit();
+ }
+
+
+ public void mark(int readAhead)
+ {
+ _mark = position();
+ }
+
+ public void reset()
+ {
+ _underlying.position(_mark);
+ }
+}
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java?rev=1693123&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java Tue Jul 28 15:48:30 2015
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.qpid.codec.MarkableDataInput;
+
+public class ByteBufferListDataInput implements ExtendedDataInput, MarkableDataInput
+{
+ private final List<ByteBuffer> _underlying;
+ private int _bufferIndex;
+ private int _mark;
+
+ public ByteBufferListDataInput(List<ByteBuffer> underlying)
+ {
+ _underlying = underlying;
+ }
+
+ public void readFully(byte[] b)
+ {
+ final ByteBuffer currentBuffer = getCurrentBuffer();
+ if(currentBuffer.remaining()>b.length)
+ {
+ currentBuffer.get(b);
+ }
+ else
+ {
+ ByteBuffer buf = readAsByteBuffer(b.length);
+ buf.get(b);
+ }
+ }
+
+ public void readFully(byte[] b, int off, int len)
+ {
+ final ByteBuffer currentBuffer = getCurrentBuffer();
+ if(currentBuffer.remaining()>len)
+ {
+ currentBuffer.get(b, off, len);
+ }
+ else
+ {
+ ByteBuffer buf = readAsByteBuffer(len);
+ buf.get(b, off, len);
+ }
+ }
+
+ @Override
+ public ByteBuffer readAsByteBuffer(int len)
+ {
+ ByteBuffer currentBuffer = getCurrentBuffer();
+ if(currentBuffer.remaining()>=len)
+ {
+ ByteBuffer buf = currentBuffer.slice();
+ buf.limit(len);
+ currentBuffer.position(currentBuffer.position()+len);
+ return buf;
+ }
+ else
+ {
+ ByteBuffer dest = currentBuffer.isDirect() ? ByteBuffer.allocateDirect(len) : ByteBuffer.allocate(len);
+ while(dest.hasRemaining() && available()>0)
+ {
+ advanceIfNecessary();
+ currentBuffer = getCurrentBuffer();
+ final int remaining = dest.remaining();
+ if(currentBuffer.remaining()>= remaining)
+ {
+ ByteBuffer buf = currentBuffer.slice();
+ buf.limit(remaining);
+ currentBuffer.position(currentBuffer.position()+remaining);
+ dest.put(buf);
+ }
+ else
+ {
+ dest.put(currentBuffer);
+ }
+ }
+
+ dest.flip();
+ return dest;
+ }
+ }
+
+ public int skipBytes(int n)
+ {
+ final ByteBuffer currentBuffer = getCurrentBuffer();
+ if(currentBuffer.remaining()>n)
+ {
+ currentBuffer.position(currentBuffer.position()+n);
+ }
+ else
+ {
+ n -= currentBuffer.remaining();
+ currentBuffer.position(currentBuffer.limit());
+ if(_bufferIndex != _underlying.size()-1)
+ {
+ _bufferIndex++;
+ skipBytes(n);
+ }
+ }
+ return position();
+ }
+
+ private ByteBuffer getCurrentBuffer()
+ {
+ return _underlying.get(_bufferIndex);
+ }
+
+ public boolean readBoolean()
+ {
+ advanceIfNecessary();
+ return getCurrentBuffer().get() != 0;
+ }
+
+ private void advanceIfNecessary()
+ {
+ while(!getCurrentBuffer().hasRemaining() && _bufferIndex != _underlying.size()-1)
+ {
+ _bufferIndex++;
+ }
+ }
+
+ public byte readByte()
+ {
+ advanceIfNecessary();
+ return getCurrentBuffer().get();
+ }
+
+ public int readUnsignedByte()
+ {
+ advanceIfNecessary();
+ return ((int)getCurrentBuffer().get()) & 0xFF;
+ }
+
+ public short readShort()
+ {
+ return getBuffer(2).getShort();
+ }
+
+ private ByteBuffer getBuffer(int size)
+ {
+ advanceIfNecessary();
+ final ByteBuffer currentBuffer = getCurrentBuffer();
+ if(currentBuffer.remaining()>= size)
+ {
+ return currentBuffer;
+ }
+ else
+ {
+ return readAsByteBuffer(size);
+ }
+ }
+
+ public int readUnsignedShort()
+ {
+ return ((int)getBuffer(2).getShort()) & 0xffff;
+ }
+
+ public char readChar()
+ {
+ return (char) getBuffer(2).getChar();
+ }
+
+ public int readInt()
+ {
+ return getBuffer(4).getInt();
+ }
+
+ public long readLong()
+ {
+ return getBuffer(8).getLong();
+ }
+
+ public float readFloat()
+ {
+ return getBuffer(4).getFloat();
+ }
+
+ public double readDouble()
+ {
+ return getBuffer(8).getDouble();
+ }
+
+ public AMQShortString readAMQShortString()
+ {
+ advanceIfNecessary();
+ final ByteBuffer currentBuffer = getCurrentBuffer();
+ int size = ((int) currentBuffer.get(currentBuffer.position())) & 0xff;
+ return AMQShortString.readAMQShortString(getBuffer(size + 1));
+ }
+
+ public String readLine()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public String readUTF()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int available()
+ {
+ int remaining = 0;
+ for(int i = _bufferIndex; i < _underlying.size(); i++)
+ {
+ remaining += _underlying.get(i).remaining();
+ }
+ return remaining;
+ }
+
+
+ public long skip(long i)
+ {
+ skipBytes((int)i);
+ return i;
+ }
+
+ public int read(byte[] b)
+ {
+ readFully(b);
+ return b.length;
+ }
+
+ public int position()
+ {
+ int position = 0;
+ for(int i = 0; i < _bufferIndex; i++)
+ {
+ position += _underlying.get(i).limit();
+ }
+ position += getCurrentBuffer().position();
+ return position;
+ }
+
+ public void position(int position)
+ {
+ int offset = 0;
+ boolean beforePos = true;
+ for(int i = 0; i < _underlying.size(); i++)
+ {
+ final ByteBuffer buffer = _underlying.get(i);
+ if(beforePos)
+ {
+ if (position - offset <= buffer.limit())
+ {
+ buffer.position(position - offset);
+ _bufferIndex = i;
+ beforePos = false;
+ }
+ else
+ {
+ offset += buffer.limit();
+ }
+ }
+ else
+ {
+ buffer.position(0);
+ }
+ }
+ }
+
+ public int length()
+ {
+ int length = 0;
+ for(ByteBuffer buf : _underlying)
+ {
+ length+= buf.limit();
+ }
+ return length;
+ }
+
+
+ public void mark(int readAhead)
+ {
+ _mark = position();
+ }
+
+ public void reset()
+ {
+ position(_mark);
+ }
+}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java Tue Jul 28 15:48:30 2015
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
public interface ChannelMethodProcessor
{
void receiveChannelFlow(boolean active);
@@ -30,7 +32,7 @@ public interface ChannelMethodProcessor
void receiveChannelCloseOk();
- void receiveMessageContent(byte[] data);
+ void receiveMessageContent(ByteBuffer data);
void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java Tue Jul 28 15:48:30 2015
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -34,20 +33,14 @@ public class ContentBody implements AMQB
{
public static final byte TYPE = 3;
- private byte[] _payload;
+ private ByteBuffer _payload;
public ContentBody()
{
}
- public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
- {
- _payload = new byte[(int)size];
- buffer.readFully(getPayload());
- }
-
- public ContentBody(byte[] payload)
+ public ContentBody(ByteBuffer payload)
{
_payload = payload;
}
@@ -59,12 +52,14 @@ public class ContentBody implements AMQB
public int getSize()
{
- return getPayload() == null ? 0 : getPayload().length;
+ return _payload == null ? 0 : _payload.remaining();
}
public void writePayload(DataOutput buffer) throws IOException
{
- buffer.write(getPayload());
+ byte[] data = new byte[_payload.remaining()];
+ _payload.duplicate().get(data);
+ buffer.write(data);
}
public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
@@ -78,8 +73,8 @@ public class ContentBody implements AMQB
{
if(_payload != null)
{
- sender.send(ByteBuffer.wrap(_payload));
- return _payload.length;
+ sender.send(_payload.duplicate());
+ return _payload.remaining();
}
else
{
@@ -87,7 +82,7 @@ public class ContentBody implements AMQB
}
}
- public byte[] getPayload()
+ public ByteBuffer getPayload()
{
return _payload;
}
@@ -97,8 +92,7 @@ public class ContentBody implements AMQB
throws IOException
{
- byte[] payload = new byte[(int)bodySize];
- in.readFully(payload);
+ ByteBuffer payload = in.readAsByteBuffer((int)bodySize);
if(!methodProcessor.ignoreAllButCloseOk())
{
@@ -106,77 +100,6 @@ public class ContentBody implements AMQB
}
}
- private static class BufferContentBody implements AMQBody
- {
- private final int _length;
- private final int _offset;
- private final ByteBuffer _buf;
-
- private BufferContentBody( ByteBuffer buf, int offset, int length)
- {
- _length = length;
- _offset = offset;
- _buf = buf;
- }
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
-
- public int getSize()
- {
- return _length;
- }
-
- public void writePayload(DataOutput buffer) throws IOException
- {
- if(_buf.hasArray())
- {
- buffer.write(_buf.array(), _buf.arrayOffset() + _offset, _length);
- }
- else
- {
- byte[] data = new byte[_length];
- ByteBuffer buf = _buf.duplicate();
-
- buf.position(_offset);
- buf.limit(_offset+_length);
- buf.get(data);
- buffer.write(data);
- }
- }
-
- @Override
- public long writePayload(final ByteBufferSender sender) throws IOException
- {
- if(_buf.hasArray())
- {
- sender.send(ByteBuffer.wrap(_buf.array(), _buf.arrayOffset() + _offset, _length));
- }
- else
- {
- ByteBuffer buf = _buf.duplicate();
-
- buf.position(_offset);
- buf.limit(_offset+_length);
- sender.send(buf);
- }
- return _length;
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws QpidException
- {
- throw new RuntimeException("Buffered Body only to be used for outgoing data");
- }
- }
-
- public static AMQFrame createAMQFrame(int channelId, ByteBuffer buf, int offset, int length)
- {
- return new AMQFrame(channelId, new BufferContentBody(buf, offset, length));
- }
-
public static AMQFrame createAMQFrame(int channelId, ContentBody body)
{
return new AMQFrame(channelId, body);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Tue Jul 28 15:48:30 2015
@@ -43,7 +43,7 @@ public class ContentHeaderBody implement
/** must never be null */
private BasicContentHeaderProperties _properties;
- public ContentHeaderBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
+ public ContentHeaderBody(MarkableDataInput buffer, long size) throws AMQFrameDecodingException, IOException
{
buffer.readUnsignedShort();
buffer.readUnsignedShort();
@@ -80,7 +80,7 @@ public class ContentHeaderBody implement
* @throws AMQProtocolVersionException if there is a version issue
* @throws IOException if there is an IO issue
*/
- public static ContentHeaderBody createFromBuffer(DataInputStream buffer, long size)
+ public static ContentHeaderBody createFromBuffer(MarkableDataInput buffer, long size)
throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
ContentHeaderBody body = new ContentHeaderBody(buffer, size);
@@ -105,13 +105,13 @@ public class ContentHeaderBody implement
@Override
public long writePayload(final ByteBufferSender sender) throws IOException
{
- byte[] data = new byte[14];
- BytesDataOutput buffer = new BytesDataOutput(data);
- EncodingUtils.writeUnsignedShort(buffer, CLASS_ID);
- EncodingUtils.writeUnsignedShort(buffer, 0);
- buffer.writeLong(_bodySize);
- EncodingUtils.writeUnsignedShort(buffer, _properties.getPropertyFlags());
- sender.send(ByteBuffer.wrap(data));
+ ByteBuffer data = ByteBuffer.allocateDirect(14);
+ EncodingUtils.writeUnsignedShort(data, CLASS_ID);
+ EncodingUtils.writeUnsignedShort(data, 0);
+ data.putLong(_bodySize);
+ EncodingUtils.writeUnsignedShort(data, _properties.getPropertyFlags());
+ data.flip();
+ sender.send(data);
return 14 + _properties.writePropertyListPayload(sender);
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Tue Jul 28 15:48:30 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.framing;
import java.io.DataInput;
import java.io.IOException;
+import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQConstant;
public class ContentHeaderPropertiesFactory
@@ -39,7 +40,7 @@ public class ContentHeaderPropertiesFact
}
public BasicContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
- DataInput buffer, int size)
+ MarkableDataInput buffer, int size)
throws AMQFrameDecodingException, IOException
{
BasicContentHeaderProperties properties;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Tue Jul 28 15:48:30 2015
@@ -30,6 +30,8 @@ import java.nio.charset.StandardCharsets
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.codec.MarkableDataInput;
+
public class EncodingUtils
{
private static final Logger _logger = LoggerFactory.getLogger(EncodingUtils.class);
@@ -278,6 +280,22 @@ public class EncodingUtils
}
}
+ public static void writeUnsignedShort(ByteBuffer buffer, int s) throws IOException
+ {
+ // TODO: Is this comparison safe? Do I need to cast RHS to long?
+ if (s < Short.MAX_VALUE)
+ {
+ buffer.putShort((short) s);
+ }
+ else
+ {
+ short sv = (short) s;
+ buffer.put((byte) (0xFF & (sv >> 8)));
+ buffer.put((byte) (0xFF & sv));
+ }
+ }
+
+
public static int unsignedIntegerLength()
{
return 4;
@@ -303,6 +321,27 @@ public class EncodingUtils
}
}
+ public static void writeUnsignedInteger(ByteBuffer buffer, long l) throws IOException
+ {
+ // TODO: Is this comparison safe? Do I need to cast RHS to long?
+ if (l < Integer.MAX_VALUE)
+ {
+ buffer.putInt((int) l);
+ }
+ else
+ {
+ int iv = (int) l;
+
+ // FIXME: This *may* go faster if we build this into a local 4-byte array and then
+ // put the array in a single call.
+ buffer.put((byte) (0xFF & (iv >> 24)));
+ buffer.put((byte) (0xFF & (iv >> 16)));
+ buffer.put((byte) (0xFF & (iv >> 8)));
+ buffer.put((byte) (0xFF & iv));
+ }
+ }
+
+
public static void writeFieldTableBytes(DataOutput buffer, FieldTable table) throws IOException
{
if (table != null)
@@ -579,7 +618,7 @@ public class EncodingUtils
return result;
}
- public static FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException
+ public static FieldTable readFieldTable(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -588,10 +627,11 @@ public class EncodingUtils
}
else
{
- return FieldTableFactory.newFieldTable(buffer, length);
+ return new FieldTable(buffer.readAsByteBuffer((int)length));
}
}
+
public static AMQShortString readAMQShortString(DataInput buffer) throws IOException
{
return AMQShortString.readFromBuffer(buffer);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java Tue Jul 28 15:48:30 2015
@@ -21,11 +21,11 @@
package org.apache.qpid.framing;
import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -49,8 +49,7 @@ public class FieldTable
private static final String STRICT_AMQP_NAME = "STRICT_AMQP";
private static final boolean STRICT_AMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false"));
- private byte[] _encodedForm;
- private int _encodedFormOffset;
+ private ByteBuffer _encodedForm;
private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null;
private long _encodedSize;
private static final int INITIAL_HASHMAP_CAPACITY = 16;
@@ -69,30 +68,18 @@ public class FieldTable
_strictAMQP = strictAMQP;
}
- /**
- * Construct a new field table.
- *
- * @param buffer the buffer from which to read data. The length byte must be read already
- * @param length the length of the field table. Must be great than 0.
- * @throws IOException if there is an issue reading the buffer
- */
- public FieldTable(DataInput buffer, long length) throws IOException
+ public FieldTable(byte[] encodedForm, int offset, int length)
{
- this();
- _encodedForm = new byte[(int) length];
- buffer.readFully(_encodedForm);
- _encodedSize = length;
+ this(ByteBuffer.wrap(encodedForm,offset,length));
}
- public FieldTable(byte[] encodedForm, int offset, int length)
+ public FieldTable(ByteBuffer buffer)
{
this();
- _encodedForm = encodedForm;
- _encodedFormOffset = offset;
- _encodedSize = length;
+ _encodedForm = buffer;
+ _encodedSize = buffer.remaining();
}
-
public boolean isClean()
{
return _encodedForm != null;
@@ -858,14 +845,10 @@ public class FieldTable
}
}
- else if(_encodedFormOffset == 0 && _encodedSize == _encodedForm.length)
- {
- return _encodedForm.clone();
- }
else
{
- byte[] encodedCopy = new byte[(int) _encodedSize];
- System.arraycopy(_encodedForm,_encodedFormOffset,encodedCopy,0,(int)_encodedSize);
+ byte[] encodedCopy = new byte[_encodedForm.remaining()];
+ _encodedForm.duplicate().get(encodedCopy);
return encodedCopy;
}
@@ -1077,10 +1060,12 @@ public class FieldTable
private void putDataInBuffer(DataOutput buffer) throws IOException
{
-
if (_encodedForm != null)
{
- buffer.write(_encodedForm,_encodedFormOffset,(int)_encodedSize);
+ byte[] encodedCopy = new byte[_encodedForm.remaining()];
+ _encodedForm.duplicate().get(encodedCopy);
+
+ buffer.write(encodedCopy);
}
else if (_properties != null)
{
@@ -1109,7 +1094,7 @@ public class FieldTable
private void setFromBuffer() throws AMQFrameDecodingException, IOException
{
- ByteArrayDataInput baid = new ByteArrayDataInput(_encodedForm, _encodedFormOffset, (int)_encodedSize);
+ ByteBufferDataInput dataInput = new ByteBufferDataInput(_encodedForm.duplicate());
if (_encodedSize > 0)
{
@@ -1120,12 +1105,12 @@ public class FieldTable
do
{
- final AMQShortString key = baid.readAMQShortString();
- AMQTypedValue value = AMQTypedValue.readFromBuffer(baid);
+ final AMQShortString key = dataInput.readAMQShortString();
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(dataInput);
_properties.put(key, value);
}
- while (baid.available() > 0);
+ while (dataInput.available() > 0);
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java Tue Jul 28 15:48:30 2015
@@ -34,10 +34,5 @@ public class FieldTableFactory
return new FieldTable();
}
- public static FieldTable newFieldTable(DataInput byteBuffer, long length) throws AMQFrameDecodingException, IOException
- {
- return new FieldTable(byteBuffer, length);
- }
-
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Tue Jul 28 15:48:30 2015
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -603,7 +604,7 @@ public class FrameCreatingMethodProcesso
}
@Override
- public void receiveMessageContent(final byte[] data)
+ public void receiveMessageContent(ByteBuffer data)
{
_processedMethods.add(new AMQFrame(_channelId, new ContentBody(data)));
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java Tue Jul 28 15:48:30 2015
@@ -227,7 +227,7 @@ public class Connection extends Connecti
securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
IoNetworkTransport transport = new IoNetworkTransport();
- final InputHandler inputHandler = new InputHandler(new Assembler(this));
+ final InputHandler inputHandler = new InputHandler(new Assembler(this), false);
addFrameSizeObserver(inputHandler);
ExceptionHandlingByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
if(secureReceiver instanceof ConnectionListener)
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java Tue Jul 28 15:48:30 2015
@@ -93,9 +93,9 @@ public final class ProtocolHeader implem
return false;
}
- public ByteBuffer toByteBuffer()
+ public ByteBuffer toByteBuffer(final boolean useDirect)
{
- ByteBuffer buf = ByteBuffer.allocate(8);
+ ByteBuffer buf = useDirect ? ByteBuffer.allocateDirect(8) : ByteBuffer.allocate(8);
buf.put(AMQP);
buf.put(protoClass);
buf.put(instance);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Tue Jul 28 15:48:30 2015
@@ -155,7 +155,7 @@ public class Assembler implements Networ
{
size += f.getSize();
}
- segment = ByteBuffer.allocate(size);
+ segment = allocateByteBuffer(size);
for (Frame f : frames)
{
segment.put(f.getBody());
@@ -167,6 +167,11 @@ public class Assembler implements Networ
}
+ protected ByteBuffer allocateByteBuffer(final int size)
+ {
+ return ByteBuffer.allocate(size);
+ }
+
private void assemble(Frame frame, ByteBuffer segment)
{
BBDecoder dec = _decoder.get();
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Tue Jul 28 15:48:30 2015
@@ -159,7 +159,7 @@ public final class Disassembler implemen
{
synchronized (sendlock)
{
- sender.send(header.toByteBuffer());
+ sender.send(header.toByteBuffer(false));
sender.flush();
}
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java Tue Jul 28 15:48:30 2015
@@ -29,6 +29,9 @@ import static org.apache.qpid.transport.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.FrameSizeObserver;
@@ -46,18 +49,21 @@ import org.apache.qpid.transport.Segment
public class InputHandler implements ExceptionHandlingByteBufferReceiver, FrameSizeObserver
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
+
public enum State
{
PROTO_HDR,
FRAME_HDR,
FRAME_BODY,
- ERROR
+ ERROR;
}
-
private final NetworkEventReceiver receiver;
+
+ private final boolean _useDirect;
private State state;
private ByteBuffer input = null;
private int needed;
@@ -67,27 +73,24 @@ public class InputHandler implements Exc
private byte track;
private int channel;
- public InputHandler(NetworkEventReceiver receiver, State state)
+
+ public InputHandler(NetworkEventReceiver receiver, final boolean useDirect)
{
this.receiver = receiver;
- this.state = state;
+ this.state = PROTO_HDR;
+ _useDirect = useDirect;
switch (state)
{
- case PROTO_HDR:
- needed = 8;
- break;
- case FRAME_HDR:
- needed = Frame.HEADER_SIZE;
- break;
+ case PROTO_HDR:
+ needed = 8;
+ break;
+ case FRAME_HDR:
+ needed = Frame.HEADER_SIZE;
+ break;
}
}
- public InputHandler(NetworkEventReceiver receiver)
- {
- this(receiver, PROTO_HDR);
- }
-
public void setMaxFrameSize(final int maxFrameSize)
{
_maxFrameSize = maxFrameSize;
@@ -98,6 +101,7 @@ public class InputHandler implements Exc
receiver.received(new ProtocolError(Frame.L1, fmt, args));
}
+ @Override
public void received(ByteBuffer buf)
{
int limit = buf.limit();
@@ -132,7 +136,7 @@ public class InputHandler implements Exc
{
if (input == null)
{
- input = ByteBuffer.allocate(needed);
+ input = _useDirect ? ByteBuffer.allocateDirect(needed) : ByteBuffer.allocate(needed);
}
input.put(buf);
needed -= remaining;
@@ -185,7 +189,7 @@ public class InputHandler implements Exc
channel = (0xFFFF & input.getShort(pos + 6));
if (size == 0)
{
- Frame frame = new Frame(flags, type, track, channel, ByteBuffer.allocate(0));
+ Frame frame = new Frame(flags, type, track, channel, _useDirect ? ByteBuffer.allocateDirect(0) : ByteBuffer.allocate(0));
receiver.received(frame);
needed = Frame.HEADER_SIZE;
return FRAME_HDR;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java Tue Jul 28 15:48:30 2015
@@ -109,6 +109,10 @@ public final class Functions
{
return hex(bytes, limit, "");
}
+ public static String hex(ByteBuffer bytes, int limit)
+ {
+ return hex(bytes, limit, "");
+ }
public static String hex(byte[] bytes, int limit, CharSequence separator)
{
@@ -127,6 +131,26 @@ public final class Functions
{
sb.append("...");
}
+ return sb.toString();
+ }
+
+ public static String hex(ByteBuffer bytes, int limit, CharSequence separator)
+ {
+ limit = Math.min(limit, bytes == null ? 0 : bytes.remaining());
+ StringBuilder sb = new StringBuilder(3 + limit*2);
+ for(int i = 0; i < limit; i++)
+ {
+ sb.append(HEX_CHARACTERS[(((int)(bytes.get(bytes.position()+i))) & 0xf0)>>4]);
+ sb.append(HEX_CHARACTERS[(((int)bytes.get(bytes.position()+i)) & 0x0f)]);
+ if(i != bytes.remaining() - 1)
+ {
+ sb.append(separator);
+ }
+ }
+ if(bytes != null && bytes.remaining()>limit)
+ {
+ sb.append("...");
+ }
return sb.toString();
}
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java?rev=1693123&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java Tue Jul 28 15:48:30 2015
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+public class ByteBufferDataOutput implements DataOutput
+{
+ private final ByteBuffer _buf;
+
+ public ByteBufferDataOutput(ByteBuffer buf)
+ {
+ _buf = buf;
+ }
+
+ public void write(int b)
+ {
+ _buf.put((byte)b);
+ }
+
+ public void write(byte[] b)
+ {
+ _buf.put(b);
+ }
+
+
+ public void write(byte[] b, int off, int len)
+ {
+ _buf.put(b, off, len);
+
+ }
+
+ public void writeBoolean(boolean v)
+ {
+ _buf.put( v ? (byte) 1 : (byte) 0);
+ }
+
+ public void writeByte(int v)
+ {
+ _buf.put((byte) v);
+ }
+
+ public void writeShort(int v)
+ {
+ _buf.putShort((short)v);
+ }
+
+ public void writeChar(int v)
+ {
+ _buf.put((byte) (v >>> 8));
+ _buf.put((byte) v);
+ }
+
+ public void writeInt(int v)
+ {
+ _buf.putInt(v);
+ }
+
+ public void writeLong(long v)
+ {
+ _buf.putLong(v);
+ }
+
+ public void writeFloat(float v)
+ {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ public void writeDouble(double v)
+ {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void writeBytes(String s)
+ {
+ throw new UnsupportedOperationException("writeBytes(String s) not supported");
+ }
+
+ public void writeChars(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ int v = s.charAt(i);
+ _buf.put((byte) (v >>> 8));
+ _buf.put((byte) v);
+ }
+ }
+
+ public void writeUTF(String s)
+ {
+ int strlen = s.length();
+
+ int pos = _buf.position();
+ _buf.position(pos+2);
+
+
+ for (int i = 0; i < strlen; i++)
+ {
+ int c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F))
+ {
+ c = s.charAt(i);
+ _buf.put((byte) c);
+
+ }
+ else if (c > 0x07FF)
+ {
+ _buf.put((byte) (0xE0 | ((c >> 12) & 0x0F)));
+ _buf.put((byte) (0x80 | ((c >> 6) & 0x3F)));
+ _buf.put((byte) (0x80 | (c & 0x3F)));
+ }
+ else
+ {
+ _buf.put((byte) (0xC0 | ((c >> 6) & 0x1F)));
+ _buf.put((byte) (0x80 | (c & 0x3F)));
+ }
+ }
+
+ int len = _buf.position() - (pos + 2);
+
+ _buf.put(pos++, (byte) (len >>> 8));
+ _buf.put(pos, (byte) len);
+ }
+
+}
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Tue Jul 28 15:48:30 2015
@@ -83,7 +83,7 @@ public class AMQDecoderTest extends Qpid
Random random = new Random();
final byte[] payload = new byte[2048];
random.nextBytes(payload);
- final AMQBody body = new ContentBody(payload);
+ final AMQBody body = new ContentBody(ByteBuffer.wrap(payload));
AMQFrame frame = new AMQFrame(1, body);
byte[] outputBuf = new byte[4096];
BytesDataOutput dataOutput = new BytesDataOutput(outputBuf);
@@ -91,14 +91,16 @@ public class AMQDecoderTest extends Qpid
for(int i = 0 ; i < dataOutput.length(); i++)
{
_decoder.decodeBuffer(ByteBuffer.wrap(outputBuf, i, 1));
-
}
List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
if (frames.get(0) instanceof AMQFrame)
{
assertEquals(ContentBody.TYPE, ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
ContentBody decodedBody = (ContentBody) ((AMQFrame) frames.get(0)).getBodyFrame();
- assertTrue("Body was corrupted", Arrays.equals(payload, decodedBody.getPayload()));
+ final ByteBuffer byteBuffer = decodedBody.getPayload().duplicate();
+ byte[] bodyBytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bodyBytes);
+ assertTrue("Body was corrupted", Arrays.equals(payload, bodyBytes));
}
else
{
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java Tue Jul 28 15:48:30 2015
@@ -87,7 +87,7 @@ public class BasicContentHeaderPropertie
public void testPopulatePropertiesFromBuffer() throws Exception
{
- DataInputStream buf = new DataInputStream(new ByteArrayInputStream(new byte[300]));
+ ByteArrayDataInput buf = new ByteArrayDataInput(new byte[300]);
_testProperties.populatePropertiesFromBuffer(buf, 99, 99);
}
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java Tue Jul 28 15:48:30 2015
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -477,7 +478,7 @@ public class FieldTableTest extends Qpid
// Extract the table back from the buffer again.
try
{
- FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new DataInputStream(new ByteArrayInputStream(data)));
+ FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new ByteArrayDataInput(data));
FieldTable extractedTable = extractedOuterTable.getFieldTable("innerTable");
@@ -600,13 +601,14 @@ public class FieldTableTest extends Qpid
ByteArrayOutputStream baos = new ByteArrayOutputStream((int) table.getEncodedSize() + 4);
table.writeToBuffer(new DataOutputStream(baos));
- ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
- DataInputStream dis = new DataInputStream(bais);
+ ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+ long length = buf.getInt() & 0xFFFFFFFFL;
+ buf = buf.slice();
+ buf.limit((int)length);
- long length = dis.readInt() & 0xFFFFFFFFL;
- FieldTable table2 = new FieldTable(dis, length);
+ FieldTable table2 = new FieldTable(buf);
Assert.assertEquals((Boolean) true, table2.getBoolean("bool"));
Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte"));
@@ -918,7 +920,7 @@ public class FieldTableTest extends Qpid
assertEquals("unexpected data length", 24, length);
//Create a second FieldTable from the encoded bytes
- FieldTable tableFromBytes = new FieldTable(new DataInputStream(new ByteArrayInputStream(data)), length);
+ FieldTable tableFromBytes = new FieldTable(ByteBuffer.wrap(data));
//Create a final FieldTable and addAll() from the table created with encoded bytes
FieldTable destinationTable = new FieldTable();
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java Tue Jul 28 15:48:30 2015
@@ -83,7 +83,7 @@ public abstract class ConnectionBinding
public ExceptionHandlingByteBufferReceiver receiver(Connection conn)
{
- final InputHandler inputHandler = new InputHandler(new Assembler(conn));
+ final InputHandler inputHandler = new InputHandler(new Assembler(conn), false);
conn.addFrameSizeObserver(inputHandler);
if (conn.getConnectionSettings() != null &&
conn.getConnectionSettings().isUseSASLEncryption())
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java Tue Jul 28 15:48:30 2015
@@ -41,6 +41,7 @@ import javax.jms.MessageProducer;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -132,7 +133,7 @@ public class FieldTableMessageTest exten
final long bodyLength = bytesMessage.getBodyLength();
byte[] data = new byte[(int) bodyLength];
bytesMessage.readBytes(data);
- FieldTable actual = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(data)), bodyLength);
+ FieldTable actual = new FieldTable(ByteBuffer.wrap(data));
for (String key : _expected.keys())
{
assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org