You are viewing a plain-text version of this content. The canonical link for it was not preserved in this rendering.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/11/08 15:05:39 UTC
svn commit: r593159 - in /incubator/qpid/trunk/qpid/cpp/src/qpid:
broker/Broker.cpp broker/Broker.h broker/ExchangeRegistry.h
broker/Message.cpp broker/Message.h broker/NullMessageStore.cpp
broker/NullMessageStore.h framing/frame_functors.h
Author: gsim
Date: Thu Nov 8 06:05:38 2007
New Revision: 593159
URL: http://svn.apache.org/viewvc?rev=593159&view=rev
Log:
Make standard exchanges durable
Ensure flags are set correctly for recovered messages with no content
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Nov 8 06:05:38 2007
@@ -128,10 +128,6 @@
}
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- exchanges.declare(amq_direct, DirectExchange::typeName);
- exchanges.declare(amq_topic, TopicExchange::typeName);
- exchanges.declare(amq_fanout, FanOutExchange::typeName);
- exchanges.declare(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
@@ -153,6 +149,11 @@
store->recover(recoverer);
}
}
+ //ensure standard exchanges exist (done after recovery from store)
+ declareStandardExchange(amq_direct, DirectExchange::typeName);
+ declareStandardExchange(amq_topic, TopicExchange::typeName);
+ declareStandardExchange(amq_fanout, FanOutExchange::typeName);
+ declareStandardExchange(amq_match, HeadersExchange::typeName);
// Initialize plugins
const Plugin::Plugins& plugins=Plugin::getPlugins();
@@ -160,6 +161,15 @@
i != plugins.end();
i++)
(*i)->initialize(*this);
+}
+
+void Broker::declareStandardExchange(const std::string& name, const std::string& type)
+{
+ bool storeEnabled = store.get();
+ std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
+ if (status.second && storeEnabled) {
+ store->create(*status.first);
+ }
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Nov 8 06:05:38 2007
@@ -136,6 +136,7 @@
ManagementObjectVhost::shared_ptr mgmtVhostObject;
static MessageStore* createStore(const Options& config);
+ void declareStandardExchange(const std::string& name, const std::string& type);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Thu Nov 8 06:05:38 2007
@@ -40,7 +40,7 @@
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
throw(UnknownExchangeTypeException);
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type,
- bool durable, const qpid::framing::FieldTable& args)
+ bool durable, const qpid::framing::FieldTable& args = framing::FieldTable())
throw(UnknownExchangeTypeException);
void destroy(const std::string& name);
Exchange::shared_ptr get(const std::string& name);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Nov 8 06:05:38 2007
@@ -36,7 +36,7 @@
TransferAdapter Message::TRANSFER;
PublishAdapter Message::PUBLISH;
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), publisher(0), adapter(0) {}
std::string Message::getRoutingKey() const
{
@@ -121,12 +121,20 @@
void Message::decodeContent(framing::Buffer& buffer)
{
- //get the data as a string and set that as the content
- //body on a frame then add that frame to the frameset
- AMQFrame frame;
- frame.setBody(AMQContentBody());
- frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
- frames.append(frame);
+ if (buffer.available()) {
+ //get the data as a string and set that as the content
+ //body on a frame then add that frame to the frameset
+ AMQFrame frame;
+ frame.setBody(AMQContentBody());
+ frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+ frames.append(frame);
+ } else {
+ //adjust header flags
+ MarkLastSegment f;
+ frames.map_if(f, TypeFilter(HEADER_BODY));
+ }
+ //mark content loaded
+ loaded = true;
}
void Message::releaseContent(MessageStore* _store)
@@ -205,5 +213,5 @@
bool Message::isContentLoaded() const
{
- return contentSize() > 0;
+ return loaded;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Thu Nov 8 06:05:38 2007
@@ -124,6 +124,7 @@
mutable boost::shared_ptr<Exchange> exchange;
mutable uint64_t persistenceId;
bool redelivered;
+ bool loaded;
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Thu Nov 8 06:05:38 2007
@@ -53,18 +53,18 @@
void NullMessageStore::create(PersistableQueue& queue)
{
- QPID_LOG(info, "Can't create durable queue '" << queue.getName() << "'. Persistence not enabled.");
+ QPID_LOG(info, "Queue '" << queue.getName()
+ << "' will not be durable. Persistence not enabled.");
}
-void NullMessageStore::destroy(PersistableQueue& queue)
+void NullMessageStore::destroy(PersistableQueue&)
{
- QPID_LOG(info, "Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled.");
}
void NullMessageStore::create(const PersistableExchange& exchange)
{
- QPID_LOG(info, "Can't create durable exchange '"
- << exchange.getName() << "'. Persistence not enabled.");
+ QPID_LOG(info, "Exchange'" << exchange.getName()
+ << "' will not be durable. Persistence not enabled.");
}
void NullMessageStore::destroy(const PersistableExchange& )
@@ -86,12 +86,11 @@
void NullMessageStore::destroy(PersistableMessage&)
{
- QPID_LOG(info, "No need to destroy staged message. Persistence not enabled.");
}
void NullMessageStore::appendContent(const PersistableMessage&, const string&)
{
- QPID_LOG(info, "Can't load content. Persistence not enabled.");
+ QPID_LOG(info, "Can't append content. Persistence not enabled.");
}
void NullMessageStore::loadContent(const PersistableMessage&, string&, uint64_t, uint32_t)
@@ -102,18 +101,16 @@
void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
{
msg.enqueueComplete();
- QPID_LOG(info, "Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled.");
+ QPID_LOG(info, "Message is not durably recorded on '" << queue.getName() << "'. Persistence not enabled.");
}
-void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
+void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue&)
{
msg.dequeueComplete();
- QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled.");
}
-void NullMessageStore::flush(const qpid::broker::PersistableQueue& queue)
+void NullMessageStore::flush(const qpid::broker::PersistableQueue&)
{
- QPID_LOG(info, "Can't flush. Persistence not enabled queue-" << queue.getName());
}
u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& )
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Thu Nov 8 06:05:38 2007
@@ -38,7 +38,7 @@
public:
NullMessageStore(bool warn = false);
- virtual bool init(const std::string& dir, const bool async, const bool force);
+ virtual bool init(const std::string& dir, const bool async, const bool force);
virtual std::auto_ptr<TransactionContext> begin();
virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
virtual void prepare(TPCTransactionContext& txn);
@@ -63,7 +63,7 @@
virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
- virtual void flush(const qpid::broker::PersistableQueue& queue);
+ virtual void flush(const qpid::broker::PersistableQueue& queue);
~NullMessageStore(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h Thu Nov 8 06:05:38 2007
@@ -111,6 +111,12 @@
}
};
+class MarkLastSegment
+{
+public:
+ void operator()(AMQFrame& f) const { f.setEof(true); }
+};
+
}
}