You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/07 02:28:20 UTC
svn commit: r1694594 [4/5] - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ bdbs...
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Aug 7 00:28:17 2015
@@ -50,6 +50,7 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.protocol.AMQConstant;
@@ -166,7 +167,7 @@ public class AMQPConnection_1_0 extends
_frameWriter = new FrameWriter(_endpoint.getDescribedTypeRegistry());
- getSender().send(headerResponse.duplicate());
+ getSender().send(QpidByteBuffer.wrap(headerResponse.duplicate()));
getSender().flush();
if(useSASL)
@@ -254,7 +255,7 @@ public class AMQPConnection_1_0 extends
{
if (_endpoint.isAuthenticated())
{
- getSender().send(AMQP_LAYER_HEADER.duplicate());
+ getSender().send(QpidByteBuffer.wrap(AMQP_LAYER_HEADER.duplicate()));
getSender().flush();
}
else
@@ -343,14 +344,14 @@ public class AMQPConnection_1_0 extends
private final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW");
- public synchronized void received(final ByteBuffer msg)
+ public synchronized void received(final QpidByteBuffer msg)
{
try
{
updateLastReadTime();
if(RAW_LOGGER.isDebugEnabled())
{
- ByteBuffer dup = msg.duplicate();
+ QpidByteBuffer dup = msg.duplicate();
byte[] data = new byte[dup.remaining()];
dup.get(data);
Binary bin = new Binary(data);
@@ -531,7 +532,7 @@ public class AMQPConnection_1_0 extends
_frameWriter.setValue(amqFrame);
- ByteBuffer dup = ByteBuffer.allocateDirect(_endpoint.getMaxFrameSize());
+ QpidByteBuffer dup = QpidByteBuffer.allocateDirect(_endpoint.getMaxFrameSize());
int size = _frameWriter.writeToBuffer(dup);
if (size > _endpoint.getMaxFrameSize())
@@ -543,7 +544,7 @@ public class AMQPConnection_1_0 extends
if (RAW_LOGGER.isDebugEnabled())
{
- ByteBuffer dup2 = dup.duplicate();
+ QpidByteBuffer dup2 = dup.duplicate();
byte[] data = new byte[dup2.remaining()];
dup2.get(data);
Binary bin = new Binary(data);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Aug 7 00:28:17 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v1_0;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.List;
import org.slf4j.Logger;
@@ -43,6 +44,7 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -138,24 +140,23 @@ class ConsumerTarget_1_0 extends Abstrac
Transfer transfer = new Transfer();
//TODO
-
- List<ByteBuffer> fragments = message.getFragments();
- ByteBuffer payload;
+ Collection<QpidByteBuffer> fragments = message.getFragments();
+ QpidByteBuffer payload;
if(fragments.size() == 1)
{
- payload = fragments.get(0);
+ payload = fragments.iterator().next();
}
else
{
int size = 0;
- for(ByteBuffer fragment : fragments)
+ for(QpidByteBuffer fragment : fragments)
{
size += fragment.remaining();
}
- payload = ByteBuffer.allocateDirect(size);
+ payload = QpidByteBuffer.allocateDirect(size);
- for(ByteBuffer fragment : fragments)
+ for(QpidByteBuffer fragment : fragments)
{
payload.put(fragment.duplicate());
}
@@ -171,7 +172,6 @@ class ConsumerTarget_1_0 extends Abstrac
Header oldHeader = null;
try
{
- ByteBuffer encodedBuf = payload.duplicate();
Object value = valueHandler.parse(payload);
if(value instanceof Header)
{
@@ -200,8 +200,8 @@ class ConsumerTarget_1_0 extends Abstrac
_sectionEncoder.encodeObject(header);
Binary encodedHeader = _sectionEncoder.getEncoding();
- ByteBuffer oldPayload = payload;
- payload = ByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
+ QpidByteBuffer oldPayload = payload;
+ payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
payload.put(oldPayload);
payload.flip();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java Fri Aug 7 00:28:17 2015
@@ -49,6 +49,7 @@ import org.apache.qpid.amqp_1_0.type.Uns
import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.codec.BBEncoder;
import org.apache.qpid.typedmessage.TypedBytesContentWriter;
@@ -68,7 +69,7 @@ public class MessageConverter_from_1_0
Object bodyObject;
try
{
- List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data));
+ List<Section> sections = sectionDecoder.parseAll(QpidByteBuffer.wrap(data));
ListIterator<Section> iterator = sections.listIterator();
Section previousSection = null;
while(iterator.hasNext())
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Fri Aug 7 00:28:17 2015
@@ -38,6 +38,7 @@ import org.apache.qpid.amqp_1_0.type.Sym
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.store.StoredMessage;
@@ -257,7 +258,7 @@ public abstract class MessageConverter_t
}
@Override
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
{
ByteBuffer buf = allData.duplicate();
buf.position(offsetInMessage);
@@ -266,7 +267,7 @@ public abstract class MessageConverter_t
{
buf.limit(size);
}
- return Collections.singleton(buf);
+ return Collections.singleton(QpidByteBuffer.wrap(buf));
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Fri Aug 7 00:28:17 2015
@@ -49,6 +49,7 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -70,9 +71,9 @@ public class MessageMetaData_1_0 impleme
private Map _appProperties;
private Map _footer;
- private List<ByteBuffer> _encodedSections = new ArrayList<ByteBuffer>(3);
+ private List<QpidByteBuffer> _encodedSections = new ArrayList<>(3);
- private volatile ByteBuffer _encoded;
+ private volatile QpidByteBuffer _encoded;
private MessageHeader_1_0 _messageHeader;
@@ -92,29 +93,29 @@ public class MessageMetaData_1_0 impleme
return _header;
}
- private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder)
+ private static ArrayList<QpidByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder)
{
- ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size());
+ ArrayList<QpidByteBuffer> encodedSections = new ArrayList<QpidByteBuffer>(sections.size());
for(Section section : sections)
{
encoder.encodeObject(section);
- encodedSections.add(encoder.getEncoding().asByteBuffer());
+ encodedSections.add(QpidByteBuffer.wrap(encoder.getEncoding().asByteBuffer()));
encoder.reset();
}
return encodedSections;
}
- public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder)
+ public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder)
{
- this(fragments, decoder, new ArrayList<ByteBuffer>(3));
+ this(fragments, decoder, new ArrayList<QpidByteBuffer>(3));
}
- public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder, List<ByteBuffer> immutableSections)
+ public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder, List<QpidByteBuffer> immutableSections)
{
this(constructSections(fragments, decoder,immutableSections), immutableSections);
}
- private MessageMetaData_1_0(List<Section> sections, List<ByteBuffer> encodedSections)
+ private MessageMetaData_1_0(List<Section> sections, List<QpidByteBuffer> encodedSections)
{
_encodedSections = encodedSections;
@@ -161,11 +162,11 @@ public class MessageMetaData_1_0 impleme
}
- private static List<Section> constructSections(final ByteBuffer[] fragments, final SectionDecoder decoder, List<ByteBuffer> encodedSections)
+ private static List<Section> constructSections(final QpidByteBuffer[] fragments, final SectionDecoder decoder, List<QpidByteBuffer> encodedSections)
{
List<Section> sections = new ArrayList<Section>(3);
- ByteBuffer src;
+ QpidByteBuffer src;
if(fragments.length == 1)
{
src = fragments[0].duplicate();
@@ -173,12 +174,12 @@ public class MessageMetaData_1_0 impleme
else
{
int size = 0;
- for(ByteBuffer buf : fragments)
+ for(QpidByteBuffer buf : fragments)
{
size += buf.remaining();
}
- src = ByteBuffer.allocateDirect(size);
- for(ByteBuffer buf : fragments)
+ src = QpidByteBuffer.allocateDirect(size);
+ for(QpidByteBuffer buf : fragments)
{
src.put(buf.duplicate());
}
@@ -274,7 +275,7 @@ public class MessageMetaData_1_0 impleme
int pos = 0;
- for(ByteBuffer buf : fragments)
+ for(QpidByteBuffer buf : fragments)
{
/*
if(pos < startBarePos)
@@ -315,7 +316,7 @@ public class MessageMetaData_1_0 impleme
{
int size = 0;
- for(ByteBuffer bin : _encodedSections)
+ for(QpidByteBuffer bin : _encodedSections)
{
size += bin.limit();
}
@@ -323,11 +324,11 @@ public class MessageMetaData_1_0 impleme
return size;
}
- private ByteBuffer encodeAsBuffer()
+ private QpidByteBuffer encodeAsBuffer()
{
- ByteBuffer buf = ByteBuffer.allocateDirect(getStorableSize());
+ QpidByteBuffer buf = QpidByteBuffer.allocateDirect(getStorableSize());
- for(ByteBuffer bin : _encodedSections)
+ for(QpidByteBuffer bin : _encodedSections)
{
buf.put(bin.duplicate());
}
@@ -337,7 +338,7 @@ public class MessageMetaData_1_0 impleme
public int writeToBuffer(ByteBuffer dest)
{
- ByteBuffer buf = _encoded;
+ QpidByteBuffer buf = _encoded;
if(buf == null)
{
@@ -353,13 +354,13 @@ public class MessageMetaData_1_0 impleme
{
buf.limit(dest.remaining());
}
- dest.put(buf);
+ buf.get(dest);
return buf.limit();
}
public int getContentSize()
{
- ByteBuffer buf = _encoded;
+ QpidByteBuffer buf = _encoded;
if(buf == null)
{
@@ -399,17 +400,18 @@ public class MessageMetaData_1_0 impleme
ValueHandler valueHandler = new ValueHandler(_typeRegistry);
ArrayList<Section> sections = new ArrayList<Section>(3);
- ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(3);
+ ArrayList<QpidByteBuffer> encodedSections = new ArrayList<>(3);
while(buf.hasRemaining())
{
try
{
ByteBuffer encodedBuf = buf.duplicate();
- Object parse = valueHandler.parse(buf);
+ final QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(buf);
+ Object parse = valueHandler.parse(qpidByteBuffer);
sections.add((Section) parse);
encodedBuf.limit(buf.position());
- encodedSections.add(encodedBuf);
+ encodedSections.add(QpidByteBuffer.wrap(encodedBuf));
}
catch (AmqpErrorException e)
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Fri Aug 7 00:28:17 2015
@@ -24,8 +24,10 @@ package org.apache.qpid.server.protocol.
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.util.ByteBufferUtils;
@@ -33,7 +35,7 @@ import org.apache.qpid.util.ByteBufferUt
public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
{
- private volatile SoftReference<List<ByteBuffer>> _fragmentsRef;
+ private volatile SoftReference<Collection<QpidByteBuffer>> _fragmentsRef;
private long _arrivalTime;
private final long _size;
@@ -41,18 +43,18 @@ public class Message_1_0 extends Abstrac
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
{
super(storedMessage, null);
- final List<ByteBuffer> fragments = restoreFragments(getStoredMessage());
+ final Collection<QpidByteBuffer> fragments = restoreFragments(getStoredMessage());
_fragmentsRef = new SoftReference<>(fragments);
_size = calculateSize(fragments);
}
- private long calculateSize(final List<ByteBuffer> fragments)
+ private long calculateSize(final Collection<QpidByteBuffer> fragments)
{
long size = 0l;
if(fragments != null)
{
- for(ByteBuffer buf : fragments)
+ for(QpidByteBuffer buf : fragments)
{
size += buf.remaining();
}
@@ -60,28 +62,14 @@ public class Message_1_0 extends Abstrac
return size;
}
- private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
+ private static Collection<QpidByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
{
- ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>();
- final int FRAGMENT_SIZE = 2048;
- int offset = 0;
- ByteBuffer b;
- do
- {
+ return storedMessage.getContent(0, Integer.MAX_VALUE);
- b = ByteBufferUtils.combine(storedMessage.getContent(offset,FRAGMENT_SIZE));
- if(b.hasRemaining())
- {
- fragments.add(b);
- offset+= b.remaining();
- }
- }
- while(b.hasRemaining());
- return fragments;
}
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
- final List<ByteBuffer> fragments,
+ final Collection<QpidByteBuffer> fragments,
final Object connectionReference)
{
super(storedMessage, connectionReference);
@@ -128,10 +116,10 @@ public class Message_1_0 extends Abstrac
return _arrivalTime;
}
- public List<ByteBuffer> getFragments()
+ public Collection<QpidByteBuffer> getFragments()
{
- List<ByteBuffer> fragments = _fragmentsRef.get();
+ Collection<QpidByteBuffer> fragments = _fragmentsRef.get();
if(fragments == null)
{
fragments = restoreFragments(getStoredMessage());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Fri Aug 7 00:28:17 2015
@@ -41,6 +41,7 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.StoredMessage;
@@ -87,7 +88,7 @@ public class ReceivingLink_1_0 implement
{
// TODO - cope with fragmented messages
- List<ByteBuffer> fragments = null;
+ List<QpidByteBuffer> fragments = null;
@@ -108,7 +109,7 @@ public class ReceivingLink_1_0 implement
return;
}
- fragments = new ArrayList<ByteBuffer>(_incompleteMessage.size());
+ fragments = new ArrayList<QpidByteBuffer>(_incompleteMessage.size());
for(Transfer t : _incompleteMessage)
{
fragments.add(t.getPayload());
@@ -146,20 +147,16 @@ public class ReceivingLink_1_0 implement
else
{
MessageMetaData_1_0 mmd = null;
- List<ByteBuffer> immutableSections = new ArrayList<ByteBuffer>(3);
- mmd = new MessageMetaData_1_0(fragments.toArray(new ByteBuffer[fragments.size()]),
+ List<QpidByteBuffer> immutableSections = new ArrayList<>(3);
+ mmd = new MessageMetaData_1_0(fragments.toArray(new QpidByteBuffer[fragments.size()]),
_sectionDecoder,
immutableSections);
MessageHandle<MessageMetaData_1_0> handle = _vhost.getMessageStore().addMessage(mmd);
- boolean skipping = true;
- int offset = 0;
-
- for(ByteBuffer bareMessageBuf : immutableSections)
+ for(QpidByteBuffer bareMessageBuf : immutableSections)
{
handle.addContent(bareMessageBuf.duplicate());
- offset += bareMessageBuf.remaining();
}
final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java Fri Aug 7 00:28:17 2015
@@ -47,6 +47,7 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -78,7 +79,7 @@ public class TxnCoordinatorLink_1_0 impl
{
// TODO - cope with fragmented messages
- ByteBuffer payload = null;
+ QpidByteBuffer payload = null;
if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
@@ -100,7 +101,7 @@ public class TxnCoordinatorLink_1_0 impl
{
size += t.getPayload().limit();
}
- payload = ByteBuffer.allocateDirect(size);
+ payload = QpidByteBuffer.allocateDirect(size);
for(Transfer t : _incompleteMessage)
{
payload.put(t.getPayload().duplicate());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java Fri Aug 7 00:28:17 2015
@@ -40,6 +40,7 @@ import java.util.UUID;
import javax.security.auth.Subject;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.transport.AMQPConnection;
@@ -135,14 +136,14 @@ public class ProtocolEngine_1_0_0Test ex
final ByteBufferSender sender = mock(ByteBufferSender.class);
when(_networkConnection.getSender()).thenReturn(sender);
- final ArgumentCaptor<ByteBuffer> byteBufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
+ final ArgumentCaptor<QpidByteBuffer> byteBufferCaptor = ArgumentCaptor.forClass(QpidByteBuffer.class);
doAnswer(new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
- _sentBuffers.add(byteBufferCaptor.getValue());
+ _sentBuffers.add(byteBufferCaptor.getValue().getNativeBuffer());
return null;
}
}).when(sender).send(byteBufferCaptor.capture());
@@ -163,11 +164,12 @@ public class ProtocolEngine_1_0_0Test ex
createEngine(useSASL, Transport.TCP);
- _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
+ _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance()
+ .getHeaderIdentifier()));
Open open = new Open();
_frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
- ByteBuffer buf = ByteBuffer.allocate(64*1024);
+ QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
_frameWriter.writeToBuffer(buf);
buf.flip();
_protocolEngine_1_0_0.received(buf);
@@ -186,11 +188,11 @@ public class ProtocolEngine_1_0_0Test ex
createEngine(useSASL, Transport.TCP);
- _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
+ _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
Open open = new Open();
_frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
- ByteBuffer buf = ByteBuffer.allocate(64*1024);
+ QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
_frameWriter.writeToBuffer(buf);
buf.flip();
_protocolEngine_1_0_0.received(buf);
@@ -216,11 +218,11 @@ public class ProtocolEngine_1_0_0Test ex
final boolean useSASL = false;
createEngine(useSASL, Transport.SSL);
- _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
+ _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
Open open = new Open();
_frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
- ByteBuffer buf = ByteBuffer.allocate(64*1024);
+ QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
_frameWriter.writeToBuffer(buf);
buf.flip();
_protocolEngine_1_0_0.received(buf);
@@ -247,22 +249,22 @@ public class ProtocolEngine_1_0_0Test ex
createEngine(useSASL, Transport.TCP);
- _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0_SASL.getInstance().getHeaderIdentifier()));
+ _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0_SASL.getInstance().getHeaderIdentifier()));
SaslInit init = new SaslInit();
init.setMechanism(Symbol.valueOf("ANONYMOUS"));
_frameWriter.setValue(new SASLFrame(init));
- ByteBuffer buf = ByteBuffer.allocate(64*1024);
+ QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
_frameWriter.writeToBuffer(buf);
buf.flip();
_protocolEngine_1_0_0.received(buf);
- _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
+ _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
Open open = new Open();
_frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
- buf = ByteBuffer.allocate(64*1024);
+ buf = QpidByteBuffer.allocate(64*1024);
_frameWriter.writeToBuffer(buf);
buf.flip();
_protocolEngine_1_0_0.received(buf);
Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Fri Aug 7 00:28:17 2015
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
@@ -102,9 +103,9 @@ public class MessageConverter_1_0_to_v0_
}
@Override
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
{
- return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size));
+ return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Fri Aug 7 00:28:17 2015
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -199,7 +200,7 @@ public class MessageConverter_0_10_to_0_
}
@Override
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
{
return message.getContent(offsetInMessage, size);
}
Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Fri Aug 7 00:28:17 2015
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
@@ -88,7 +89,7 @@ public class MessageConverter_0_8_to_0_1
}
@Override
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
{
return message_0_8.getContent(offsetInMessage, size);
}
Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Fri Aug 7 00:28:17 2015
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -104,9 +105,9 @@ public class MessageConverter_1_0_to_v0_
}
@Override
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
{
- return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size));
+ return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
}
@Override
Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Fri Aug 7 00:28:17 2015
@@ -45,6 +45,7 @@ import org.eclipse.jetty.util.ssl.SslCon
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketHandler;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
@@ -223,7 +224,7 @@ class WebSocketProvider implements Accep
@Override
public void onMessage(final byte[] data, final int offset, final int length)
{
- _engine.received(ByteBuffer.wrap(data, offset, length).slice());
+ _engine.received(QpidByteBuffer.wrap(data, offset, length).slice());
}
@Override
@@ -277,8 +278,29 @@ class WebSocketProvider implements Accep
}
+ private void send(final ByteBuffer msg)
+ {
+ try
+ {
+ if (msg.hasArray())
+ {
+ _connection.sendMessage(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining());
+ }
+ else
+ {
+ byte[] copy = new byte[msg.remaining()];
+ msg.duplicate().get(copy);
+ _connection.sendMessage(copy, 0, copy.length);
+ }
+ }
+ catch (IOException e)
+ {
+ close();
+ }
+ }
+
@Override
- public void send(final ByteBuffer msg)
+ public void send(final QpidByteBuffer msg)
{
try
{
@@ -299,6 +321,7 @@ class WebSocketProvider implements Accep
}
}
+
@Override
public void flush()
{
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java Fri Aug 7 00:28:17 2015
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.slf4j.Logger;
@@ -121,7 +122,7 @@ public class AMQProtocolHandler implemen
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
- private AMQDecoder _decoder;
+ private ClientDecoder _decoder;
private ProtocolVersion _suggestedProtocolVersion;
@@ -559,7 +560,7 @@ public class AMQProtocolHandler implemen
final ByteBuffer buf = asByteBuffer(frame);
_lastWriteTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- _sender.send(buf);
+ _sender.send(QpidByteBuffer.wrap(buf));
if(flush)
{
_sender.flush();
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Fri Aug 7 00:28:17 2015
@@ -24,6 +24,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -33,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession_0_8;
import org.apache.qpid.client.AMQTopic;
@@ -76,7 +78,7 @@ public abstract class AbstractJMSMessage
_logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")");
}
- data = ((ContentBody) bodies.get(0)).getPayload().duplicate();
+ data = ((ContentBody) bodies.get(0)).getPayload().getNativeBuffer().duplicate();
}
else if (bodies != null)
{
@@ -91,7 +93,7 @@ public abstract class AbstractJMSMessage
while (it.hasNext())
{
ContentBody cb = (ContentBody) it.next();
- final ByteBuffer payload = cb.getPayload().duplicate();
+ final ByteBuffer payload = cb.getPayload().getNativeBuffer().duplicate();
if (payload.isDirect() || payload.isReadOnly())
{
data.put(payload);
@@ -131,15 +133,25 @@ public abstract class AbstractJMSMessage
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps,
DeliveryProperties deliveryProps,
- java.nio.ByteBuffer body) throws QpidException
+ Collection<QpidByteBuffer> body) throws QpidException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
- if (body != null)
+ if (body != null && body.size() != 0)
{
- data = body;
+ int size = 0;
+ for(QpidByteBuffer b : body)
+ {
+ size += b.remaining();
+ }
+ data = ByteBuffer.allocate(size);
+ for(QpidByteBuffer b : body)
+ {
+ b.get(data);
+ }
+ data.flip();
}
else // body == null
{
@@ -180,7 +192,7 @@ public abstract class AbstractJMSMessage
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, MessageProperties msgProps,
- DeliveryProperties deliveryProps, java.nio.ByteBuffer body)
+ DeliveryProperties deliveryProps, Collection<QpidByteBuffer> body)
throws JMSException, QpidException
{
final AbstractJMSMessage msg =
@@ -193,7 +205,7 @@ public abstract class AbstractJMSMessage
private class BodyInputStream extends InputStream
{
private final Iterator<ContentBody> _bodiesIter;
- private ByteBuffer _currentBuffer;
+ private QpidByteBuffer _currentBuffer;
public BodyInputStream(final List<ContentBody> bodies)
{
_bodiesIter = bodies.iterator();
Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java Fri Aug 7 00:28:17 2015
@@ -22,12 +22,19 @@ package org.apache.qpid.client.transport
import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
public class MockSender implements ByteBufferSender
{
- public void send(ByteBuffer msg)
+ // Variant taking a plain java.nio.ByteBuffer; the body is empty, so the data is ignored.
+ private void send(ByteBuffer msg)
+ {
+
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
{
}
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java?rev=1694594&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java Fri Aug 7 00:28:17 2015
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.bytebuffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A handle on a {@link ByteBuffer} combined with hooks that let the users of the
+ * buffer announce when they start and stop referring to it.
+ */
+public interface ByteBufferRef
+{
+ /** Records that one more user refers to the buffer. Implementations may choose not to count. */
+ void incrementRef();
+
+ /** Records that one user no longer refers to the buffer. Implementations may choose not to count. */
+ void decrementRef();
+
+ /**
+  * Returns the buffer this reference stands for. Whether this is the same instance on
+  * every call or a fresh view is up to the implementation.
+  */
+ ByteBuffer getBuffer();
+}
Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java?rev=1694594&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java Fri Aug 7 00:28:17 2015
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.bytebuffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * {@link ByteBufferRef} that simply holds on to a buffer: both reference-counting
+ * callbacks are accepted but do nothing.
+ */
+class NonPooledByteBufferRef implements ByteBufferRef
+{
+    private final ByteBuffer _buffer;
+
+    NonPooledByteBufferRef(final ByteBuffer wrapped)
+    {
+        this._buffer = wrapped;
+    }
+
+    /** Returns the very instance that was supplied at construction (no duplicate is made). */
+    @Override
+    public ByteBuffer getBuffer()
+    {
+        return this._buffer;
+    }
+
+    @Override
+    public void incrementRef()
+    {
+        // deliberately empty: nothing is counted
+    }
+
+    @Override
+    public void decrementRef()
+    {
+        // deliberately empty: nothing is counted
+    }
+}
Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java?rev=1694594&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java Fri Aug 7 00:28:17 2015
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.bytebuffer;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * {@link ByteBufferRef} that keeps an atomic count of its users and invokes
+ * {@link #returnToPool(PooledByteBufferRef)} when a decrement brings the count to zero.
+ */
+class PooledByteBufferRef implements ByteBufferRef
+{
+    /** Performs atomic updates on the {@code _refCount} field without a per-instance AtomicInteger. */
+    private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount");
+
+    private final ByteBuffer _buffer;
+    private volatile int _refCount;
+
+    PooledByteBufferRef(final ByteBuffer pooled)
+    {
+        this._buffer = pooled;
+    }
+
+    /** Returns a fresh duplicate (independent position/limit/mark) of the held buffer. */
+    @Override
+    public ByteBuffer getBuffer()
+    {
+        final ByteBuffer independentView = _buffer.duplicate();
+        return independentView;
+    }
+
+    @Override
+    public void incrementRef()
+    {
+        UPDATER.incrementAndGet(this);
+    }
+
+    @Override
+    public void decrementRef()
+    {
+        final int outstanding = UPDATER.decrementAndGet(this);
+        if (outstanding != 0)
+        {
+            return;
+        }
+        returnToPool(this);
+    }
+
+    /** Hook called when the count reaches zero; currently has no body. */
+    private void returnToPool(final PooledByteBufferRef byteBufferRef)
+    {
+
+    }
+
+}
Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1694594&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Fri Aug 7 00:28:17 2015
@@ -0,0 +1,769 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.bytebuffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * Wrapper around a {@link ByteBuffer} that also holds the {@link ByteBufferRef} the buffer
+ * came from. Creating a wrapper increments the reference; {@link #dispose()} (also invoked
+ * from {@link #finalize()}) decrements it, at most once per wrapper.
+ * <p>
+ * Unless documented otherwise, every method delegates to the method of the same name on the
+ * wrapped {@link ByteBuffer} and therefore throws the same runtime exceptions. Mutators
+ * return {@code this} so that calls can be chained.
+ */
+public final class QpidByteBuffer
+{
+    private static final AtomicIntegerFieldUpdater<QpidByteBuffer> DISPOSED_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(QpidByteBuffer.class, "_disposed");
+
+    private final ByteBuffer _buffer;
+    private final ByteBufferRef _ref;
+    /** 0 while live, 1 once {@link #dispose()} has run; updated through {@link #DISPOSED_UPDATER}. */
+    private volatile int _disposed;
+
+    // ------------------------------------------------------------------ construction
+
+    /** Wraps whatever {@code ref.getBuffer()} returns and increments {@code ref}. */
+    QpidByteBuffer(ByteBufferRef ref)
+    {
+        this(ref.getBuffer(), ref);
+    }
+
+    /** Wraps {@code buf}, remembers {@code ref} and increments it. */
+    private QpidByteBuffer(ByteBuffer buf, ByteBufferRef ref)
+    {
+        _buffer = buf;
+        _ref = ref;
+        ref.incrementRef();
+    }
+
+    /** Creates a wrapper around a newly allocated heap buffer of {@code size} bytes. */
+    public static QpidByteBuffer allocate(int size)
+    {
+        return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
+    }
+
+    /** Creates a wrapper around a newly allocated <em>direct</em> buffer of {@code size} bytes. */
+    public static QpidByteBuffer allocateDirect(int size)
+    {
+        // Previously this called ByteBuffer.allocate(size), so the result was never direct.
+        return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size)));
+    }
+
+    /** Wraps the given buffer instance itself; position and limit are shared with the caller. */
+    public static QpidByteBuffer wrap(final ByteBuffer wrap)
+    {
+        return new QpidByteBuffer(new NonPooledByteBufferRef(wrap));
+    }
+
+    /** Wraps the whole of {@code data}. */
+    public static QpidByteBuffer wrap(final byte[] data)
+    {
+        return wrap(ByteBuffer.wrap(data));
+    }
+
+    /** Wraps {@code data} with position {@code offset} and limit {@code offset + length}. */
+    public static QpidByteBuffer wrap(final byte[] data, int offset, int length)
+    {
+        return wrap(ByteBuffer.wrap(data, offset, length));
+    }
+
+    // ------------------------------------------------------------------ lifecycle
+
+    /** Decrements the underlying reference. Only the first call on a given wrapper has an effect. */
+    public void dispose()
+    {
+        if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
+        {
+            _ref.decrementRef();
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable
+    {
+        try
+        {
+            dispose();
+        }
+        finally
+        {
+            super.finalize();
+        }
+    }
+
+    // ------------------------------------------------------------------ state and cursor
+
+    public boolean hasRemaining()
+    {
+        return _buffer.hasRemaining();
+    }
+
+    public int remaining()
+    {
+        return _buffer.remaining();
+    }
+
+    public int position()
+    {
+        return _buffer.position();
+    }
+
+    public QpidByteBuffer position(final int newPosition)
+    {
+        _buffer.position(newPosition);
+        return this;
+    }
+
+    public int limit()
+    {
+        return _buffer.limit();
+    }
+
+    public QpidByteBuffer limit(final int newLimit)
+    {
+        _buffer.limit(newLimit);
+        return this;
+    }
+
+    public int capacity()
+    {
+        return _buffer.capacity();
+    }
+
+    public boolean isDirect()
+    {
+        return _buffer.isDirect();
+    }
+
+    public boolean isReadOnly()
+    {
+        return _buffer.isReadOnly();
+    }
+
+    public boolean hasArray()
+    {
+        return _buffer.hasArray();
+    }
+
+    public byte[] array()
+    {
+        return _buffer.array();
+    }
+
+    public int arrayOffset()
+    {
+        return _buffer.arrayOffset();
+    }
+
+    public QpidByteBuffer mark()
+    {
+        _buffer.mark();
+        return this;
+    }
+
+    public QpidByteBuffer reset()
+    {
+        _buffer.reset();
+        return this;
+    }
+
+    public QpidByteBuffer rewind()
+    {
+        _buffer.rewind();
+        return this;
+    }
+
+    public QpidByteBuffer clear()
+    {
+        _buffer.clear();
+        return this;
+    }
+
+    public QpidByteBuffer flip()
+    {
+        _buffer.flip();
+        return this;
+    }
+
+    public QpidByteBuffer compact()
+    {
+        _buffer.compact();
+        return this;
+    }
+
+    // ------------------------------------------------------------------ views
+    // Each view is a new wrapper on the same ByteBufferRef, so the reference is
+    // incremented once more and the view has to be disposed separately.
+
+    public QpidByteBuffer duplicate()
+    {
+        return new QpidByteBuffer(_buffer.duplicate(), _ref);
+    }
+
+    public QpidByteBuffer slice()
+    {
+        return new QpidByteBuffer(_buffer.slice(), _ref);
+    }
+
+    public QpidByteBuffer asReadOnlyBuffer()
+    {
+        return new QpidByteBuffer(_buffer.asReadOnlyBuffer(), _ref);
+    }
+
+    /**
+     * Returns a view of {@code length} bytes that starts {@code offset} bytes after the
+     * current position. The position of this buffer is not changed.
+     */
+    public QpidByteBuffer view(int offset, int length)
+    {
+        // First slice: index 0 is the current position. Second slice: index 0 is position + offset.
+        ByteBuffer buf = _buffer.slice();
+        buf.position(offset);
+        buf = buf.slice();
+        buf.limit(length);
+        return new QpidByteBuffer(buf, _ref);
+    }
+
+    /** Returns the wrapped buffer itself (not a copy); changes to it are visible through this wrapper. */
+    public ByteBuffer getNativeBuffer()
+    {
+        return _buffer;
+    }
+
+    /** Decodes the remaining bytes with {@code charset}; this consumes the buffer. */
+    public CharBuffer decode(Charset charset)
+    {
+        return charset.decode(_buffer);
+    }
+
+    /** Returns a stream that reads from, and advances, this buffer. */
+    public InputStream asInputStream()
+    {
+        return new BufferInputStream();
+    }
+
+    /** Returns a data input that reads from, and advances, this buffer. */
+    public MarkableDataInput asDataInput()
+    {
+        return new BufferDataInput();
+    }
+
+    /** Returns a data output that writes into, and advances, this buffer. */
+    public DataOutput asDataOutput()
+    {
+        return new BufferDataOutput();
+    }
+
+    // ------------------------------------------------------------------ relative reads
+
+    public byte get()
+    {
+        return _buffer.get();
+    }
+
+    public QpidByteBuffer get(final byte[] dst)
+    {
+        _buffer.get(dst);
+        return this;
+    }
+
+    public QpidByteBuffer get(final byte[] dst, final int offset, final int length)
+    {
+        _buffer.get(dst, offset, length);
+        return this;
+    }
+
+    /**
+     * Transfers all remaining bytes of this buffer into the given buffer. Despite its name,
+     * {@code src} is the <em>destination</em> of the copy.
+     */
+    public QpidByteBuffer get(final ByteBuffer src)
+    {
+        src.put(_buffer);
+        return this;
+    }
+
+    public short getShort()
+    {
+        return _buffer.getShort();
+    }
+
+    public char getChar()
+    {
+        return _buffer.getChar();
+    }
+
+    public int getInt()
+    {
+        return _buffer.getInt();
+    }
+
+    public long getLong()
+    {
+        return _buffer.getLong();
+    }
+
+    public float getFloat()
+    {
+        return _buffer.getFloat();
+    }
+
+    public double getDouble()
+    {
+        return _buffer.getDouble();
+    }
+
+    // ------------------------------------------------------------------ absolute reads
+
+    public byte get(final int index)
+    {
+        return _buffer.get(index);
+    }
+
+    public short getShort(final int index)
+    {
+        return _buffer.getShort(index);
+    }
+
+    public char getChar(final int index)
+    {
+        return _buffer.getChar(index);
+    }
+
+    public int getInt(final int index)
+    {
+        return _buffer.getInt(index);
+    }
+
+    public long getLong(final int index)
+    {
+        return _buffer.getLong(index);
+    }
+
+    public float getFloat(final int index)
+    {
+        return _buffer.getFloat(index);
+    }
+
+    public double getDouble(final int index)
+    {
+        return _buffer.getDouble(index);
+    }
+
+    // ------------------------------------------------------------------ relative writes
+
+    public QpidByteBuffer put(final byte b)
+    {
+        _buffer.put(b);
+        return this;
+    }
+
+    public QpidByteBuffer put(final byte[] src)
+    {
+        _buffer.put(src);
+        return this;
+    }
+
+    public QpidByteBuffer put(final byte[] src, final int offset, final int length)
+    {
+        _buffer.put(src, offset, length);
+        return this;
+    }
+
+    public QpidByteBuffer put(final ByteBuffer src)
+    {
+        _buffer.put(src);
+        return this;
+    }
+
+    /** Transfers the remaining bytes of {@code src} into this buffer, advancing both. */
+    public QpidByteBuffer put(final QpidByteBuffer src)
+    {
+        _buffer.put(src._buffer);
+        return this;
+    }
+
+    public QpidByteBuffer putShort(final short value)
+    {
+        _buffer.putShort(value);
+        return this;
+    }
+
+    public QpidByteBuffer putChar(final char value)
+    {
+        _buffer.putChar(value);
+        return this;
+    }
+
+    public QpidByteBuffer putInt(final int value)
+    {
+        _buffer.putInt(value);
+        return this;
+    }
+
+    public QpidByteBuffer putLong(final long value)
+    {
+        _buffer.putLong(value);
+        return this;
+    }
+
+    public QpidByteBuffer putFloat(final float value)
+    {
+        _buffer.putFloat(value);
+        return this;
+    }
+
+    public QpidByteBuffer putDouble(final double value)
+    {
+        _buffer.putDouble(value);
+        return this;
+    }
+
+    // ------------------------------------------------------------------ absolute writes
+
+    public QpidByteBuffer put(final int index, final byte b)
+    {
+        _buffer.put(index, b);
+        return this;
+    }
+
+    public QpidByteBuffer putShort(final int index, final short value)
+    {
+        _buffer.putShort(index, value);
+        return this;
+    }
+
+    public QpidByteBuffer putChar(final int index, final char value)
+    {
+        _buffer.putChar(index, value);
+        return this;
+    }
+
+    public QpidByteBuffer putInt(final int index, final int value)
+    {
+        _buffer.putInt(index, value);
+        return this;
+    }
+
+    public QpidByteBuffer putLong(final int index, final long value)
+    {
+        _buffer.putLong(index, value);
+        return this;
+    }
+
+    public QpidByteBuffer putFloat(final int index, final float value)
+    {
+        _buffer.putFloat(index, value);
+        return this;
+    }
+
+    public QpidByteBuffer putDouble(final int index, final double value)
+    {
+        _buffer.putDouble(index, value);
+        return this;
+    }
+
+    // ------------------------------------------------------------------ adapters
+
+    /** {@link InputStream} over the enclosing buffer; reading advances the buffer's position. */
+    private final class BufferInputStream extends InputStream
+    {
+
+        @Override
+        public int read() throws IOException
+        {
+            if (_buffer.hasRemaining())
+            {
+                return _buffer.get() & 0xFF;
+            }
+            return -1;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException
+        {
+            if (len == 0)
+            {
+                // InputStream contract: a zero-length request returns 0, even at end of data.
+                return 0;
+            }
+            if (!_buffer.hasRemaining())
+            {
+                return -1;
+            }
+            final int count = Math.min(len, _buffer.remaining());
+            _buffer.get(b, off, count);
+            return count;
+        }
+
+        @Override
+        public void mark(int readlimit)
+        {
+            _buffer.mark();
+        }
+
+        @Override
+        public void reset() throws IOException
+        {
+            _buffer.reset();
+        }
+
+        @Override
+        public boolean markSupported()
+        {
+            return true;
+        }
+
+        /** Skips at most the remaining bytes and returns how many were really skipped. */
+        @Override
+        public long skip(long n) throws IOException
+        {
+            if (n <= 0)
+            {
+                return 0;
+            }
+            final int count = (int) Math.min(n, (long) _buffer.remaining());
+            _buffer.position(_buffer.position() + count);
+            return count;
+        }
+
+        @Override
+        public int available() throws IOException
+        {
+            return _buffer.remaining();
+        }
+
+        @Override
+        public void close()
+        {
+        }
+    }
+
+    /**
+     * {@link MarkableDataInput} over the enclosing buffer. Positions reported and accepted by
+     * {@link #position()} / {@link #position(int)} are relative to the buffer position at the
+     * time this object was created.
+     */
+    private final class BufferDataInput implements MarkableDataInput
+    {
+        /** Marked position, relative to {@link #_offset}. */
+        private int _mark;
+        /** Absolute buffer position at construction time. */
+        private final int _offset;
+
+        public BufferDataInput()
+        {
+            _offset = _buffer.position();
+        }
+
+        public void readFully(byte[] b)
+        {
+            _buffer.get(b);
+        }
+
+        public void readFully(byte[] b, int off, int len)
+        {
+            // Honour the destination offset (it used to be ignored and 0 was used).
+            _buffer.get(b, off, len);
+        }
+
+        /** Returns a view of the next {@code len} bytes and moves past them. */
+        public QpidByteBuffer readAsByteBuffer(int len)
+        {
+            final QpidByteBuffer view = view(0, len);
+            skipBytes(len);
+            return view;
+        }
+
+        /** Skips up to {@code n} bytes and returns the number of bytes actually skipped. */
+        public int skipBytes(int n)
+        {
+            if (n <= 0)
+            {
+                return 0;
+            }
+            final int skipped = Math.min(n, _buffer.remaining());
+            _buffer.position(_buffer.position() + skipped);
+            return skipped;
+        }
+
+        public boolean readBoolean()
+        {
+            return _buffer.get() != 0;
+        }
+
+        public byte readByte()
+        {
+            return _buffer.get();
+        }
+
+        public int readUnsignedByte()
+        {
+            return ((int) _buffer.get()) & 0xFF;
+        }
+
+        public short readShort()
+        {
+            return _buffer.getShort();
+        }
+
+        public int readUnsignedShort()
+        {
+            return ((int) _buffer.getShort()) & 0xffff;
+        }
+
+        public char readChar()
+        {
+            return (char) _buffer.getChar();
+        }
+
+        public int readInt()
+        {
+            return _buffer.getInt();
+        }
+
+        public long readLong()
+        {
+            return _buffer.getLong();
+        }
+
+        public float readFloat()
+        {
+            return _buffer.getFloat();
+        }
+
+        public double readDouble()
+        {
+            return _buffer.getDouble();
+        }
+
+        public AMQShortString readAMQShortString()
+        {
+            return AMQShortString.readAMQShortString(_buffer);
+        }
+
+        public String readLine()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public String readUTF()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public int available()
+        {
+            return _buffer.remaining();
+        }
+
+        /** Skips up to {@code i} bytes and returns the number of bytes actually skipped. */
+        public long skip(long i)
+        {
+            if (i <= 0)
+            {
+                return 0;
+            }
+            final int skipped = (int) Math.min(i, (long) _buffer.remaining());
+            _buffer.position(_buffer.position() + skipped);
+            return skipped;
+        }
+
+        public int read(byte[] b)
+        {
+            readFully(b);
+            return b.length;
+        }
+
+        public int position()
+        {
+            return _buffer.position() - _offset;
+        }
+
+        public void position(int position)
+        {
+            _buffer.position(position + _offset);
+        }
+
+        // NOTE(review): this is the absolute limit of the buffer, whereas position() is
+        // relative to _offset; left unchanged - confirm which one callers expect.
+        public int length()
+        {
+            return _buffer.limit();
+        }
+
+        public void mark(int readAhead)
+        {
+            _mark = position();
+        }
+
+        public void reset()
+        {
+            // _mark is relative to _offset, so it has to be translated back through
+            // position(int) rather than being used as an absolute buffer position.
+            position(_mark);
+        }
+    }
+
+    /** {@link DataOutput} that writes into the enclosing buffer at its current position. */
+    private final class BufferDataOutput implements DataOutput
+    {
+        public void write(int b)
+        {
+            _buffer.put((byte) b);
+        }
+
+        public void write(byte[] b)
+        {
+            _buffer.put(b);
+        }
+
+        public void write(byte[] b, int off, int len)
+        {
+            _buffer.put(b, off, len);
+        }
+
+        public void writeBoolean(boolean v)
+        {
+            _buffer.put(v ? (byte) 1 : (byte) 0);
+        }
+
+        public void writeByte(int v)
+        {
+            _buffer.put((byte) v);
+        }
+
+        public void writeShort(int v)
+        {
+            _buffer.putShort((short) v);
+        }
+
+        /** Writes the two low-order bytes of {@code v}, high byte first. */
+        public void writeChar(int v)
+        {
+            _buffer.put((byte) (v >>> 8));
+            _buffer.put((byte) v);
+        }
+
+        public void writeInt(int v)
+        {
+            _buffer.putInt(v);
+        }
+
+        public void writeLong(long v)
+        {
+            _buffer.putLong(v);
+        }
+
+        public void writeFloat(float v)
+        {
+            writeInt(Float.floatToIntBits(v));
+        }
+
+        public void writeDouble(double v)
+        {
+            writeLong(Double.doubleToLongBits(v));
+        }
+
+        public void writeBytes(String s)
+        {
+            throw new UnsupportedOperationException("writeBytes(String s) not supported");
+        }
+
+        /** Writes every char of {@code s} as two bytes, high byte first. */
+        public void writeChars(String s)
+        {
+            int len = s.length();
+            for (int i = 0 ; i < len ; i++)
+            {
+                int v = s.charAt(i);
+                _buffer.put((byte) (v >>> 8));
+                _buffer.put((byte) v);
+            }
+        }
+
+        /**
+         * Writes a two-byte length followed by {@code s} encoded with one, two or three bytes
+         * per char. The length is only known after encoding, so two bytes are reserved first
+         * and filled in at the end.
+         * NOTE(review): an encoded length above 65535 is not rejected here and would be
+         * truncated to its low 16 bits.
+         */
+        public void writeUTF(String s)
+        {
+            int strlen = s.length();
+
+            // Leave room for the length prefix.
+            int pos = _buffer.position();
+            _buffer.position(pos + 2);
+
+            for (int i = 0; i < strlen; i++)
+            {
+                int c = s.charAt(i);
+                if ((c >= 0x0001) && (c <= 0x007F))
+                {
+                    _buffer.put((byte) c);
+                }
+                else if (c > 0x07FF)
+                {
+                    _buffer.put((byte) (0xE0 | ((c >> 12) & 0x0F)));
+                    _buffer.put((byte) (0x80 | ((c >> 6) & 0x3F)));
+                    _buffer.put((byte) (0x80 | (c & 0x3F)));
+                }
+                else
+                {
+                    // Covers 0x0000 and 0x0080..0x07FF: two-byte form.
+                    _buffer.put((byte) (0xC0 | ((c >> 6) & 0x1F)));
+                    _buffer.put((byte) (0x80 | (c & 0x3F)));
+                }
+            }
+
+            int len = _buffer.position() - (pos + 2);
+
+            // Back-fill the reserved length prefix with absolute puts (position is unaffected).
+            _buffer.put(pos++, (byte) (len >>> 8));
+            _buffer.put(pos, (byte) len);
+        }
+
+    }
+
+}
Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Fri Aug 7 00:28:17 2015
@@ -95,9 +95,6 @@ public abstract class AMQDecoder<T exten
return _methodProcessor;
}
-
- public abstract void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException;
-
protected void decode(final MarkableDataInput msg) throws IOException, AMQFrameDecodingException
{
// If this is the first read then we may be getting a protocol initiation back if we tried to negotiate
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java Fri Aug 7 00:28:17 2015
@@ -42,7 +42,6 @@ public class ClientDecoder extends AMQDe
super(false, methodProcessor);
}
- @Override
public void decodeBuffer(ByteBuffer buf)
throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java Fri Aug 7 00:28:17 2015
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.codec;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.AMQShortString;
import java.io.DataInput;
@@ -28,8 +29,8 @@ import java.nio.ByteBuffer;
public interface MarkableDataInput extends DataInput
{
- public void mark(int readAhead);
- public void reset() throws IOException;
+ void mark(int readAhead);
+ void reset() throws IOException;
int available() throws IOException;
@@ -37,8 +38,8 @@ public interface MarkableDataInput exten
int read(byte[] b) throws IOException;
- ByteBuffer readAsByteBuffer(int len) throws IOException;
+ QpidByteBuffer readAsByteBuffer(int len) throws IOException;
- public AMQShortString readAMQShortString() throws IOException;
+ AMQShortString readAMQShortString() throws IOException;
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java Fri Aug 7 00:28:17 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.codec;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.*;
public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends ServerChannelMethodProcessor>>
@@ -38,9 +39,9 @@ public class ServerDecoder extends AMQDe
super(true, methodProcessor);
}
- public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+ public void decodeBuffer(QpidByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
- decode(new ByteBufferDataInput(buf));
+ decode(buf.asDataInput());
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Fri Aug 7 00:28:17 2015
@@ -24,6 +24,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.util.BytesDataOutput;
@@ -62,7 +63,7 @@ public class AMQFrame extends AMQDataBlo
}
private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE };
- private static final ByteBuffer FRAME_END_BYTE_BUFFER = ByteBuffer.allocateDirect(1);
+ private static final QpidByteBuffer FRAME_END_BYTE_BUFFER = QpidByteBuffer.allocateDirect(1);
static
{
FRAME_END_BYTE_BUFFER.put(FRAME_END_BYTE);
@@ -72,7 +73,7 @@ public class AMQFrame extends AMQDataBlo
@Override
public long writePayload(final ByteBufferSender sender) throws IOException
{
- ByteBuffer frameHeader = ByteBuffer.allocate(7);
+ QpidByteBuffer frameHeader = QpidByteBuffer.allocate(7);
frameHeader.put(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(frameHeader, _channel);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Fri Aug 7 00:28:17 2015
@@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.ByteBufferSender;
@@ -118,8 +120,8 @@ public abstract class AMQMethodBodyImpl
{
final int size = getSize();
- ByteBuffer buf = ByteBuffer.allocate(size);
- ByteBufferDataOutput dataOutput = new ByteBufferDataOutput(buf);
+ QpidByteBuffer buf = QpidByteBuffer.allocate(size);
+ DataOutput dataOutput = buf.asDataOutput();
writePayload(dataOutput);
buf.flip();
sender.send(buf);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Fri Aug 7 00:28:17 2015
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.util.ByteBufferDataOutput;
@@ -86,7 +87,8 @@ public class BasicContentHeaderPropertie
private static final int USER_ID_MASK = 1 << 4;
private static final int APPLICATION_ID_MASK = 1 << 3;
private static final int CLUSTER_ID_MASK = 1 << 2;
- private ByteBuffer _encodedForm;
+
+ private QpidByteBuffer _encodedForm;
public BasicContentHeaderProperties(BasicContentHeaderProperties other)
@@ -482,9 +484,8 @@ public class BasicContentHeaderPropertie
else
{
int propertyListSize = getPropertyListSize();
- ByteBuffer buf = ByteBuffer.allocateDirect(propertyListSize);
- ByteBufferDataOutput out = new ByteBufferDataOutput(buf);
- writePropertyListPayload(out);
+ QpidByteBuffer buf = QpidByteBuffer.allocateDirect(propertyListSize);
+ writePropertyListPayload(buf.asDataOutput());
buf.flip();
sender.send(buf);
return propertyListSize;
@@ -503,9 +504,7 @@ public class BasicContentHeaderPropertie
_encodedForm = buffer.readAsByteBuffer(size);
- ByteBufferDataInput input = new ByteBufferDataInput(_encodedForm.slice());
-
- decode(input);
+ decode(_encodedForm.slice().asDataInput());
}
@@ -529,7 +528,7 @@ public class BasicContentHeaderPropertie
{
long length = EncodingUtils.readUnsignedInteger(buffer);
- ByteBuffer buf = _encodedForm.slice();
+ QpidByteBuffer buf = _encodedForm.slice();
buf.position(headersOffset+4);
buf = buf.slice();
buf.limit((int)length);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java Fri Aug 7 00:28:17 2015
@@ -22,9 +22,10 @@ package org.apache.qpid.framing;
import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.MarkableDataInput;
-public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput
+public class ByteArrayDataInput implements MarkableDataInput
{
private byte[] _data;
private int _offset;
@@ -167,11 +168,11 @@ public class ByteArrayDataInput implemen
}
@Override
- public ByteBuffer readAsByteBuffer(final int len)
+ public QpidByteBuffer readAsByteBuffer(final int len)
{
byte[] data = new byte[len];
readFully(data);
- return ByteBuffer.wrap(data);
+ return QpidByteBuffer.wrap(ByteBuffer.wrap(data));
}
public int position()
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java Fri Aug 7 00:28:17 2015
@@ -22,9 +22,10 @@ package org.apache.qpid.framing;
import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.MarkableDataInput;
-public class ByteBufferDataInput implements ExtendedDataInput, MarkableDataInput
+public class ByteBufferDataInput implements MarkableDataInput
{
private final ByteBuffer _underlying;
private int _mark;
@@ -46,12 +47,12 @@ public class ByteBufferDataInput impleme
_underlying.get(b,0, len);
}
- public ByteBuffer readAsByteBuffer(int len)
+ public QpidByteBuffer readAsByteBuffer(int len)
{
ByteBuffer buf = _underlying.slice();
buf.limit(len);
skipBytes(len);
- return buf;
+ return QpidByteBuffer.wrap(buf);
}
public int skipBytes(int n)
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java Fri Aug 7 00:28:17 2015
@@ -23,9 +23,10 @@ package org.apache.qpid.framing;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.MarkableDataInput;
-public class ByteBufferListDataInput implements ExtendedDataInput, MarkableDataInput
+public class ByteBufferListDataInput implements MarkableDataInput
{
private final List<ByteBuffer> _underlying;
private int _bufferIndex;
@@ -45,7 +46,7 @@ public class ByteBufferListDataInput imp
}
else
{
- ByteBuffer buf = readAsByteBuffer(b.length);
+ ByteBuffer buf = readAsNativeByteBuffer(b.length);
buf.get(b);
}
}
@@ -59,13 +60,18 @@ public class ByteBufferListDataInput imp
}
else
{
- ByteBuffer buf = readAsByteBuffer(len);
+ ByteBuffer buf = readAsNativeByteBuffer(len);
buf.get(b, off, len);
}
}
@Override
- public ByteBuffer readAsByteBuffer(int len)
+ public QpidByteBuffer readAsByteBuffer(int len)
+ {
+ return QpidByteBuffer.wrap(readAsNativeByteBuffer(len));
+ }
+
+ private ByteBuffer readAsNativeByteBuffer(int len)
{
ByteBuffer currentBuffer = getCurrentBuffer();
if(currentBuffer.remaining()>=len)
@@ -167,7 +173,7 @@ public class ByteBufferListDataInput imp
}
else
{
- return readAsByteBuffer(size);
+ return readAsNativeByteBuffer(size);
}
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java Fri Aug 7 00:28:17 2015
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.framing;
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
public interface ChannelMethodProcessor
{
@@ -32,7 +32,7 @@ public interface ChannelMethodProcessor
void receiveChannelCloseOk();
- void receiveMessageContent(ByteBuffer data);
+ void receiveMessageContent(QpidByteBuffer data);
void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java Fri Aug 7 00:28:17 2015
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.ByteBufferSender;
@@ -33,7 +34,7 @@ public class ContentBody implements AMQB
{
public static final byte TYPE = 3;
- private ByteBuffer _payload;
+ private QpidByteBuffer _payload;
public ContentBody()
{
@@ -42,9 +43,15 @@ public class ContentBody implements AMQB
public ContentBody(ByteBuffer payload)
{
+ _payload = QpidByteBuffer.wrap(payload);
+ }
+
+ public ContentBody(QpidByteBuffer payload)
+ {
_payload = payload;
}
+
public byte getFrameType()
{
return TYPE;
@@ -82,7 +89,7 @@ public class ContentBody implements AMQB
}
}
- public ByteBuffer getPayload()
+ public QpidByteBuffer getPayload()
{
return _payload;
}
@@ -92,7 +99,7 @@ public class ContentBody implements AMQB
throws IOException
{
- ByteBuffer payload = in.readAsByteBuffer((int)bodySize);
+ QpidByteBuffer payload = in.readAsByteBuffer((int) bodySize);
if(!methodProcessor.ignoreAllButCloseOk())
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org