You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/07 11:13:42 UTC
svn commit: r504485 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/
tests/
Author: aconway
Date: Wed Feb 7 02:13:41 2007
New Revision: 504485
URL: http://svn.apache.org/viewvc?view=rev&rev=504485
Log:
* broker/BrokerMessage.cpp: Added ConnectionToken publisher.
* cpp/lib/broker/BrokerMessageMessage.cpp:
- Added ConnectionToken publisher.
- Implemented getDeliveryMode, getApplicationHeaders
* cpp/lib/broker/Reference.cpp: Holds MessageMessage instead of just
MessageTransferBody.
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Wed Feb 7 02:13:41 2007
@@ -188,7 +188,9 @@
if(blocked) queue->dispatch();
}
-void Channel::handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exch){
+void Channel::handleInlineTransfer(
+ Message::shared_ptr msg, Exchange::shared_ptr& exch)
+{
if(transactional){
TxPublish* deliverable = new TxPublish(msg);
exch->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Wed Feb 7 02:13:41 2007
@@ -139,7 +139,7 @@
void handleContent(boost::shared_ptr<framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
- void handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exchange);
+ void handleInlineTransfer(Message::shared_ptr msg, Exchange::shared_ptr& exchange);
// For ChannelAdapter
void handleMethodInContext(
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp Wed Feb 7 02:13:41 2007
@@ -41,11 +41,10 @@
const string& _exchange, const string& _routingKey,
bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo
) :
- Message(_exchange, _routingKey, _mandatory, _immediate, respondTo),
- publisher(_publisher),
+ Message(_publisher, _exchange, _routingKey, _mandatory,
+ _immediate, respondTo),
size(0)
-{
-}
+{}
// FIXME aconway 2007-02-01: remove.
// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
@@ -56,7 +55,7 @@
// }
// For tests only.
-BasicMessage::BasicMessage() : publisher(0), size(0)
+BasicMessage::BasicMessage() : size(0)
{}
BasicMessage::~BasicMessage(){
@@ -126,10 +125,6 @@
return getHeaderProperties()->getHeaders();
}
-const ConnectionToken* const BasicMessage::getPublisher(){
- return publisher;
-}
-
bool BasicMessage::isPersistent()
{
if(!header) return false;
@@ -230,12 +225,14 @@
store->stage(this);
}
if (!content.get() || content->size() > 0) {
+ // FIXME aconway 2007-02-07: handle MessageMessage.
//set content to lazy loading mode (but only if there is stored content):
//Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is
// then set as a member of that message so its lifetime is guaranteed to be no longer than
// that of the message itself
- content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, expectedContentSize()));
+ content = std::auto_ptr<Content>(
+ new LazyLoadedContent(store, this, expectedContentSize()));
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h Wed Feb 7 02:13:41 2007
@@ -52,7 +52,6 @@
* request.
*/
class BasicMessage : public Message {
- const ConnectionToken* const publisher;
framing::AMQHeaderBody::shared_ptr header;
std::auto_ptr<Content> content;
sys::Mutex contentLock;
@@ -72,7 +71,6 @@
void setHeader(framing::AMQHeaderBody::shared_ptr header);
void addContent(framing::AMQContentBody::shared_ptr data);
bool isComplete();
- const ConnectionToken* const getPublisher();
void deliver(framing::ChannelAdapter&,
const string& consumerTag,
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h Wed Feb 7 02:13:41 2007
@@ -40,10 +40,10 @@
class FieldTable;
}
-namespace broker {
-class MessageStore;
+namespace broker {
class ConnectionToken;
+class MessageStore;
/**
* Base class for all types of internal broker messages
@@ -51,6 +51,7 @@
* TODO; AMS: for the moment this is mostly a placeholder
*/
class Message{
+ const ConnectionToken* publisher;
std::string exchange;
std::string routingKey;
const bool mandatory;
@@ -62,9 +63,12 @@
public:
typedef boost::shared_ptr<Message> shared_ptr;
- Message(const std::string& _exchange, const std::string& _routingKey,
+ Message(const ConnectionToken* publisher_,
+ const std::string& _exchange,
+ const std::string& _routingKey,
bool _mandatory, bool _immediate,
framing::AMQMethodBody::shared_ptr respondTo_) :
+ publisher(publisher_),
exchange(_exchange),
routingKey(_routingKey),
mandatory(_mandatory),
@@ -122,7 +126,9 @@
virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
virtual const framing::FieldTable& getApplicationHeaders() = 0;
virtual bool isPersistent() = 0;
- virtual const ConnectionToken* const getPublisher() = 0;
+ virtual const ConnectionToken* getPublisher() const {
+ return publisher;
+ }
virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Wed Feb 7 02:13:41 2007
@@ -25,26 +25,24 @@
#include "MessageAppendBody.h"
#include "Reference.h"
#include "framing/FieldTable.h"
+#include "framing/BasicHeaderProperties.h"
#include <iostream>
using namespace std;
-using namespace qpid::broker;
using namespace qpid::framing;
-
-MessageMessage::MessageMessage(TransferPtr transfer_)
- : Message(transfer_->getDestination(), transfer_->getRoutingKey(),
- transfer_->getMandatory(), transfer_->getImmediate(),
- transfer_),
- transfer(transfer_)
-{}
-MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref)
- : Message(transfer_->getDestination(), transfer_->getRoutingKey(),
- transfer_->getMandatory(), transfer_->getImmediate(),
- transfer_),
- transfer(transfer_),
- appends(ref.getAppends())
+namespace qpid {
+namespace broker {
+
+MessageMessage::MessageMessage(
+ ConnectionToken* publisher, TransferPtr transfer_
+) : Message(publisher, transfer_->getDestination(),
+ transfer_->getRoutingKey(),
+ transfer_->getMandatory(),
+ transfer_->getImmediate(),
+ transfer_),
+ transfer(transfer_)
{}
void MessageMessage::deliver(
@@ -55,29 +53,29 @@
{
channel.send(
new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- transfer->getBody(),
- transfer->getMandatory()));
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ transfer->getBody(),
+ transfer->getMandatory()));
}
void MessageMessage::sendGetOk(
@@ -107,19 +105,11 @@
const FieldTable& MessageMessage::getApplicationHeaders()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
return transfer->getApplicationHeaders();
}
bool MessageMessage::isPersistent()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return false; // FIXME aconway 2007-02-05:
-}
-
-const ConnectionToken* const MessageMessage::getPublisher()
-{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return transfer->getDeliveryMode() == PERSISTENT;
}
u_int32_t MessageMessage::encodedSize()
@@ -146,3 +136,5 @@
return 0; // FIXME aconway 2007-02-05:
}
+
+}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Wed Feb 7 02:13:41 2007
@@ -35,19 +35,25 @@
}
namespace broker {
+class ConnectionToken;
class Reference;
class MessageMessage: public Message{
public:
- typedef Reference::TransferPtr TransferPtr;
+ typedef boost::shared_ptr<MessageMessage> shared_ptr;
+ typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
typedef Reference::AppendPtr AppendPtr;
typedef Reference::Appends Appends;
- MessageMessage(TransferPtr transfer);
- MessageMessage(TransferPtr transfer, const Reference&);
+ MessageMessage(ConnectionToken* publisher, TransferPtr transfer);
// Default destructor okay
-
+
+ TransferPtr getTransfer() { return transfer; }
+
+ const Appends& getAppends() { return appends; }
+ void setAppends(const Appends& appends_) { appends = appends_; }
+
void deliver(framing::ChannelAdapter& channel,
const std::string& consumerTag,
u_int64_t deliveryTag,
@@ -64,19 +70,16 @@
framing::BasicHeaderProperties* getHeaderProperties();
const framing::FieldTable& getApplicationHeaders();
bool isPersistent();
- const ConnectionToken* const getPublisher();
u_int32_t encodedSize();
u_int32_t encodedHeaderSize();
u_int32_t encodedContentSize();
u_int64_t expectedContentSize();
- TransferPtr getTransfer() { return transfer; }
- const Appends& getAppends() { return appends; }
private:
const TransferPtr transfer;
- const Appends appends;
+ Appends appends;
};
}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Wed Feb 7 02:13:41 2007
@@ -217,14 +217,13 @@
MessageTransferBody::shared_ptr transfer(
boost::shared_polymorphic_downcast<MessageTransferBody>(
context.methodBody));
- if (body.isInline()) {
- Message::shared_ptr msg(new MessageMessage(transfer));
- channel.handleInlineTransfer(msg, exchange);
- }
- else {
- // Add to reference.
- references.get(body.getValue()).transfer(transfer);
- }
+ MessageMessage::shared_ptr message(
+ new MessageMessage(&connection, transfer));
+
+ if (body.isInline())
+ channel.handleInlineTransfer(message, exchange);
+ else
+ references.get(body.getValue()).addMessage(message);
client.ok(context);
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp Wed Feb 7 02:13:41 2007
@@ -43,14 +43,14 @@
}
void Reference::close() {
- for_each(transfers.begin(), transfers.end(),
+ for_each(messages.begin(), messages.end(),
boost::bind(&Reference::complete, this, _1));
registry->references.erase(getId());
}
-void Reference::complete(TransferPtr transfer) {
- MessageMessage::shared_ptr msg(new MessageMessage(transfer, *this));
- registry->handler.complete(msg);
+void Reference::complete(MessagePtr message) {
+ message->setAppends(appends);
+ registry->handler.complete(message);
}
}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h Wed Feb 7 02:13:41 2007
@@ -28,20 +28,21 @@
namespace qpid {
namespace framing {
-class MessageTransferBody;
class MessageAppendBody;
}
namespace broker {
+class MessageMessage;
class CompletionHandler;
class ReferenceRegistry;
/**
* A reference is an accumulation point for data in a multi-frame
- * message. A reference can be used by multiple transfer commands, so
- * the reference tracks which commands are using it. When the reference
- * is closed, all the associated transfers are completed.
+ * message. A reference can be used by multiple transfer commands to
+ * create multiple messages, so the reference tracks which commands
+ * are using it. When the reference is closed, all the associated
+ * transfers are completed.
*
* THREAD UNSAFE: per-channel resource, access to channels is
* serialized.
@@ -50,8 +51,8 @@
{
public:
typedef std::string Id;
- typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
- typedef std::vector<TransferPtr> Transfers;
+ typedef boost::shared_ptr<MessageMessage> MessagePtr;
+ typedef std::vector<MessagePtr> Messages;
typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
typedef std::vector<AppendPtr> Appends;
@@ -60,24 +61,24 @@
const std::string& getId() const { return id; }
- /** Add a transfer to be completed with this reference */
- void transfer(TransferPtr transfer) { transfers.push_back(transfer); }
+ /** Add a message to be completed with this reference */
+ void addMessage(MessagePtr message) { messages.push_back(message); }
/** Append more data to the reference */
void append(AppendPtr ptr) { appends.push_back(ptr); }
- /** Close the reference, complete each associated transfer */
+ /** Close the reference, complete each associated message */
void close();
const Appends& getAppends() const { return appends; }
- const Transfers& getTransfers() const { return transfers; }
+ const Messages& getMessages() const { return messages; }
private:
- void complete(TransferPtr transfer);
+ void complete(MessagePtr message);
Id id;
ReferenceRegistry* registry;
- Transfers transfers;
+ Messages messages;
Appends appends;
};
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp Wed Feb 7 02:13:41 2007
@@ -41,14 +41,20 @@
struct MockCompletionHandler : public CompletionHandler {
- std::vector<Message::shared_ptr> messages;
- void complete(Message::shared_ptr msg) { messages.push_back(msg); }
+ std::vector<MessageMessage::shared_ptr> messages;
+ void complete(Message::shared_ptr m) {
+ MessageMessage::shared_ptr mm =
+ dynamic_pointer_cast<MessageMessage>(m);
+ CPPUNIT_ASSERT(mm);
+ messages.push_back(mm);
+ }
};
MockCompletionHandler handler;
ProtocolVersion v;
ReferenceRegistry registry;
MessageTransferBody::shared_ptr t1, t2;
+ MessageMessage::shared_ptr m1, m2;
MessageAppendBody::shared_ptr a1, a2;
public:
@@ -56,6 +62,8 @@
registry(handler),
t1(new MessageTransferBody(v)),
t2(new MessageTransferBody(v)),
+ m1(new MessageMessage(0, t1)),
+ m2(new MessageMessage(0, t2)),
a1(new MessageAppendBody(v)),
a2(new MessageAppendBody(v))
{}
@@ -74,19 +82,11 @@
} catch(...) {}
}
- MessageMessage& handlerMessage(size_t i) {
- CPPUNIT_ASSERT(handler.messages.size() > i);
- MessageMessage* msg = dynamic_cast<MessageMessage*>(
- handler.messages[i].get());
- CPPUNIT_ASSERT(msg);
- return *msg;
- }
-
void testReference() {
Reference& ref = registry.open("foo");
- ref.transfer(t1);
- ref.transfer(t2);
- CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getTransfers().size());
+ ref.addMessage(m1);
+ ref.addMessage(m2);
+ CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getMessages().size());
ref.append(a1);
ref.append(a2);
CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getAppends().size());
@@ -96,16 +96,16 @@
CPPUNIT_FAIL("Expected exception");
} catch(...) {}
- vector<Message::shared_ptr>& messages = handler.messages;
+ vector<MessageMessage::shared_ptr>& messages = handler.messages;
CPPUNIT_ASSERT_EQUAL(size_t(2), messages.size());
- CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getTransfer(), t1);
- CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[0], a1);
- CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[1], a2);
-
- CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getTransfer(), t2);
- CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[0], a1);
- CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[1], a2);
+ CPPUNIT_ASSERT_EQUAL(messages[0], m1);
+ CPPUNIT_ASSERT_EQUAL(messages[0]->getAppends()[0], a1);
+ CPPUNIT_ASSERT_EQUAL(messages[0]->getAppends()[1], a2);
+
+ CPPUNIT_ASSERT_EQUAL(messages[1], m2);
+ CPPUNIT_ASSERT_EQUAL(messages[1]->getAppends()[0], a1);
+ CPPUNIT_ASSERT_EQUAL(messages[1]->getAppends()[1], a2);
}