You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/12/15 16:45:48 UTC
svn commit: r1720183 [1/3] - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/message/internal/
broker-core/s...
Author: rgodfrey
Date: Tue Dec 15 15:45:46 2015
New Revision: 1720183
URL: http://svn.apache.org/viewvc?rev=1720183&view=rev
Log:
QPID-6953 : Remove use of DataOutput for AMQP 0-8/9/9-1 encoding
Removed:
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/BytesDataOutput.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicQosOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionCloseOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldArray.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueBindOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxCommitBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxCommitOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxRollbackBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxRollbackOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxSelectBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Tue Dec 15 15:45:46 2015
@@ -1123,14 +1123,49 @@ public abstract class AbstractBDBMessage
return data;
}
+
@Override
- public synchronized Collection<QpidByteBuffer> getContent()
+ public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
{
Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+ int pos = 0;
for (QpidByteBuffer buf : bufs)
{
- content.add(buf.duplicate());
+ if(length > 0)
+ {
+ int bufRemaining = buf.remaining();
+ if (pos + bufRemaining <= offset)
+ {
+ pos += bufRemaining;
+ }
+ else if (pos >= offset)
+ {
+ buf = buf.duplicate();
+ if (bufRemaining <= length)
+ {
+ length -= bufRemaining;
+ }
+ else
+ {
+ buf.limit(length);
+ length = 0;
+ }
+ content.add(buf);
+ pos += buf.remaining();
+
+ }
+ else
+ {
+ int offsetInBuf = offset - pos;
+ int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+ final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+ content.add(bufView);
+ length -= limit;
+ pos+=limit+offsetInBuf;
+ }
+ }
+
}
return content;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Tue Dec 15 15:45:46 2015
@@ -168,13 +168,13 @@ public abstract class AbstractServerMess
}
@Override
- final public Collection<QpidByteBuffer> getContent()
+ final public Collection<QpidByteBuffer> getContent(int offset, int length)
{
StoredMessage<T> storedMessage = getStoredMessage();
boolean wasInMemory = storedMessage.isInMemory();
try
{
- return storedMessage.getContent();
+ return storedMessage.getContent(offset, length);
}
finally
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java Tue Dec 15 15:45:46 2015
@@ -28,7 +28,7 @@ import org.apache.qpid.bytebuffer.QpidBy
public interface MessageContentSource
{
- Collection<QpidByteBuffer> getContent();
+ Collection<QpidByteBuffer> getContent(int offset, int length);
long getSize();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Tue Dec 15 15:45:46 2015
@@ -64,7 +64,7 @@ public class InternalMessage extends Abs
{
super(msg, null);
_contentSize = msg.getMetaData().getContentSize();
- Collection<QpidByteBuffer> bufs = msg.getContent();
+ Collection<QpidByteBuffer> bufs = msg.getContent(0, _contentSize);
try(ObjectInputStream is = new ObjectInputStream(new ByteBufferInputStream(ByteBufferUtils.combine(bufs))))
{
@@ -223,9 +223,9 @@ public class InternalMessage extends Abs
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(final int offset, final int length)
{
- return Collections.singleton(QpidByteBuffer.wrap(bytes));
+ return Collections.singleton(QpidByteBuffer.wrap(bytes, offset, length));
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Dec 15 15:45:46 2015
@@ -2596,7 +2596,8 @@ public abstract class AbstractQueue<X ex
@Override
public void write(OutputStream outputStream) throws IOException
{
- Collection<QpidByteBuffer> content = _messageReference.getMessage().getContent();
+ ServerMessage message = _messageReference.getMessage();
+ Collection<QpidByteBuffer> content = message.getContent(0, (int) message.getSize());
try
{
for (QpidByteBuffer b : content)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Tue Dec 15 15:45:46 2015
@@ -1506,13 +1506,48 @@ public abstract class AbstractJDBCMessag
}
@Override
- public synchronized Collection<QpidByteBuffer> getContent()
+ public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
{
Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+
+ int pos = 0;
for (QpidByteBuffer buf : bufs)
{
- content.add(buf.duplicate());
+ if (length > 0)
+ {
+ int bufRemaining = buf.remaining();
+ if (pos + bufRemaining <= offset)
+ {
+ pos += bufRemaining;
+ }
+ else if (pos >= offset)
+ {
+ buf = buf.duplicate();
+ if (bufRemaining <= length)
+ {
+ length -= bufRemaining;
+ }
+ else
+ {
+ buf.limit(length);
+ length = 0;
+ }
+ content.add(buf);
+ pos += buf.remaining();
+
+ }
+ else
+ {
+ int offsetInBuf = offset - pos;
+ int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+ final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+ content.add(bufView);
+ length -= limit;
+ pos+=limit+offsetInBuf;
+ }
+ }
+
}
return content;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Tue Dec 15 15:45:46 2015
@@ -83,14 +83,15 @@ public class StoredMemoryMessage<T exten
return this;
}
+
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(int offset, int length)
{
if(_content == null)
{
return null;
}
- return Collections.singleton(_content.duplicate());
+ return Collections.singleton(_content.view(offset, length));
}
public T getMetaData()
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java Tue Dec 15 15:45:46 2015
@@ -31,7 +31,8 @@ public interface StoredMessage<M extends
long getMessageNumber();
- Collection<QpidByteBuffer> getContent();
+ Collection<QpidByteBuffer> getContent(int offset, int length);
+
void remove();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Tue Dec 15 15:45:46 2015
@@ -106,7 +106,6 @@ public class NetworkConnectionScheduler
{
rerun = false;
boolean closed = connection.doWork();
-
if (!closed && connection.getScheduler() == this)
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Tue Dec 15 15:45:46 2015
@@ -184,7 +184,14 @@ public class NonBlockingNetworkTransport
else
{
LOGGER.error("No Engine available.");
- }
+ try
+ {
+ socketChannel.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug("Failed to close socket " + socketChannel, e);
+ } }
}
}
catch (IOException e)
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java Tue Dec 15 15:45:46 2015
@@ -100,7 +100,7 @@ public class TrustStoreMessageSourceTest
{
final int bodySize = (int) message.getSize();
byte[] msgContent = new byte[bodySize];
- final Collection<QpidByteBuffer> allData = message.getStoredMessage().getContent();
+ final Collection<QpidByteBuffer> allData = message.getStoredMessage().getContent(0, bodySize);
int total = 0;
for(QpidByteBuffer b : allData)
{
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java Tue Dec 15 15:45:46 2015
@@ -20,16 +20,11 @@
*/
package org.apache.qpid.server.store;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.framing.EncodingUtils;
import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.util.ByteBufferOutputStream;
public class TestMessageMetaData implements StorableMessageMetaData
{
@@ -96,17 +91,8 @@ public class TestMessageMetaData impleme
public int writeToBuffer(QpidByteBuffer dest)
{
int oldPosition = dest.position();
- try
- {
- DataOutput dataOutputStream = dest.asDataOutput();
- EncodingUtils.writeLong(dataOutputStream, _messageId);
- EncodingUtils.writeInteger(dataOutputStream, _contentSize);
- }
- catch (IOException e)
- {
- // This shouldn't happen as we are not actually using anything that can throw an IO Exception
- throw new RuntimeException(e);
- }
+ dest.putLong(_messageId);
+ dest.putInt(_contentSize);
return dest.position() - oldPosition;
};
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Tue Dec 15 15:45:46 2015
@@ -99,11 +99,12 @@ public class TestMessageMetaDataType imp
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(int offset, int length)
{
return null;
}
+
@Override
public Object getConnectionReference()
{
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Tue Dec 15 15:45:46 2015
@@ -109,7 +109,7 @@ class MockServerMessage implements Serve
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(int offset, int length)
{
throw new UnsupportedOperationException();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Tue Dec 15 15:45:46 2015
@@ -78,9 +78,9 @@ public class MessageConverter_Internal_t
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(final int offset, final int length)
{
- return Collections.singleton(QpidByteBuffer.wrap(messageContent));
+ return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length));
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Tue Dec 15 15:45:46 2015
@@ -85,9 +85,9 @@ public class MessageConverter_v0_10 impl
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(final int offset, final int length)
{
- return serverMsg.getContent();
+ return serverMsg.getContent(offset, length);
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java Tue Dec 15 15:45:46 2015
@@ -63,7 +63,7 @@ public class MessageConverter_v0_10_to_I
final String mimeType = serverMessage.getMessageHeader().getMimeType();
byte[] data = new byte[(int) serverMessage.getSize()];
int total = 0;
- for(QpidByteBuffer b : serverMessage.getContent())
+ for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize()))
{
int len = b.remaining();
b.get(data, total, len);
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Tue Dec 15 15:45:46 2015
@@ -83,6 +83,6 @@ public class MessageTransferMessage exte
public Collection<QpidByteBuffer> getBody()
{
- return getContent();
+ return getContent(0, (int) getSize());
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Tue Dec 15 15:45:46 2015
@@ -392,14 +392,7 @@ public class AMQPConnection_0_8
_logger.debug("SEND: " + frame);
}
- try
- {
- frame.writePayload(_sender);
- }
- catch (IOException e)
- {
- throw new ServerScopedRuntimeException(e);
- }
+ frame.writePayload(_sender);
updateLastWriteTime();
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Tue Dec 15 15:45:46 2015
@@ -91,9 +91,9 @@ public class MessageConverter_Internal_t
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(final int offset, final int length)
{
- return Collections.singleton(QpidByteBuffer.wrap(messageContent));
+ return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length));
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Tue Dec 15 15:45:46 2015
@@ -63,7 +63,7 @@ public class MessageConverter_v0_8_to_In
final String mimeType = serverMessage.getMessageHeader().getMimeType();
byte[] data = new byte[(int) serverMessage.getSize()];
int total = 0;
- for(QpidByteBuffer b : serverMessage.getContent())
+ for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize()))
{
int len = b.remaining();
b.get(data, total, len);
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Tue Dec 15 15:45:46 2015
@@ -110,32 +110,24 @@ public class MessageMetaData implements
public int writeToBuffer(final QpidByteBuffer dest)
{
int oldPosition = dest.position();
- try
- {
-
- dest.putInt(_contentHeaderBody.getSize());
- _contentHeaderBody.writePayload(dest);
- EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getExchange());
- EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getRoutingKey());
- byte flags = 0;
- if(_messagePublishInfo.isMandatory())
- {
- flags |= MANDATORY_FLAG;
- }
- if(_messagePublishInfo.isImmediate())
- {
- flags |= IMMEDIATE_FLAG;
- }
- dest.put(flags);
- dest.putLong(_arrivalTime);
+ dest.putInt(_contentHeaderBody.getSize());
+ _contentHeaderBody.writePayload(dest);
+ EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getExchange());
+ EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getRoutingKey());
+ byte flags = 0;
+ if(_messagePublishInfo.isMandatory())
+ {
+ flags |= MANDATORY_FLAG;
}
- catch (IOException e)
+ if(_messagePublishInfo.isImmediate())
{
- // This shouldn't happen as we are not actually using anything that can throw an IO Exception
- throw new ConnectionScopedRuntimeException(e);
+ flags |= IMMEDIATE_FLAG;
}
+ dest.put(flags);
+ dest.putLong(_arrivalTime);
+
return dest.position()-oldPosition;
}
@@ -143,55 +135,47 @@ public class MessageMetaData implements
@Override
public Collection<QpidByteBuffer> asByteBuffers()
{
- try
- {
- final List<QpidByteBuffer> buffers = new ArrayList<>();
- QpidByteBuffer buf = QpidByteBuffer.allocateDirect(4);
- buffers.add(buf);
- buf.putInt(0, _contentHeaderBody.getSize());
- _contentHeaderBody.writePayload(new ByteBufferSender()
+ final List<QpidByteBuffer> buffers = new ArrayList<>();
+ QpidByteBuffer buf = QpidByteBuffer.allocateDirect(4);
+ buffers.add(buf);
+ buf.putInt(0, _contentHeaderBody.getSize());
+ _contentHeaderBody.writePayload(new ByteBufferSender()
+ {
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ buffers.add(msg.duplicate());
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+ @Override
+ public void close()
{
- @Override
- public void send(final QpidByteBuffer msg)
- {
- buffers.add(msg.duplicate());
- }
-
- @Override
- public void flush()
- {
-
- }
-
- @Override
- public void close()
- {
-
- }
- });
- buf = QpidByteBuffer.allocateDirect(9+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getExchange())+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getRoutingKey()));
- EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getExchange());
- EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getRoutingKey());
- byte flags = 0;
- if(_messagePublishInfo.isMandatory())
- {
- flags |= MANDATORY_FLAG;
- }
- if(_messagePublishInfo.isImmediate())
- {
- flags |= IMMEDIATE_FLAG;
- }
- buf.put(flags);
- buf.putLong(_arrivalTime);
- buf.flip();
- buffers.add(buf);
- return buffers;
+
+ }
+ });
+ buf = QpidByteBuffer.allocateDirect(9+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getExchange())+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getRoutingKey()));
+ EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getExchange());
+ EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getRoutingKey());
+ byte flags = 0;
+ if(_messagePublishInfo.isMandatory())
+ {
+ flags |= MANDATORY_FLAG;
}
- catch (IOException e)
+ if(_messagePublishInfo.isImmediate())
{
- // This shouldn't happen as we are not actually using anything that can throw an IO Exception
- throw new ConnectionScopedRuntimeException(e);
+ flags |= IMMEDIATE_FLAG;
}
+ buf.put(flags);
+ buf.putLong(_arrivalTime);
+ buf.flip();
+ buffers.add(buf);
+ return buffers;
}
public int getContentSize()
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Tue Dec 15 15:45:46 2015
@@ -20,11 +20,9 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +36,6 @@ import org.apache.qpid.framing.AMQMethod
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicGetOkBody;
-import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -49,7 +45,6 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.util.ByteBufferUtils;
import org.apache.qpid.util.GZIPUtils;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -100,73 +95,64 @@ public class ProtocolOutputConverterImpl
return writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody);
}
+ interface DisposableMessageContentSource extends MessageContentSource
+ {
+ void dispose();
+ }
+
private long writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
{
int bodySize = (int) message.getSize();
boolean msgCompressed = isCompressed(contentHeaderBody);
- Collection<QpidByteBuffer> modifiedContentBuffers = null;
+ DisposableMessageContentSource modifiedContent = null;
boolean compressionSupported = _connection.isCompressionSupported();
- Collection<QpidByteBuffer> contentBuffers = message.getContent();
long length;
if(msgCompressed
&& !compressionSupported
- && (contentBuffers != null)
- && (modifiedContentBuffers = inflateIfPossible(contentBuffers)) != null)
+ && (modifiedContent = inflateIfPossible(message)) != null)
{
BasicContentHeaderProperties modifiedProps =
new BasicContentHeaderProperties(contentHeaderBody.getProperties());
modifiedProps.setEncoding((String)null);
- length = writeMessageDeliveryModified(modifiedContentBuffers, channelId, deliverBody, modifiedProps);
+ length = writeMessageDeliveryModified(modifiedContent, channelId, deliverBody, modifiedProps);
}
else if(!msgCompressed
&& compressionSupported
&& contentHeaderBody.getProperties().getEncoding()==null
&& bodySize > _connection.getMessageCompressionThreshold()
- && (contentBuffers != null)
- && (modifiedContentBuffers = deflateIfPossible(contentBuffers)) != null)
+ && (modifiedContent = deflateIfPossible(message)) != null)
{
BasicContentHeaderProperties modifiedProps =
new BasicContentHeaderProperties(contentHeaderBody.getProperties());
modifiedProps.setEncoding(GZIP_ENCODING);
- length = writeMessageDeliveryModified(modifiedContentBuffers, channelId, deliverBody, modifiedProps);
+ length = writeMessageDeliveryModified(modifiedContent, channelId, deliverBody, modifiedProps);
}
else
{
- writeMessageDeliveryUnchanged(contentBuffers, channelId, deliverBody, contentHeaderBody, bodySize);
+ writeMessageDeliveryUnchanged(message, channelId, deliverBody, contentHeaderBody, bodySize);
length = bodySize;
}
- if (contentBuffers != null)
+ if (modifiedContent != null)
{
- for (QpidByteBuffer buf : contentBuffers)
- {
- buf.dispose();
- }
- }
-
- if (modifiedContentBuffers != null)
- {
- for(QpidByteBuffer buf : modifiedContentBuffers)
- {
- buf.dispose();
- }
+ modifiedContent.dispose();
}
return length;
}
- private Collection<QpidByteBuffer> deflateIfPossible(final Collection<QpidByteBuffer> buffers)
+ private DisposableMessageContentSource deflateIfPossible(MessageContentSource source)
{
try
{
- return QpidByteBuffer.deflate(buffers);
+ return new ModifiedContentSource(QpidByteBuffer.deflate(source.getContent(0, (int) source.getSize())));
}
catch (IOException e)
{
@@ -175,11 +161,12 @@ public class ProtocolOutputConverterImpl
}
}
- private Collection<QpidByteBuffer> inflateIfPossible(final Collection<QpidByteBuffer> buffers)
+
+ private DisposableMessageContentSource inflateIfPossible(MessageContentSource source)
{
try
{
- return QpidByteBuffer.inflate(buffers);
+ return new ModifiedContentSource(QpidByteBuffer.inflate(source.getContent(0, (int) source.getSize())));
}
catch (IOException e)
{
@@ -188,18 +175,19 @@ public class ProtocolOutputConverterImpl
}
}
- private int writeMessageDeliveryModified(final Collection<QpidByteBuffer> contentBuffers, final int channelId,
+
+ private int writeMessageDeliveryModified(final MessageContentSource content, final int channelId,
final AMQBody deliverBody,
final BasicContentHeaderProperties modifiedProps)
{
- final int bodySize = ByteBufferUtils.remaining(contentBuffers);
+ final int bodySize = (int) content.getSize();
ContentHeaderBody modifiedHeaderBody = new ContentHeaderBody(modifiedProps, bodySize);
- writeMessageDeliveryUnchanged(contentBuffers, channelId, deliverBody, modifiedHeaderBody, bodySize);
+ writeMessageDeliveryUnchanged(content, channelId, deliverBody, modifiedHeaderBody, bodySize);
return bodySize;
}
- private void writeMessageDeliveryUnchanged(Collection<QpidByteBuffer> contentBuffers,
+ private void writeMessageDeliveryUnchanged(MessageContentSource content,
int channelId, AMQBody deliverBody, ContentHeaderBody contentHeaderBody,
int bodySize)
{
@@ -219,7 +207,7 @@ public class ProtocolOutputConverterImpl
int writtenSize = capacity;
- AMQBody firstContentBody = new MessageContentSourceBody(contentBuffers, 0, capacity);
+ AMQBody firstContentBody = new MessageContentSourceBody(content, 0, capacity);
CompositeAMQBodyBlock
compositeBlock =
@@ -229,7 +217,7 @@ public class ProtocolOutputConverterImpl
while (writtenSize < bodySize)
{
capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- AMQBody body = new MessageContentSourceBody(contentBuffers, writtenSize, capacity);
+ AMQBody body = new MessageContentSourceBody(content, writtenSize, capacity);
writtenSize += capacity;
writeFrame(new AMQFrame(channelId, body));
@@ -246,47 +234,12 @@ public class ProtocolOutputConverterImpl
{
public static final byte TYPE = 3;
private final int _length;
- private final Collection<QpidByteBuffer> _contentBuffers;
+ private final MessageContentSource _content;
private final int _offset;
- public MessageContentSourceBody(Collection<QpidByteBuffer> bufs, int offset, int length)
+ public MessageContentSourceBody(MessageContentSource content, int offset, int length)
{
- int pos = 0;
- int added = 0;
-
- List<QpidByteBuffer> content = new ArrayList<>(bufs.size());
- for(QpidByteBuffer buf : bufs)
- {
- if(pos < offset)
- {
- final int remaining = buf.remaining();
- if(pos + remaining > offset)
- {
- buf = buf.view(offset-pos,length);
-
- content.add(buf);
- added += buf.remaining();
- }
- pos += remaining;
-
- }
- else
- {
- buf = buf.slice();
- if(buf.remaining() > (length-added))
- {
- buf.limit(length-added);
- }
- content.add(buf);
- added += buf.remaining();
- }
- if(added >= length)
- {
- break;
- }
- }
-
- _contentBuffers = content;
+ _content = content;
_offset = offset;
_length = length;
}
@@ -301,32 +254,11 @@ public class ProtocolOutputConverterImpl
return _length;
}
- public void writePayload(DataOutput buffer) throws IOException
- {
- for(QpidByteBuffer buf : _contentBuffers)
- {
- if (buf.hasArray())
- {
- buffer.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
- }
- else
- {
-
- byte[] data = new byte[_length];
-
- buf.get(data);
-
- buffer.write(data);
- }
- buf.dispose();
- }
- }
-
@Override
- public long writePayload(final ByteBufferSender sender) throws IOException
+ public long writePayload(final ByteBufferSender sender)
{
- long size = 0l;
- for(QpidByteBuffer buf : _contentBuffers)
+ long size = 0L;
+ for(QpidByteBuffer buf : _content.getContent(_offset, _length))
{
size += buf.remaining();
@@ -373,8 +305,7 @@ public class ProtocolOutputConverterImpl
exchangeName = pb.getExchange();
routingKey = pb.getRoutingKey();
- final AMQBody returnBlock = new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered);
- return returnBlock;
+ return new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered);
}
private class EncodedDeliveryBody implements AMQBody
@@ -418,16 +349,7 @@ public class ProtocolOutputConverterImpl
return _underlyingBody.getSize();
}
- public void writePayload(DataOutput buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public long writePayload(ByteBufferSender sender) throws IOException
+ public long writePayload(ByteBufferSender sender)
{
if(_underlyingBody == null)
{
@@ -461,14 +383,11 @@ public class ProtocolOutputConverterImpl
final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
- BasicGetOkBody getOkBody =
- _connection.getMethodRegistry().createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
-
- return getOkBody;
+ return _connection.getMethodRegistry().createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
+ queueSize);
}
private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
@@ -476,14 +395,11 @@ public class ProtocolOutputConverterImpl
AMQShortString replyText)
{
- BasicReturnBody basicReturnBody =
- _connection.getMethodRegistry().createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
-
- return basicReturnBody;
+ return _connection.getMethodRegistry().createBasicReturnBody(replyCode,
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
}
public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
@@ -533,13 +449,8 @@ public class ProtocolOutputConverterImpl
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(DataOutput buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
-
@Override
- public long writePayload(final ByteBufferSender sender) throws IOException
+ public long writePayload(final ByteBufferSender sender)
{
long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender);
@@ -586,13 +497,8 @@ public class ProtocolOutputConverterImpl
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(DataOutput buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
-
@Override
- public long writePayload(final ByteBufferSender sender) throws IOException
+ public long writePayload(final ByteBufferSender sender)
{
long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender);
size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
@@ -611,4 +517,81 @@ public class ProtocolOutputConverterImpl
}
}
+ private static class ModifiedContentSource implements DisposableMessageContentSource
+ {
+ private final Collection<QpidByteBuffer> _buffers;
+ private final int _size;
+
+ public ModifiedContentSource(final Collection<QpidByteBuffer> buffers)
+ {
+ _buffers = buffers;
+ int size = 0;
+ for(QpidByteBuffer buf : buffers)
+ {
+ size += buf.remaining();
+ }
+ _size = size;
+ }
+
+ @Override
+ public void dispose()
+ {
+ for(QpidByteBuffer buffer : _buffers)
+ {
+ buffer.dispose();
+ }
+ }
+
+ @Override
+ public Collection<QpidByteBuffer> getContent(final int offset, int length)
+ {
+ Collection<QpidByteBuffer> content = new ArrayList<>(_buffers.size());
+ int pos = 0;
+ for (QpidByteBuffer buf : _buffers)
+ {
+ if (length > 0)
+ {
+ int bufRemaining = buf.remaining();
+ if (pos + bufRemaining <= offset)
+ {
+ pos += bufRemaining;
+ }
+ else if (pos >= offset)
+ {
+ buf = buf.duplicate();
+ if (bufRemaining <= length)
+ {
+ length -= bufRemaining;
+ }
+ else
+ {
+ buf.limit(length);
+ length = 0;
+ }
+ content.add(buf);
+ pos+=buf.remaining();
+
+ }
+ else
+ {
+ int offsetInBuf = offset - pos;
+ int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+ final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+ content.add(bufView);
+ length -= limit;
+ pos+=limit+offsetInBuf;
+ }
+ }
+
+ }
+ return content;
+
+ }
+
+ @Override
+ public long getSize()
+ {
+ return _size;
+ }
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java Tue Dec 15 15:45:46 2015
@@ -51,6 +51,7 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.messaging.Data;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.codec.BBEncoder;
import org.apache.qpid.typedmessage.TypedBytesContentWriter;
@@ -63,7 +64,7 @@ public class MessageConverter_from_1_0
public static Object convertBodyToObject(final Message_1_0 serverMessage)
{
byte[] data = new byte[(int) serverMessage.getSize()];
- final Collection<QpidByteBuffer> allData = serverMessage.getStoredMessage().getContent();
+ final Collection<QpidByteBuffer> allData = serverMessage.getContent(0, (int) serverMessage.getSize());
int offset = 0;
for(QpidByteBuffer buf : allData)
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Tue Dec 15 15:45:46 2015
@@ -209,7 +209,7 @@ public abstract class MessageConverter_t
final String mimeType = serverMessage.getMessageHeader().getMimeType();
byte[] data = new byte[(int) serverMessage.getSize()];
int total = 0;
- for(QpidByteBuffer b : serverMessage.getContent())
+ for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize()))
{
int len = b.remaining();
b.get(data, total, len);
@@ -245,9 +245,9 @@ public abstract class MessageConverter_t
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(int offset, int length)
{
- return Collections.singleton(allData.duplicate());
+ return Collections.singleton(allData.view(offset, length));
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Tue Dec 15 15:45:46 2015
@@ -100,7 +100,7 @@ public class Message_1_0 extends Abstrac
public Collection<QpidByteBuffer> getFragments()
{
- return getContent();
+ return getContent(0, (int) getSize());
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Tue Dec 15 15:45:46 2015
@@ -89,9 +89,9 @@ public class MessageConverter_1_0_to_v0_
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(final int offset, final int length)
{
- return Collections.singleton(QpidByteBuffer.wrap(messageContent));
+ return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length));
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Tue Dec 15 15:45:46 2015
@@ -193,9 +193,9 @@ public class MessageConverter_0_10_to_0_
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(final int offset, final int length)
{
- return message.getContent();
+ return message.getContent(offset, length);
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Tue Dec 15 15:45:46 2015
@@ -82,9 +82,9 @@ public class MessageConverter_0_8_to_0_1
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(final int offset, final int length)
{
- return message_0_8.getContent();
+ return message_0_8.getContent(offset, length);
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Tue Dec 15 15:45:46 2015
@@ -91,9 +91,9 @@ public class MessageConverter_1_0_to_v0_
}
@Override
- public Collection<QpidByteBuffer> getContent()
+ public Collection<QpidByteBuffer> getContent(final int offset, final int length)
{
- return Collections.singleton(QpidByteBuffer.wrap(messageContent));
+ return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length));
}
@Override
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java Tue Dec 15 15:45:46 2015
@@ -218,7 +218,8 @@ public class ReportRunner<T>
private static ReportableMessage convertMessage(QueueEntry entry)
{
final MessageInfoImpl messageInfo = new MessageInfoImpl(entry, true);
- final Collection<QpidByteBuffer> contentBuffers = entry.getMessage().getContent();
+ ServerMessage message = entry.getMessage();
+ final Collection<QpidByteBuffer> contentBuffers = message.getContent(0, (int) message.getSize());
final ByteBuffer content = ByteBufferUtils.combine(contentBuffers);
for(QpidByteBuffer buf : contentBuffers)
{
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java Tue Dec 15 15:45:46 2015
@@ -30,7 +30,6 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.slf4j.Logger;
@@ -47,7 +46,6 @@ import org.apache.qpid.client.state.AMQS
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.codec.ClientDecoder;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQBody;
@@ -71,7 +69,6 @@ import org.apache.qpid.transport.Excepti
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
-import org.apache.qpid.util.BytesDataOutput;
public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, TransportActivity
{
@@ -142,8 +139,6 @@ public class AMQProtocolHandler implemen
private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
- private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
- private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
private int _queueId = 1;
private final Object _queueIdLock = new Object();
@@ -557,10 +552,9 @@ public class AMQProtocolHandler implemen
public synchronized void writeFrame(AMQDataBlock frame, boolean flush)
{
- final ByteBuffer buf = asByteBuffer(frame);
_lastWriteTime = System.currentTimeMillis();
- _writtenBytes += buf.remaining();
- _sender.send(QpidByteBuffer.wrap(buf));
+ _writtenBytes += frame.getSize();
+ frame.writePayload(_sender);
if(flush)
{
_sender.flush();
@@ -581,49 +575,6 @@ public class AMQProtocolHandler implemen
}
- private ByteBuffer asByteBuffer(AMQDataBlock block)
- {
- final int size = (int) block.getSize();
-
- final byte[] data;
-
-
- if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
- {
- data= new byte[size];
- }
- else
- {
-
- data = _reusableBytes;
- }
- _reusableDataOutput.setBuffer(data);
-
- try
- {
- block.writePayload(_reusableDataOutput);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
-
- final ByteBuffer buf;
-
- if(size < REUSABLE_BYTE_BUFFER_CAPACITY)
- {
- buf = _reusableByteBuffer;
- buf.position(0);
- }
- else
- {
- buf = ByteBuffer.wrap(data);
- }
- buf.limit(_reusableDataOutput.length());
-
- return buf;
- }
-
/**
* Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Tue Dec 15 15:45:46 2015
@@ -58,7 +58,6 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.codec.BBEncoder;
-import org.apache.qpid.util.BytesDataOutput;
import org.apache.qpid.util.GZIPUtils;
import org.apache.qpid.util.Strings;
@@ -251,9 +250,6 @@ public class BasicMessageProducer_0_10 e
final int headerLength = buf.remaining();
byte[] unencryptedBytes = new byte[headerLength + (data == null ? 0 : data.remaining())];
- BytesDataOutput output = new BytesDataOutput(unencryptedBytes);
-
- output.write(buf.array(), buf.arrayOffset()+buf.position(), buf.remaining());
if (data != null)
{
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Tue Dec 15 15:45:46 2015
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -59,7 +60,6 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.util.BytesDataOutput;
import org.apache.qpid.util.GZIPUtils;
public class BasicMessageProducer_0_8 extends BasicMessageProducer
@@ -222,8 +222,8 @@ public class BasicMessageProducer_0_8 ex
final int headerLength = contentHeaderProperties.getPropertyListSize() + 2;
byte[] unencryptedBytes = new byte[headerLength + size];
- BytesDataOutput output = new BytesDataOutput(unencryptedBytes);
- output.writeShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff));
+ QpidByteBuffer output = QpidByteBuffer.wrap(unencryptedBytes);
+ output.putShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff));
contentHeaderProperties.writePropertyListPayload(output);
if (size != 0)
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Tue Dec 15 15:45:46 2015
@@ -29,6 +29,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
* the content body/ies.
@@ -40,6 +43,7 @@ public class UnprocessedMessage_0_8 exte
{
private long _bytesReceived = 0;
+ private static final Logger LOGGER = LoggerFactory.getLogger(UnprocessedMessage_0_8.class);
private AMQShortString _exchange;
private AMQShortString _routingKey;
@@ -124,6 +128,7 @@ public class UnprocessedMessage_0_8 exte
public boolean isAllBodyDataReceived()
{
+ LOGGER.debug("Received {} of {} bytes for message body", _bytesReceived, getContentHeader().getBodySize());
return _bytesReceived == getContentHeader().getBodySize();
}
Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java Tue Dec 15 15:45:46 2015
@@ -38,13 +38,13 @@ import javax.crypto.spec.IvParameterSpec
import javax.crypto.spec.SecretKeySpec;
import javax.security.auth.x500.X500Principal;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestSSLConstants;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.util.BytesDataOutput;
public class Encrypted091MessageFactoryTest extends QpidTestCase
{
@@ -78,8 +78,8 @@ public class Encrypted091MessageFactoryT
final int headerLength = _props.getPropertyListSize() + 2;
_unencrypted = new byte[headerLength + _data.length];
- BytesDataOutput output = new BytesDataOutput(_unencrypted);
- output.writeShort((short) (_props.getPropertyFlags() & 0xffff));
+ QpidByteBuffer output = QpidByteBuffer.wrap(_unencrypted);
+ output.putShort((short) (_props.getPropertyFlags() & 0xffff));
_props.writePropertyListPayload(output);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Tue Dec 15 15:45:46 2015
@@ -21,7 +21,6 @@
package org.apache.qpid.bytebuffer;
import java.io.BufferedOutputStream;
-import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -440,11 +439,6 @@ public final class QpidByteBuffer
}
- public DataOutput asDataOutput()
- {
- return new BufferDataOutput();
- }
-
public static QpidByteBuffer allocate(int size)
{
return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
@@ -910,118 +904,4 @@ public final class QpidByteBuffer
}
}
- private final class BufferDataOutput implements DataOutput
- {
- public void write(int b)
- {
- _buffer.put((byte) b);
- }
-
- public void write(byte[] b)
- {
- _buffer.put(b);
- }
-
-
- public void write(byte[] b, int off, int len)
- {
- _buffer.put(b, off, len);
-
- }
-
- public void writeBoolean(boolean v)
- {
- _buffer.put(v ? (byte) 1 : (byte) 0);
- }
-
- public void writeByte(int v)
- {
- _buffer.put((byte) v);
- }
-
- public void writeShort(int v)
- {
- _buffer.putShort((short) v);
- }
-
- public void writeChar(int v)
- {
- _buffer.put((byte) (v >>> 8));
- _buffer.put((byte) v);
- }
-
- public void writeInt(int v)
- {
- _buffer.putInt(v);
- }
-
- public void writeLong(long v)
- {
- _buffer.putLong(v);
- }
-
- public void writeFloat(float v)
- {
- writeInt(Float.floatToIntBits(v));
- }
-
- public void writeDouble(double v)
- {
- writeLong(Double.doubleToLongBits(v));
- }
-
- public void writeBytes(String s)
- {
- throw new UnsupportedOperationException("writeBytes(String s) not supported");
- }
-
- public void writeChars(String s)
- {
- int len = s.length();
- for (int i = 0 ; i < len ; i++)
- {
- int v = s.charAt(i);
- _buffer.put((byte) (v >>> 8));
- _buffer.put((byte) v);
- }
- }
-
- public void writeUTF(String s)
- {
- int strlen = s.length();
-
- int pos = _buffer.position();
- _buffer.position(pos + 2);
-
-
- for (int i = 0; i < strlen; i++)
- {
- int c = s.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F))
- {
- c = s.charAt(i);
- _buffer.put((byte) c);
-
- }
- else if (c > 0x07FF)
- {
- _buffer.put((byte) (0xE0 | ((c >> 12) & 0x0F)));
- _buffer.put((byte) (0x80 | ((c >> 6) & 0x3F)));
- _buffer.put((byte) (0x80 | (c & 0x3F)));
- }
- else
- {
- _buffer.put((byte) (0xC0 | ((c >> 6) & 0x1F)));
- _buffer.put((byte) (0x80 | (c & 0x3F)));
- }
- }
-
- int len = _buffer.position() - (pos + 2);
-
- _buffer.put(pos++, (byte) (len >>> 8));
- _buffer.put(pos, (byte) len);
- }
-
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org