You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/03/05 19:01:23 UTC
svn commit: r514751 - in /incubator/qpid/branches/qpid.0-9: ./
cpp/lib/broker/BrokerMessageMessage.cpp
cpp/lib/broker/BrokerMessageMessage.h cpp/lib/broker/MessageHandlerImpl.cpp
Author: astitcher
Date: Mon Mar 5 10:01:22 2007
New Revision: 514751
URL: http://svn.apache.org/viewvc?view=rev&rev=514751
Log:
r1239@fuschia: andrew | 2007-02-26 10:58:52 +0000
Refactored message transfer to extract commmonality from deliver/get
r1242@fuschia: andrew | 2007-03-05 17:54:44 +0000
Turn oversize inline transfers into reference transfers
Modified:
incubator/qpid/branches/qpid.0-9/ (props changed)
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
Propchange: incubator/qpid/branches/qpid.0-9/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Mon Mar 5 10:01:22 2007
@@ -1 +1 @@
-8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1237
+8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1242
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=514751&r1=514750&r2=514751
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Mon Mar 5 10:01:22 2007
@@ -61,14 +61,13 @@
reference(reference_)
{}
-void MessageMessage::deliver(
+void MessageMessage::transferMessage(
framing::ChannelAdapter& channel,
const std::string& consumerTag,
- u_int64_t /*deliveryTag*/,
- u_int32_t /*framesize*/)
-{
+ u_int32_t framesize)
+{
const framing::Content& body = transfer->getBody();
-
+
// Send any reference data
if (!body.isInline()){
// Open
@@ -81,8 +80,9 @@
}
}
- // The the transfer
- channel.send(
+ // The transfer
+ if ( transfer->size()<=framesize ) {
+ channel.send(
new MessageTransferBody(channel.getVersion(),
transfer->getTicket(),
consumerTag,
@@ -107,6 +107,44 @@
transfer->getApplicationHeaders(),
body,
transfer->getMandatory()));
+ } else {
+ // Thing to do here is to construct a simple reference message then deliver that instead
+ // fragmentmentation will be taken care of in the delivery
+ // if necessary; problem is to invent a reference name to use
+ string content = body.getValue();
+ string refname = "dummy";
+ TransferPtr newTransfer(
+ new MessageTransferBody(channel.getVersion(),
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ framing::Content(REFERENCE, refname),
+ transfer->getMandatory()));
+ ReferencePtr newRef(new Reference(refname));
+ Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
+ newRef->append(newAppend);
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
+ newMsg.transferMessage(channel, consumerTag, framesize);
+ return;
+ }
// Close any reference data
if (!body.isInline()){
// Close
@@ -114,39 +152,24 @@
}
}
+void MessageMessage::deliver(
+ framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t framesize)
+{
+ transferMessage(channel, consumerTag, framesize);
+}
+
void MessageMessage::sendGetOk(
const framing::MethodContext& context,
const std::string& destination,
u_int32_t /*messageCount*/,
u_int64_t /*deliveryTag*/,
- u_int32_t /*framesize*/)
+ u_int32_t framesize)
{
framing::ChannelAdapter* channel = context.channel;
- channel->send(
- new MessageTransferBody(channel->getVersion(),
- transfer->getTicket(),
- destination,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- transfer->getBody(),
- transfer->getMandatory()));
+ transferMessage(*channel, destination, framesize);
}
bool MessageMessage::isComplete()
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=514751&r1=514750&r2=514751
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Mon Mar 5 10:01:22 2007
@@ -76,6 +76,10 @@
u_int64_t expectedContentSize();
private:
+ void transferMessage(framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ u_int32_t framesize);
+
framing::RequestId requestId;
const TransferPtr transfer;
const ReferencePtr reference;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=514751&r1=514750&r2=514751
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Mon Mar 5 10:01:22 2007
@@ -187,12 +187,12 @@
}
void
-MessageHandlerImpl::reject(const MethodContext&,
+MessageHandlerImpl::reject(const MethodContext& /*context*/,
u_int16_t /*code*/,
const string& /*text*/ )
{
- // FIXME astitcher 2007-01-11: 0-9 feature
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+ channel.ack();
+ // channel.requeue();
}
void