You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/03/30 17:50:10 UTC
svn commit: r524139 [1/2] - in /incubator/qpid/trunk/qpid/cpp: lib/broker/
tests/
Author: gsim
Date: Fri Mar 30 08:50:07 2007
New Revision: 524139
URL: http://svn.apache.org/viewvc?view=rev&rev=524139
Log:
Refactored the MessageStore interface to restrict visibility of broker core from store implementations.
Added:
incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h (with props)
incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h (with props)
incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h (with props)
incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h (with props)
incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h (with props)
incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h (with props)
incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp (with props)
incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h (with props)
Removed:
incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/lib/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageBase.h
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.h
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h
incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h
incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h
incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h
incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h
incubator/qpid/trunk/qpid/cpp/tests/BrokerChannelTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Broker.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Broker.cpp Fri Mar 30 08:50:07 2007
@@ -29,6 +29,7 @@
#include "MessageStoreModule.h"
#include "NullMessageStore.h"
#include "ProtocolInitiation.h"
+#include "RecoveryManagerImpl.h"
#include "Connection.h"
#include "sys/ConnectionInputHandler.h"
#include "sys/ConnectionInputHandlerFactory.h"
@@ -61,9 +62,8 @@
exchanges.declare(amq_match, HeadersExchange::typeName);
if(store.get()) {
- RecoveryManager recoverer(queues, exchanges);
- MessageStoreSettings storeSettings = { getStagingThreshold() };
- store->recover(recoverer, &storeSettings);
+ RecoveryManagerImpl recoverer(queues, exchanges, conf.getStagingThreshold());
+ store->recover(recoverer);
}
cleaner.start();
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp Fri Mar 30 08:50:07 2007
@@ -264,7 +264,7 @@
throw ConnectionException(530, "Received ack for unrecognised delivery tag");
}else if(i!=j){
ack_iterator end = ++i;
- for_each(j, end, mem_fun_ref(&DeliveryRecord::discard));
+ for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
unacked.erase(unacked.begin(), end);
//recalculate the prefetch:
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp Fri Mar 30 08:50:07 2007
@@ -33,6 +33,7 @@
#include "AMQMethodBody.h"
#include "AMQFrame.h"
#include "framing/ChannelAdapter.h"
+#include "RecoveryManagerImpl.h"
using namespace boost;
using namespace qpid::broker;
@@ -134,6 +135,9 @@
void BasicMessage::decodeHeader(Buffer& buffer)
{
+ //don't care about the type here, but want encode/decode to be symmetric
+ RecoveryManagerImpl::decodeMessageType(buffer);
+
string exchange;
string routingKey;
@@ -169,40 +173,42 @@
}
}
-void BasicMessage::encode(Buffer& buffer)
+void BasicMessage::encode(Buffer& buffer) const
{
encodeHeader(buffer);
encodeContent(buffer);
}
-void BasicMessage::encodeHeader(Buffer& buffer)
+void BasicMessage::encodeHeader(Buffer& buffer) const
{
+ RecoveryManagerImpl::encodeMessageType(*this, buffer);
buffer.putShortString(getExchange());
buffer.putShortString(getRoutingKey());
buffer.putLong(header->size());
header->encode(buffer);
}
-void BasicMessage::encodeContent(Buffer& buffer)
+void BasicMessage::encodeContent(Buffer& buffer) const
{
Mutex::ScopedLock locker(contentLock);
if (content.get()) content->encode(buffer);
}
-uint32_t BasicMessage::encodedSize()
+uint32_t BasicMessage::encodedSize() const
{
return encodedHeaderSize() + encodedContentSize();
}
-uint32_t BasicMessage::encodedContentSize()
+uint32_t BasicMessage::encodedContentSize() const
{
Mutex::ScopedLock locker(contentLock);
return content.get() ? content->size() : 0;
}
-uint32_t BasicMessage::encodedHeaderSize()
+uint32_t BasicMessage::encodedHeaderSize() const
{
- return getExchange().size() + 1
+ return RecoveryManagerImpl::encodedMessageTypeSize()
+ +getExchange().size() + 1
+ getRoutingKey().size() + 1
+ header->size() + 4;//4 extra bytes for size
}
@@ -216,7 +222,7 @@
{
Mutex::ScopedLock locker(contentLock);
if (!isPersistent() && getPersistenceId() == 0) {
- store->stage(this);
+ store->stage(*this);
}
if (!content.get() || content->size() > 0) {
//set content to lazy loading mode (but only if there is
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h Fri Mar 30 08:50:07 2007
@@ -53,7 +53,7 @@
class BasicMessage : public Message {
boost::shared_ptr<framing::AMQHeaderBody> header;
std::auto_ptr<Content> content;
- sys::Mutex contentLock;
+ mutable sys::Mutex contentLock;
uint64_t size;
void sendContent(framing::ChannelAdapter&, uint32_t framesize);
@@ -92,25 +92,25 @@
void decodeHeader(framing::Buffer& buffer);
void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
- void encode(framing::Buffer& buffer);
- void encodeHeader(framing::Buffer& buffer);
- void encodeContent(framing::Buffer& buffer);
+ void encode(framing::Buffer& buffer) const;
+ void encodeHeader(framing::Buffer& buffer) const;
+ void encodeContent(framing::Buffer& buffer) const;
/**
* @returns the size of the buffer needed to encode this
* message in its entirety
*/
- uint32_t encodedSize();
+ uint32_t encodedSize() const;
/**
* @returns the size of the buffer needed to encode the
* 'header' of this message (not just the header frame,
* but other meta data e.g.routing key and exchange)
*/
- uint32_t encodedHeaderSize();
+ uint32_t encodedHeaderSize() const;
/**
* @returns the size of the buffer needed to encode the
* (possibly partial) content held by this message
*/
- uint32_t encodedContentSize();
+ uint32_t encodedContentSize() const;
/**
* Releases the in-memory content data held by this
* message. Must pass in a store from which the data can
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageBase.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageBase.h Fri Mar 30 08:50:07 2007
@@ -25,6 +25,7 @@
#include <string>
#include <boost/shared_ptr.hpp>
#include "Content.h"
+#include "PersistableMessage.h"
#include "framing/amqp_types.h"
namespace qpid {
@@ -49,7 +50,7 @@
* abstracting away the operations
* TODO; AMS: for the moment this is mostly a placeholder
*/
-class Message {
+class Message : public PersistableMessage{
public:
typedef boost::shared_ptr<Message> shared_ptr;
typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr;
@@ -117,25 +118,25 @@
return publisher;
}
- virtual void encode(framing::Buffer& buffer) = 0;
- virtual void encodeHeader(framing::Buffer& buffer) = 0;
+ virtual void encode(framing::Buffer& buffer) const = 0;
+ virtual void encodeHeader(framing::Buffer& buffer) const = 0;
/**
* @returns the size of the buffer needed to encode this
* message in its entirety
*/
- virtual uint32_t encodedSize() = 0;
+ virtual uint32_t encodedSize() const = 0;
/**
* @returns the size of the buffer needed to encode the
* 'header' of this message (not just the header frame,
* but other meta data e.g.routing key and exchange)
*/
- virtual uint32_t encodedHeaderSize() = 0;
+ virtual uint32_t encodedHeaderSize() const = 0;
/**
* @returns the size of the buffer needed to encode the
* (possibly partial) content held by this message
*/
- virtual uint32_t encodedContentSize() = 0;
+ virtual uint32_t encodedContentSize() const = 0;
/**
* If headers have been received, returns the expected
* content size else returns 0.
@@ -145,6 +146,7 @@
virtual void decodeHeader(framing::Buffer& buffer) = 0;
virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0;
+ static shared_ptr decode(framing::Buffer& buffer);
// TODO: AMS 29/1/2007 Don't think these are really part of base class
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.cpp Fri Mar 30 08:50:07 2007
@@ -29,6 +29,7 @@
#include "framing/AMQFrame.h"
#include "framing/FieldTable.h"
#include "framing/BasicHeaderProperties.h"
+#include "RecoveryManagerImpl.h"
#include <algorithm>
@@ -217,17 +218,17 @@
return transfer->getDeliveryMode() == PERSISTENT;
}
-uint32_t MessageMessage::encodedSize()
+uint32_t MessageMessage::encodedSize() const
{
return encodedHeaderSize() + encodedContentSize();
}
-uint32_t MessageMessage::encodedHeaderSize()
+uint32_t MessageMessage::encodedHeaderSize() const
{
- return transfer->size() - transfer->baseSize();
+ return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize();
}
-uint32_t MessageMessage::encodedContentSize()
+uint32_t MessageMessage::encodedContentSize() const
{
return 0;
}
@@ -237,13 +238,14 @@
return 0;
}
-void MessageMessage::encode(Buffer& buffer)
+void MessageMessage::encode(Buffer& buffer) const
{
encodeHeader(buffer);
}
-void MessageMessage::encodeHeader(Buffer& buffer)
+void MessageMessage::encodeHeader(Buffer& buffer) const
{
+ RecoveryManagerImpl::encodeMessageType(*this, buffer);
if (transfer->getBody().isInline()) {
transfer->encodeContent(buffer);
} else {
@@ -259,6 +261,9 @@
void MessageMessage::decodeHeader(Buffer& buffer)
{
+ //don't care about the type here, but want encode/decode to be symmetric
+ RecoveryManagerImpl::decodeMessageType(buffer);
+
transfer->decodeContent(buffer);
}
@@ -269,7 +274,7 @@
MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
const string& destination,
- const framing::Content& body)
+ const framing::Content& body) const
{
return new MessageTransferBody(version,
transfer->getTicket(),
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.h Fri Mar 30 08:50:07 2007
@@ -71,11 +71,11 @@
const framing::FieldTable& getApplicationHeaders();
bool isPersistent();
- void encode(framing::Buffer& buffer);
- void encodeHeader(framing::Buffer& buffer);
- uint32_t encodedSize();
- uint32_t encodedHeaderSize();
- uint32_t encodedContentSize();
+ void encode(framing::Buffer& buffer) const;
+ void encodeHeader(framing::Buffer& buffer) const;
+ uint32_t encodedSize() const;
+ uint32_t encodedHeaderSize() const;
+ uint32_t encodedContentSize() const;
uint64_t expectedContentSize();
void decodeHeader(framing::Buffer& buffer);
void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
@@ -86,7 +86,7 @@
uint32_t framesize);
framing::MessageTransferBody* copyTransfer(const framing::ProtocolVersion& version,
const std::string& destination,
- const framing::Content& body);
+ const framing::Content& body) const;
framing::RequestId requestId;
const TransferPtr transfer;
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp Fri Mar 30 08:50:07 2007
@@ -26,6 +26,7 @@
#include <sys/Monitor.h>
#include <sys/Time.h>
#include <iostream>
+#include "QueueRegistry.h"
using namespace qpid::broker;
using namespace qpid::sys;
@@ -53,7 +54,7 @@
Queue::~Queue(){}
void Queue::deliver(Message::shared_ptr& msg){
- enqueue(0, msg, 0);
+ enqueue(0, msg);
process(msg);
}
@@ -195,17 +196,17 @@
return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete);
}
-void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
+void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
{
if (msg->isPersistent() && store) {
- store->enqueue(ctxt, msg.get(), *this, xid);
+ store->enqueue(ctxt, *msg.get(), *this);
}
}
-void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
+void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
{
if (msg->isPersistent() && store) {
- store->dequeue(ctxt, msg.get(), *this, xid);
+ store->dequeue(ctxt, *msg.get(), *this);
}
}
@@ -217,8 +218,10 @@
void Queue::create(const FieldTable& settings)
{
+ //TODO: hold onto settings and persist them as part of encode
+ // in fact settings should be passed in on construction
if (store) {
- store->create(*this, settings);
+ store->create(*this);
}
configure(settings);
}
@@ -246,3 +249,34 @@
{
return policy.get();
}
+
+uint64_t Queue::getPersistenceId() const
+{
+ return persistenceId;
+}
+
+void Queue::setPersistenceId(uint64_t _persistenceId)
+{
+ persistenceId = _persistenceId;
+}
+
+void Queue::encode(framing::Buffer& buffer) const
+{
+ buffer.putShortString(name);
+ //TODO store all required properties
+}
+
+uint32_t Queue::encodedSize() const
+{
+ //TODO, revise when storing full set of queue properties
+ return name.size() + 1/*short string size octet*/;
+}
+
+Queue::shared_ptr Queue::decode(QueueRegistry& queues, framing::Buffer& buffer)
+{
+ string name;
+ buffer.getShortString(name);
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+ return result.first;
+}
+
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h Fri Mar 30 08:50:07 2007
@@ -31,6 +31,7 @@
#include <BrokerMessage.h>
#include <FieldTable.h>
#include <sys/Monitor.h>
+#include "PersistableQueue.h"
#include <QueuePolicy.h>
// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to
@@ -39,6 +40,7 @@
namespace qpid {
namespace broker {
class MessageStore;
+ class QueueRegistry;
/**
* Thrown when exclusive access would be violated.
@@ -51,7 +53,7 @@
* registered consumers or be stored until dequeued or until one
* or more consumers registers.
*/
- class Queue{
+ class Queue : public PersistableQueue{
typedef std::vector<Consumer*> Consumers;
typedef std::queue<Message::shared_ptr> Messages;
@@ -119,22 +121,28 @@
inline const string& getName() const { return name; }
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
- inline uint64_t getPersistenceId() const { return persistenceId; }
- inline void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
bool canAutoDelete() const;
- void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid);
+ void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
/**
* dequeue from store (only done once messages is acknowledged)
*/
- void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid);
+ void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
/**
* dequeues from memory only
*/
Message::shared_ptr dequeue();
const QueuePolicy* const getPolicy();
+
+ //PersistableQueue support:
+ uint64_t getPersistenceId() const;
+ void setPersistenceId(uint64_t persistenceId);
+ void encode(framing::Buffer& buffer) const;
+ uint32_t encodedSize() const;
+
+ static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer);
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.cpp Fri Mar 30 08:50:07 2007
@@ -42,12 +42,8 @@
pull(true){}
-void DeliveryRecord::discard(TransactionContext* ctxt, const std::string* const xid) const{
- queue->dequeue(ctxt, msg, xid);
-}
-
-void DeliveryRecord::discard() const{
- discard(0, 0);
+void DeliveryRecord::discard(TransactionContext* ctxt) const{
+ queue->dequeue(ctxt, msg);
}
bool DeliveryRecord::matches(uint64_t tag) const{
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.h Fri Mar 30 08:50:07 2007
@@ -46,8 +46,7 @@
DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const uint64_t deliveryTag);
DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const uint64_t deliveryTag);
- void discard() const;
- void discard(TransactionContext* ctxt, const std::string* const xid) const;
+ void discard(TransactionContext* ctxt = 0) const;
bool matches(uint64_t tag) const;
bool coveredBy(const AccumulatedAck* const range) const;
void requeue() const;
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp Fri Mar 30 08:50:07 2007
@@ -27,7 +27,7 @@
LazyLoadedContent::~LazyLoadedContent()
{
- store->destroy(msg);
+ store->destroy(*msg);
}
LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) :
@@ -35,7 +35,7 @@
void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
{
- store->appendContent(msg, data->getData());
+ store->appendContent(*msg, data->getData());
}
uint32_t LazyLoadedContent::size()
@@ -50,13 +50,13 @@
{
uint64_t remaining = expectedSize - offset;
string data;
- store->loadContent(msg, data, offset,
+ store->loadContent(*msg, data, offset,
remaining > framesize ? framesize : remaining);
channel.send(new AMQContentBody(data));
}
} else {
string data;
- store->loadContent(msg, data, 0, expectedSize);
+ store->loadContent(*msg, data, 0, expectedSize);
channel.send(new AMQContentBody(data));
}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h Fri Mar 30 08:50:07 2007
@@ -23,6 +23,7 @@
#include <Content.h>
#include <MessageStore.h>
+#include "BrokerMessageBase.h"
namespace qpid {
namespace broker {
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am Fri Mar 30 08:50:07 2007
@@ -61,13 +61,20 @@
NameGenerator.h \
NullMessageStore.cpp \
NullMessageStore.h \
+ Persistable.h \
+ PersistableExchange.h \
+ PersistableMessage.h \
+ PersistableQueue.h \
Prefetch.h \
QueuePolicy.cpp \
QueuePolicy.h \
QueueRegistry.cpp \
QueueRegistry.h \
- RecoveryManager.cpp \
+ RecoverableMessage.h \
+ RecoverableQueue.h \
RecoveryManager.h \
+ RecoveryManagerImpl.cpp \
+ RecoveryManagerImpl.h \
Reference.cpp \
Reference.h \
ConnectionFactory.cpp \
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp Fri Mar 30 08:50:07 2007
@@ -56,7 +56,7 @@
}
message->setHeader(header);
if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
- store->stage(message.get());
+ store->stage(*message);
message->releaseContent(store);
} else {
auto_ptr<Content> content(new InMemoryContent());
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h Fri Mar 30 08:50:07 2007
@@ -21,119 +21,108 @@
#ifndef _MessageStore_
#define _MessageStore_
-#include <BrokerMessage.h>
-#include <FieldTable.h>
-#include <RecoveryManager.h>
-#include <TransactionalStore.h>
+#include "PersistableExchange.h"
+#include "PersistableMessage.h"
+#include "PersistableQueue.h"
+#include "RecoveryManager.h"
+#include "TransactionalStore.h"
namespace qpid {
- namespace broker {
- struct MessageStoreSettings
- {
- /**
- * Messages whose content length is larger than this value
- * will be staged (i.e. will have thier data written to
- * disk as it arrives) and will load their data lazily. On
- * recovery therefore, only the headers should be loaded.
- */
- uint64_t stagingThreshold;
- };
- /**
- * An abstraction of the persistent storage for messages. (In
- * all methods, any pointers/references to queues or messages
- * are valid only for the duration of the call).
- */
- class MessageStore : public TransactionalStore{
- public:
- /**
- * Record the existance of a durable queue
- */
- virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0;
- /**
- * Destroy a durable queue
- */
- virtual void destroy(const Queue& queue) = 0;
+namespace broker {
- /**
- * Request recovery of queue and message state from store
- */
- virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0;
-
- /**
- * Stores a messages before it has been enqueued
- * (enqueueing automatically stores the message so this is
- * only required if storage is required prior to that
- * point). If the message has not yet been stored it will
- * store the headers as well as any content passed in. A
- * persistence id will be set on the message which can be
- * used to load the content or to append to it.
- */
- virtual void stage(Message* const msg) = 0;
+/**
+ * An abstraction of the persistent storage for messages. (In
+ * all methods, any pointers/references to queues or messages
+ * are valid only for the duration of the call).
+ */
+class MessageStore : public TransactionalStore{
+public:
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(const PersistableQueue& queue) = 0;
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(const PersistableQueue& queue) = 0;
+
+ /**
+ * Record the existence of a durable exchange
+ */
+ virtual void create(const PersistableExchange& exchange) = 0;
+ /**
+ * Destroy a durable exchange
+ */
+ virtual void destroy(const PersistableExchange& exchange) = 0;
+
+ /**
+ * Request recovery of queue and message state from store
+ */
+ virtual void recover(RecoveryManager& queues) = 0;
+
+ /**
+ * Stores a message before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(PersistableMessage& msg) = 0;
- /**
- * Destroys a previously staged message. This only needs
- * to be called if the message is never enqueued. (Once
- * enqueued, deletion will be automatic when the message
- * is dequeued from all queues it was enqueued onto).
- */
- virtual void destroy(Message* const msg) = 0;
-
- /**
- * Appends content to a previously staged message
- */
- virtual void appendContent(Message* const msg, const std::string& data) = 0;
-
- /**
- * Loads (a section) of content data for the specified
- * message (previously stored through a call to stage or
- * enqueue) into data. The offset refers to the content
- * only (i.e. an offset of 0 implies that the start of the
- * content should be loaded, not the headers or related
- * meta-data).
- */
- virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length) = 0;
-
- /**
- * Enqueues a message, storing the message if it has not
- * been previously stored and recording that the given
- * message is on the given queue.
- *
- * @param msg the message to enqueue
- * @param queue the name of the queue onto which it is to be enqueued
- * @param xid (a pointer to) an identifier of the
- * distributed transaction in which the operation takes
- * place or null for 'local' transactions
- */
- virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0;
- /**
- * Dequeues a message, recording that the given message is
- * no longer on the given queue and deleting the message
- * if it is no longer on any other queue.
- *
- * @param msg the message to dequeue
- * @param queue the name of th queue from which it is to be dequeued
- * @param xid (a pointer to) an identifier of the
- * distributed transaction in which the operation takes
- * place or null for 'local' transactions
- */
- virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0;
-
- /**
- * Treat all enqueue/dequeues where this xid was specified as being prepared.
- */
- virtual void prepared(const std::string * const xid) = 0;
- /**
- * Treat all enqueue/dequeues where this xid was specified as being committed.
- */
- virtual void committed(const std::string * const xid) = 0;
- /**
- * Treat all enqueue/dequeues where this xid was specified as being aborted.
- */
- virtual void aborted(const std::string * const xid) = 0;
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(PersistableMessage& msg) = 0;
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(PersistableMessage& msg, const std::string& data) = 0;
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0;
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0;
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0;
+
+ virtual ~MessageStore(){}
+};
- virtual ~MessageStore(){}
- };
- }
+}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp Fri Mar 30 08:50:07 2007
@@ -28,77 +28,82 @@
{
}
-void MessageStoreModule::create(const Queue& queue, const qpid::framing::FieldTable& settings)
+void MessageStoreModule::create(const PersistableQueue& queue)
{
- store->create(queue, settings);
+ store->create(queue);
}
-void MessageStoreModule::destroy(const Queue& queue)
+void MessageStoreModule::destroy(const PersistableQueue& queue)
{
store->destroy(queue);
}
-void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings)
+void MessageStoreModule::create(const PersistableExchange& exchange)
{
- store->recover(registry, settings);
+ store->create(exchange);
}
-void MessageStoreModule::stage(Message* const msg)
+void MessageStoreModule::destroy(const PersistableExchange& exchange)
+{
+ store->destroy(exchange);
+}
+
+void MessageStoreModule::recover(RecoveryManager& registry)
+{
+ store->recover(registry);
+}
+
+void MessageStoreModule::stage(PersistableMessage& msg)
{
store->stage(msg);
}
-void MessageStoreModule::destroy(Message* const msg)
+void MessageStoreModule::destroy(PersistableMessage& msg)
{
store->destroy(msg);
}
-void MessageStoreModule::appendContent(Message* const msg, const std::string& data)
+void MessageStoreModule::appendContent(PersistableMessage& msg, const std::string& data)
{
store->appendContent(msg, data);
}
-void MessageStoreModule::loadContent(Message* const msg, string& data, uint64_t offset, uint32_t length)
+void MessageStoreModule::loadContent(PersistableMessage& msg, string& data, uint64_t offset, uint32_t length)
{
store->loadContent(msg, data, offset, length);
}
-void MessageStoreModule::enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid)
+void MessageStoreModule::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
{
- store->enqueue(ctxt, msg, queue, xid);
+ store->enqueue(ctxt, msg, queue);
}
-void MessageStoreModule::dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid)
+void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
{
- store->dequeue(ctxt, msg, queue, xid);
+ store->dequeue(ctxt, msg, queue);
}
-void MessageStoreModule::prepared(const string * const xid)
-{
- store->prepared(xid);
-}
-
-void MessageStoreModule::committed(const string * const xid)
+std::auto_ptr<TransactionContext> MessageStoreModule::begin()
{
- store->committed(xid);
+ return store->begin();
}
-void MessageStoreModule::aborted(const string * const xid)
+std::auto_ptr<TPCTransactionContext> MessageStoreModule::begin(const std::string& xid)
{
- store->aborted(xid);
+ return store->begin(xid);
}
-std::auto_ptr<TransactionContext> MessageStoreModule::begin()
+void MessageStoreModule::prepare(TPCTransactionContext& txn)
{
- return store->begin();
+ store->prepare(txn);
}
-void MessageStoreModule::commit(TransactionContext* ctxt)
+void MessageStoreModule::commit(TransactionContext& ctxt)
{
store->commit(ctxt);
}
-void MessageStoreModule::abort(TransactionContext* ctxt)
+void MessageStoreModule::abort(TransactionContext& ctxt)
{
store->abort(ctxt);
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h Fri Mar 30 08:50:07 2007
@@ -28,32 +28,39 @@
#include <sys/Module.h>
namespace qpid {
- namespace broker {
- /**
- * A null implementation of the MessageStore interface
- */
- class MessageStoreModule : public MessageStore{
- qpid::sys::Module<MessageStore> store;
- public:
- MessageStoreModule(const std::string& name);
- void create(const Queue& queue, const qpid::framing::FieldTable& settings);
- void destroy(const Queue& queue);
- void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
- void stage(Message* const msg);
- void destroy(Message* const msg);
- void appendContent(Message* const msg, const std::string& data);
- void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length);
- void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
- void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
- void prepared(const std::string * const xid);
- void committed(const std::string * const xid);
- void aborted(const std::string * const xid);
- std::auto_ptr<TransactionContext> begin();
- void commit(TransactionContext* ctxt);
- void abort(TransactionContext* ctxt);
- ~MessageStoreModule(){}
- };
- }
+namespace broker {
+
+/**
+ * A MessageStore implementation that forwards each call to the store held in a qpid::sys::Module<MessageStore>
+ */
+class MessageStoreModule : public MessageStore
+{
+ qpid::sys::Module<MessageStore> store;
+public:
+ MessageStoreModule(const std::string& name);
+
+ std::auto_ptr<TransactionContext> begin();
+ std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
+ void prepare(TPCTransactionContext& txn);
+ void commit(TransactionContext& txn);
+ void abort(TransactionContext& txn);
+
+ void create(const PersistableQueue& queue);
+ void destroy(const PersistableQueue& queue);
+ void create(const PersistableExchange& exchange);
+ void destroy(const PersistableExchange& exchange);
+ void recover(RecoveryManager& queues);
+ void stage(PersistableMessage& msg);
+ void destroy(PersistableMessage& msg);
+ void appendContent(PersistableMessage& msg, const std::string& data);
+ void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
+ void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+ void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+
+ ~MessageStoreModule(){}
+};
+
+}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp Fri Mar 30 08:50:07 2007
@@ -21,7 +21,6 @@
#include <NullMessageStore.h>
-#include <BrokerQueue.h>
#include <RecoveryManager.h>
#include <iostream>
@@ -30,75 +29,77 @@
NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
-void NullMessageStore::create(const Queue& queue, const qpid::framing::FieldTable&)
+void NullMessageStore::create(const PersistableQueue& queue)
{
if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::destroy(const Queue& queue)
+void NullMessageStore::destroy(const PersistableQueue& queue)
{
if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const)
+void NullMessageStore::create(const PersistableExchange&)
+{
+}
+
+void NullMessageStore::destroy(const PersistableExchange&)
+{
+}
+
+void NullMessageStore::recover(RecoveryManager&)
{
if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
}
-void NullMessageStore::stage(Message* const)
+void NullMessageStore::stage(PersistableMessage&)
{
if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
}
-void NullMessageStore::destroy(Message* const)
+void NullMessageStore::destroy(PersistableMessage&)
{
if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
}
-void NullMessageStore::appendContent(Message* const, const string&)
+void NullMessageStore::appendContent(PersistableMessage&, const string&)
{
if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl;
}
-void NullMessageStore::loadContent(Message* const, string&, uint64_t, uint32_t)
+void NullMessageStore::loadContent(PersistableMessage&, string&, uint64_t, uint32_t)
{
if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl;
}
-void NullMessageStore::enqueue(TransactionContext*, Message* const, const Queue& queue, const string * const)
+void NullMessageStore::enqueue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue)
{
if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::dequeue(TransactionContext*, Message* const, const Queue& queue, const string * const)
+void NullMessageStore::dequeue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue)
{
if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::prepared(const string * const)
-{
- if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
-}
-
-void NullMessageStore::committed(const string * const)
+std::auto_ptr<TransactionContext> NullMessageStore::begin()
{
- if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+ return std::auto_ptr<TransactionContext>();
}
-void NullMessageStore::aborted(const string * const)
+std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string&)
{
- if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+ return std::auto_ptr<TPCTransactionContext>();
}
-std::auto_ptr<TransactionContext> NullMessageStore::begin()
+void NullMessageStore::prepare(TPCTransactionContext&)
{
- return std::auto_ptr<TransactionContext>();
}
-void NullMessageStore::commit(TransactionContext*)
+void NullMessageStore::commit(TransactionContext&)
{
}
-void NullMessageStore::abort(TransactionContext*)
+void NullMessageStore::abort(TransactionContext&)
{
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h Fri Mar 30 08:50:07 2007
@@ -26,33 +26,38 @@
#include <BrokerQueue.h>
namespace qpid {
- namespace broker {
+namespace broker {
- /**
- * A null implementation of the MessageStore interface
- */
- class NullMessageStore : public MessageStore{
- const bool warn;
- public:
- NullMessageStore(bool warn = false);
- virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings);
- virtual void destroy(const Queue& queue);
- virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
- virtual void stage(Message* const msg);
- virtual void destroy(Message* const msg);
- virtual void appendContent(Message* const msg, const std::string& data);
- virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length);
- virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
- virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
- virtual void prepared(const std::string * const xid);
- virtual void committed(const std::string * const xid);
- virtual void aborted(const std::string * const xid);
- virtual std::auto_ptr<TransactionContext> begin();
- virtual void commit(TransactionContext* ctxt);
- virtual void abort(TransactionContext* ctxt);
- ~NullMessageStore(){}
- };
- }
+/**
+ * A null implementation of the MessageStore interface
+ */
+class NullMessageStore : public MessageStore
+{
+ const bool warn;
+public:
+ NullMessageStore(bool warn = false);
+
+ virtual std::auto_ptr<TransactionContext> begin();
+ virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
+ virtual void prepare(TPCTransactionContext& txn);
+ virtual void commit(TransactionContext& txn);
+ virtual void abort(TransactionContext& txn);
+
+ virtual void create(const PersistableQueue& queue);
+ virtual void destroy(const PersistableQueue& queue);
+ virtual void create(const PersistableExchange& exchange);
+ virtual void destroy(const PersistableExchange& exchange);
+ virtual void recover(RecoveryManager& queues);
+ virtual void stage(PersistableMessage& msg);
+ virtual void destroy(PersistableMessage& msg);
+ virtual void appendContent(PersistableMessage& msg, const std::string& data);
+ virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
+ virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+ virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+ ~NullMessageStore(){}
+};
+
+}
}
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,62 @@
+#ifndef _broker_Persistable_h
+#define _broker_Persistable_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "framing/amqp_types.h"
+#include "framing/Buffer.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for all persistable objects
+ */
+class Persistable
+{
+public:
+ /**
+ * Allows the store to attach its own identifier to this object
+ */
+ virtual void setPersistenceId(uint64_t id) = 0;
+ /**
+ * Returns any identifier the store may have attached to this
+ * object
+ */
+ virtual uint64_t getPersistenceId() const = 0;
+ /**
+ * Encodes the persistable state of this object into the supplied
+ * buffer
+ */
+ virtual void encode(framing::Buffer& buffer) const = 0;
+ /**
+ * @returns the size of the buffer needed to encode this object
+ */
+ virtual uint32_t encodedSize() const = 0;
+
+ virtual ~Persistable() {};
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,44 @@
+#ifndef _broker_PersistableExchange_h
+#define _broker_PersistableExchange_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "Persistable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface exchanges must expose to the MessageStore in order to be
+ * persistable.
+ */
+class PersistableExchange : public Persistable
+{
+public:
+ virtual ~PersistableExchange() {};
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,53 @@
+#ifndef _broker_PersistableMessage_h
+#define _broker_PersistableMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "Persistable.h"
+#include "framing/amqp_types.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface messages must expose to the MessageStore in order to
+ * be persistable.
+ */
+ class PersistableMessage : public Persistable
+{
+public:
+ typedef boost::shared_ptr<PersistableMessage> shared_ptr;
+
+ /**
+ * @returns the size of the headers when encoded
+ */
+ virtual uint32_t encodedHeaderSize() const = 0;
+
+ virtual ~PersistableMessage() {};
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,45 @@
+#ifndef _broker_PersistableQueue_h
+#define _broker_PersistableQueue_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "Persistable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface queues must expose to the MessageStore in order to be
+ * persistable.
+ */
+class PersistableQueue : public Persistable
+{
+public:
+ virtual const std::string& getName() const = 0;
+ virtual ~PersistableQueue() {};
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,57 @@
+#ifndef _broker_RecoverableMessage_h
+#define _broker_RecoverableMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include "framing/amqp_types.h"
+#include "framing/Buffer.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface through which messages are reloaded on recovery.
+ */
+class RecoverableMessage
+{
+public:
+ typedef boost::shared_ptr<RecoverableMessage> shared_ptr;
+ /**
+ * Used by store to determine whether to load content on recovery
+ * or let message load its own content as and when it requires it.
+ *
+ * @returns true if the content of the message should be loaded
+ */
+ virtual bool loadContent(uint64_t available) = 0;
+ /**
+ * Loads the content held in the supplied buffer (may do checking
+ * of length as necessary)
+ */
+ virtual void decodeContent(framing::Buffer& buffer) = 0;
+ virtual ~RecoverableMessage() {};
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,49 @@
+#ifndef _broker_RecoverableQueue_h
+#define _broker_RecoverableQueue_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "RecoverableMessage.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface through which messages are added back to queues on
+ * recovery.
+ */
+class RecoverableQueue
+{
+public:
+ typedef boost::shared_ptr<RecoverableQueue> shared_ptr;
+ /**
+ * Used during recovery to add stored messages back to the queue
+ */
+ virtual void recover(RecoverableMessage::shared_ptr msg) = 0;
+ virtual ~RecoverableQueue() {};
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h Fri Mar 30 08:50:07 2007
@@ -21,20 +21,20 @@
#ifndef _RecoveryManager_
#define _RecoveryManager_
-#include <ExchangeRegistry.h>
-#include <QueueRegistry.h>
+#include "RecoverableQueue.h"
+#include "RecoverableMessage.h"
+#include "framing/Buffer.h"
namespace qpid {
namespace broker {
class RecoveryManager{
- QueueRegistry& queues;
- ExchangeRegistry& exchanges;
public:
- RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges);
- ~RecoveryManager();
- Queue::shared_ptr recoverQueue(const std::string& name);
- Exchange::shared_ptr recoverExchange(const std::string& name, const std::string& type);
+ virtual ~RecoveryManager(){}
+ virtual void recoverExchange(framing::Buffer& buffer) = 0;
+ virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0;
+ virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
+ virtual void recoveryComplete() = 0;
};
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp Fri Mar 30 08:50:07 2007
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <RecoveryManagerImpl.h>
+
+#include "BrokerMessage.h"
+#include "BrokerMessageMessage.h"
+#include "BrokerQueue.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+using boost::dynamic_pointer_cast;
+
+
+static const uint8_t BASIC = 1;
+static const uint8_t MESSAGE = 2;
+
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, uint64_t _stagingThreshold)
+ : queues(_queues), exchanges(_exchanges), stagingThreshold(_stagingThreshold) {}
+
+RecoveryManagerImpl::~RecoveryManagerImpl() {}
+
+class RecoverableMessageImpl : public RecoverableMessage
+{
+ Message::shared_ptr msg;
+ const uint64_t stagingThreshold;
+public:
+ RecoverableMessageImpl(Message::shared_ptr& _msg, uint64_t _stagingThreshold)
+ : msg(_msg), stagingThreshold(_stagingThreshold) {}
+ ~RecoverableMessageImpl() {};
+ bool loadContent(uint64_t available);
+ void decodeContent(framing::Buffer& buffer);
+ void recover(Queue::shared_ptr queue);
+};
+
+class RecoverableQueueImpl : public RecoverableQueue
+{
+ Queue::shared_ptr queue;
+public:
+ RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {}
+ ~RecoverableQueueImpl() {};
+ void recover(RecoverableMessage::shared_ptr msg);
+};
+
+void RecoveryManagerImpl::recoverExchange(framing::Buffer&)
+{
+ //TODO
+}
+
+RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
+{
+ Queue::shared_ptr queue = Queue::decode(queues, buffer);
+ try {
+ Exchange::shared_ptr exchange = exchanges.getDefault();
+ if (exchange) {
+ exchange->bind(queue, queue->getName(), 0);
+ }
+ } catch (ChannelException& e) {
+ //assume no default exchange has been declared
+ }
+ return RecoverableQueue::shared_ptr(new RecoverableQueueImpl(queue));
+}
+
+RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
+{
+ buffer.record();
+ //peek at type:
+ Message::shared_ptr message(decodeMessageType(buffer) == MESSAGE ?
+ ((Message*) new MessageMessage()) :
+ ((Message*) new BasicMessage()));
+ buffer.restore();
+ message->decodeHeader(buffer);
+ return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
+}
+
+void RecoveryManagerImpl::recoveryComplete()
+{
+ //TODO (finalise binding setup etc)
+}
+
+uint8_t RecoveryManagerImpl::decodeMessageType(framing::Buffer& buffer)
+{
+ return buffer.getOctet();
+}
+
+void RecoveryManagerImpl::encodeMessageType(const Message& msg, framing::Buffer& buffer)
+{
+ buffer.putOctet(dynamic_cast<const MessageMessage*>(&msg) ? MESSAGE : BASIC);
+}
+
+uint32_t RecoveryManagerImpl::encodedMessageTypeSize()
+{
+ return 1;
+}
+
+bool RecoverableMessageImpl::loadContent(uint64_t available)
+{
+ return !stagingThreshold || available < stagingThreshold;
+}
+
+void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
+{
+ msg->decodeContent(buffer);
+}
+
+void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
+{
+ queue->recover(msg);
+}
+
+void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
+{
+ dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue);
+}
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveryManagerImpl_
+#define _RecoveryManagerImpl_
+
+#include <list>
+#include "ExchangeRegistry.h"
+#include "QueueRegistry.h"
+#include "RecoveryManager.h"
+
+namespace qpid {
+namespace broker {
+
+    /**
+     * RecoveryManager implementation constructed from the broker's queue and
+     * exchange registries plus a staging threshold.
+     */
+    class RecoveryManagerImpl : public RecoveryManager
+    {
+    private:
+        QueueRegistry& queues;
+        ExchangeRegistry& exchanges;
+        // Handed to every RecoverableMessageImpl created by recoverMessage().
+        const uint64_t stagingThreshold;
+
+    public:
+        RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, uint64_t stagingThreshold);
+        ~RecoveryManagerImpl();
+
+        void recoverExchange(framing::Buffer& buffer);
+        RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer);
+        RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
+        void recoveryComplete();
+
+        // Helpers for the one-octet message type tag (MESSAGE or BASIC).
+        static uint8_t decodeMessageType(framing::Buffer& buffer);
+        static void encodeMessageType(const Message& msg, framing::Buffer& buffer);
+        static uint32_t encodedMessageTypeSize();
+    };
+
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h Fri Mar 30 08:50:07 2007
@@ -22,25 +22,35 @@
#define _TransactionalStore_
#include <memory>
+#include <string>
namespace qpid {
- namespace broker {
- struct InvalidTransactionContextException : public std::exception {};
+namespace broker {
- class TransactionContext{
- public:
- virtual ~TransactionContext(){}
- };
-
- class TransactionalStore{
- public:
- virtual std::auto_ptr<TransactionContext> begin() = 0;
- virtual void commit(TransactionContext*) = 0;
- virtual void abort(TransactionContext*) = 0;
-
- virtual ~TransactionalStore(){}
- };
- }
+/** Exception type derived from std::exception; it adds no members of its own. */
+struct InvalidTransactionContextException : public std::exception
+{
+};
+
+/**
+ * Polymorphic base for the transaction handles returned by
+ * TransactionalStore::begin(); it declares nothing but a virtual destructor.
+ */
+class TransactionContext
+{
+public:
+    virtual ~TransactionContext() {}
+};
+
+/**
+ * Transaction handle returned by TransactionalStore::begin(xid) and accepted
+ * by TransactionalStore::prepare().
+ * NOTE(review): "TPC" presumably stands for two-phase commit - confirm.
+ */
+class TPCTransactionContext : public TransactionContext
+{
+public:
+    virtual ~TPCTransactionContext() {}
+};
+
+/**
+ * Abstract interface for a store that supports transactions.
+ */
+class TransactionalStore
+{
+public:
+    /** Starts a transaction; the returned auto_ptr owns the new context. */
+    virtual std::auto_ptr<TransactionContext> begin() = 0;
+
+    /**
+     * Starts a transaction identified by xid.
+     * @param xid identifier for the transaction
+     */
+    virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) = 0;
+
+    /** Prepares a transaction that was started with begin(xid). */
+    virtual void prepare(TPCTransactionContext& txn) = 0;
+
+    /** Commits the given transaction. */
+    virtual void commit(TransactionContext& txn) = 0;
+
+    /** Aborts the given transaction. */
+    virtual void abort(TransactionContext& txn) = 0;
+
+    virtual ~TransactionalStore() {}
+};
+
+}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp Fri Mar 30 08:50:07 2007
@@ -25,8 +25,8 @@
using std::mem_fun_ref;
using namespace qpid::broker;
-TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked, const std::string* const _xid) :
- acked(_acked), unacked(_unacked), xid(_xid){
+TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
+ acked(_acked), unacked(_unacked){
}
@@ -35,7 +35,7 @@
//dequeue all acked messages from their queues
for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
if (i->coveredBy(&acked)) {
- i->discard(ctxt, xid);
+ i->discard(ctxt);
}
}
return true;
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h Fri Mar 30 08:50:07 2007
@@ -37,7 +37,6 @@
class TxAck : public TxOp{
AccumulatedAck& acked;
std::list<DeliveryRecord>& unacked;
- const std::string* const xid;
public:
/**
@@ -45,7 +44,7 @@
* acks received
* @param unacked the record of delivered messages
*/
- TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked, const std::string* const xid = 0);
+ TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp Fri Mar 30 08:50:07 2007
@@ -29,11 +29,11 @@
if(store) ctxt = store->begin();
for(op_iterator i = ops.begin(); i < ops.end(); i++){
if(!(*i)->prepare(ctxt.get())){
- if(store) store->abort(ctxt.get());
+ if(store) store->abort(*ctxt);
return false;
}
}
- if(store) store->commit(ctxt.get());
+ if(store) store->commit(*ctxt);
return true;
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp Fri Mar 30 08:50:07 2007
@@ -22,11 +22,11 @@
using namespace qpid::broker;
-TxPublish::TxPublish(Message::shared_ptr _msg, const std::string* const _xid) : msg(_msg), xid(_xid) {}
+TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {}
bool TxPublish::prepare(TransactionContext* ctxt) throw(){
try{
- for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, xid));
+ for_each(queues.begin(), queues.end(), Prepare(ctxt, msg));
return true;
}catch(...){
std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
@@ -45,11 +45,11 @@
queues.push_back(queue);
}
-TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg, const string* const _xid)
- : ctxt(_ctxt), msg(_msg), xid(_xid){}
+TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg)
+ : ctxt(_ctxt), msg(_msg){}
void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
- queue->enqueue(ctxt, msg, xid);
+ queue->enqueue(ctxt, msg);
}
TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h Fri Mar 30 08:50:07 2007
@@ -46,9 +46,8 @@
class Prepare{
TransactionContext* ctxt;
Message::shared_ptr& msg;
- const std::string* const xid;
public:
- Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const std::string* const xid);
+ Prepare(TransactionContext* ctxt, Message::shared_ptr& msg);
void operator()(Queue::shared_ptr& queue);
};
@@ -60,11 +59,10 @@
};
Message::shared_ptr msg;
- const std::string* const xid;
std::list<Queue::shared_ptr> queues;
public:
- TxPublish(Message::shared_ptr msg, const std::string* const xid = 0);
+ TxPublish(Message::shared_ptr msg);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
Modified: incubator/qpid/trunk/qpid/cpp/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/BrokerChannelTest.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/BrokerChannelTest.cpp Fri Mar 30 08:50:07 2007
@@ -68,7 +68,7 @@
struct MethodCall
{
const string name;
- Message* const msg;
+ PersistableMessage* msg;
const string data;//only needed for appendContent
void check(const MethodCall& other) const
@@ -92,7 +92,7 @@
}
}
- void handle(const string& name, Message* msg, const string& data)
+ void handle(const string& name, PersistableMessage* msg, const string& data)
{
MethodCall call = {name, msg, data};
handle(call);
@@ -102,25 +102,25 @@
MockMessageStore() : expectMode(false) {}
- void stage(Message* const msg)
+ void stage(PersistableMessage& msg)
{
- if(!expectMode) msg->setPersistenceId(1);
- MethodCall call = {"stage", msg, ""};
+ if(!expectMode) msg.setPersistenceId(1);
+ MethodCall call = {"stage", &msg, ""};
handle(call);
}
- void appendContent(Message* msg, const string& data)
+ void appendContent(PersistableMessage& msg, const string& data)
{
- MethodCall call = {"appendContent", msg, data};
+ MethodCall call = {"appendContent", &msg, data};
handle(call);
}
// Don't hide overloads.
using NullMessageStore::destroy;
- void destroy(Message* msg)
+ void destroy(PersistableMessage& msg)
{
- MethodCall call = {"destroy", msg, ""};
+ MethodCall call = {"destroy", &msg, ""};
handle(call);
}
@@ -249,11 +249,11 @@
MockChannel::basicGetBody());
store.expect();
- store.stage(msg);
+ store.stage(*msg);
for (int i = 0; i < 3; i++) {
- store.appendContent(msg, data[i]);
+ store.appendContent(*msg, data[i]);
}
- store.destroy(msg);
+ store.destroy(*msg);
store.test();
Exchange::shared_ptr exchange =
@@ -304,8 +304,8 @@
policy.update(settings);
store.expect();
- store.stage(msg3.get());
- store.destroy(msg3.get());
+ store.stage(*msg3);
+ store.destroy(*msg3);
store.test();
Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0));
Modified: incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp Fri Mar 30 08:50:07 2007
@@ -50,7 +50,7 @@
public:
TestMessageStore(const string& _content) : content(_content) {}
- void loadContent(Message* const, string& data, uint64_t offset, uint32_t length)
+ void loadContent(PersistableMessage&, string& data, uint64_t offset, uint32_t length)
{
if (offset + length <= content.size()) {
data = content.substr(offset, length);
Modified: incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp Fri Mar 30 08:50:07 2007
@@ -51,33 +51,32 @@
public:
- void stage(Message* const msg)
+ void stage(PersistableMessage& msg)
{
- if (msg->getPersistenceId() == 0) {
- header = new Buffer(msg->encodedHeaderSize());
- msg->encodeHeader(*header);
+ if (msg.getPersistenceId() == 0) {
+ header = new Buffer(msg.encodedSize());
+ msg.encode(*header);
content = new Buffer(contentBufferSize);
- msg->setPersistenceId(1);
+ msg.setPersistenceId(1);
} else {
throw qpid::Exception("Message already staged!");
}
}
- void appendContent(Message* msg, const string& data)
+ void appendContent(PersistableMessage& msg, const string& data)
{
- if (msg) {
+ if (msg.getPersistenceId() == 1) {
content->putRawData(data);
} else {
throw qpid::Exception("Invalid message id!");
}
}
- // Don't hide overloads.
using NullMessageStore::destroy;
- void destroy(BasicMessage* msg)
+ void destroy(PersistableMessage& msg)
{
- CPPUNIT_ASSERT(msg->getPersistenceId());
+ CPPUNIT_ASSERT(msg.getPersistenceId());
}
BasicMessage::shared_ptr getRestoredMessage()
Modified: incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp Fri Mar 30 08:50:07 2007
@@ -38,11 +38,11 @@
class TestMessageStore : public NullMessageStore
{
public:
- vector< std::pair<Message*, const string*> > dequeued;
+ vector<PersistableMessage*> dequeued;
- void dequeue(TransactionContext*, Message* const msg, const Queue& /*queue*/, const string * const xid)
+ void dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& /*queue*/)
{
- dequeued.push_back(std::pair<Message*, const string*>(msg, xid));
+ dequeued.push_back(&msg);
}
TestMessageStore() : NullMessageStore() {}
@@ -50,7 +50,6 @@
};
CPPUNIT_TEST_SUITE(TxAckTest);
- CPPUNIT_TEST(testPrepare2pc);
CPPUNIT_TEST(testPrepare);
CPPUNIT_TEST(testCommit);
CPPUNIT_TEST_SUITE_END();
@@ -62,12 +61,11 @@
vector<Message::shared_ptr> messages;
list<DeliveryRecord> deliveries;
TxAck op;
- std::string xid;
public:
- TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid)
+ TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
{
for(int i = 0; i < 10; i++){
Message::shared_ptr msg(
@@ -93,17 +91,7 @@
CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
int dequeued[] = {0, 1, 2, 3, 4, 6, 8};
for (int i = 0; i < 7; i++) {
- CPPUNIT_ASSERT_EQUAL(messages[dequeued[i]].get(), store.dequeued[i].first);
- }
- }
-
- void testPrepare2pc()
- {
- xid = "abcdefg";
- testPrepare();
- const string expected(xid);
- for (int i = 0; i < 7; i++) {
- CPPUNIT_ASSERT_EQUAL(expected, *store.dequeued[i].second);
+ CPPUNIT_ASSERT_EQUAL((PersistableMessage*) messages[dequeued[i]].get(), store.dequeued[i]);
}
}