You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/11/08 15:05:39 UTC

svn commit: r593159 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/Broker.cpp broker/Broker.h broker/ExchangeRegistry.h broker/Message.cpp broker/Message.h broker/NullMessageStore.cpp broker/NullMessageStore.h framing/frame_functors.h

Author: gsim
Date: Thu Nov  8 06:05:38 2007
New Revision: 593159

URL: http://svn.apache.org/viewvc?rev=593159&view=rev
Log:
Make standard exchanges durable
Ensure flags are set correctly for recovered messages with no content


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Nov  8 06:05:38 2007
@@ -128,10 +128,6 @@
     }
 
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
-    exchanges.declare(amq_direct, DirectExchange::typeName);
-    exchanges.declare(amq_topic, TopicExchange::typeName);
-    exchanges.declare(amq_fanout, FanOutExchange::typeName);
-    exchanges.declare(amq_match, HeadersExchange::typeName);
     
     if(conf.enableMgmt) {
         QPID_LOG(info, "Management enabled");
@@ -153,6 +149,11 @@
              store->recover(recoverer);
         }
     }
+    //ensure standard exchanges exist (done after recovery from store)
+    declareStandardExchange(amq_direct, DirectExchange::typeName);
+    declareStandardExchange(amq_topic, TopicExchange::typeName);
+    declareStandardExchange(amq_fanout, FanOutExchange::typeName);
+    declareStandardExchange(amq_match, HeadersExchange::typeName);
 
     // Initialize plugins
     const Plugin::Plugins& plugins=Plugin::getPlugins();
@@ -160,6 +161,15 @@
          i != plugins.end();
          i++)
         (*i)->initialize(*this);
+}
+
+void Broker::declareStandardExchange(const std::string& name, const std::string& type)
+{
+    bool storeEnabled = store.get();
+    std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
+    if (status.second && storeEnabled) {
+        store->create(*status.first);
+    }
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Nov  8 06:05:38 2007
@@ -136,6 +136,7 @@
     ManagementObjectVhost::shared_ptr  mgmtVhostObject;
 
     static MessageStore* createStore(const Options& config);
+    void declareStandardExchange(const std::string& name, const std::string& type);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Thu Nov  8 06:05:38 2007
@@ -40,7 +40,7 @@
         std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
             throw(UnknownExchangeTypeException);
         std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type, 
-                                                      bool durable, const qpid::framing::FieldTable& args)
+                                                      bool durable, const qpid::framing::FieldTable& args = framing::FieldTable())
             throw(UnknownExchangeTypeException);
         void destroy(const std::string& name);
         Exchange::shared_ptr get(const std::string& name);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Nov  8 06:05:38 2007
@@ -36,7 +36,7 @@
 TransferAdapter Message::TRANSFER;
 PublishAdapter Message::PUBLISH;
 
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), publisher(0), adapter(0) {}
 
 std::string Message::getRoutingKey() const
 {
@@ -121,12 +121,20 @@
 
 void Message::decodeContent(framing::Buffer& buffer)
 {
-    //get the data as a string and set that as the content
-    //body on a frame then add that frame to the frameset
-    AMQFrame frame;
-    frame.setBody(AMQContentBody());
-    frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
-    frames.append(frame);
+    if (buffer.available()) {
+        //get the data as a string and set that as the content
+        //body on a frame then add that frame to the frameset
+        AMQFrame frame;
+        frame.setBody(AMQContentBody());
+        frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+        frames.append(frame);
+    } else {
+        //adjust header flags
+        MarkLastSegment f;
+        frames.map_if(f, TypeFilter(HEADER_BODY));    
+    }
+    //mark content loaded
+    loaded = true;
 }
 
 void Message::releaseContent(MessageStore* _store)
@@ -205,5 +213,5 @@
 
 bool Message::isContentLoaded() const
 {
-    return contentSize() > 0;
+    return loaded;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Thu Nov  8 06:05:38 2007
@@ -124,6 +124,7 @@
     mutable boost::shared_ptr<Exchange> exchange;
     mutable uint64_t persistenceId;
     bool redelivered;
+    bool loaded;
     ConnectionToken* publisher;
     mutable MessageAdapter* adapter;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Thu Nov  8 06:05:38 2007
@@ -53,18 +53,18 @@
 
 void NullMessageStore::create(PersistableQueue& queue)
 {
-    QPID_LOG(info, "Can't create durable queue '" << queue.getName() << "'. Persistence not enabled.");
+    QPID_LOG(info, "Queue '" << queue.getName() 
+             << "' will not be durable. Persistence not enabled.");
 }
 
-void NullMessageStore::destroy(PersistableQueue& queue)
+void NullMessageStore::destroy(PersistableQueue&)
 {
-    QPID_LOG(info, "Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled.");
 }
 
 void NullMessageStore::create(const PersistableExchange& exchange)
 {
-    QPID_LOG(info, "Can't create durable exchange '"
-             << exchange.getName() << "'. Persistence not enabled.");
+    QPID_LOG(info, "Exchange'" << exchange.getName() 
+             << "' will not be durable. Persistence not enabled.");
 }
 
 void NullMessageStore::destroy(const PersistableExchange& )
@@ -86,12 +86,11 @@
 
 void NullMessageStore::destroy(PersistableMessage&)
 {
-    QPID_LOG(info, "No need to destroy staged message. Persistence not enabled.");
 }
 
 void NullMessageStore::appendContent(const PersistableMessage&, const string&)
 {
-    QPID_LOG(info, "Can't load content. Persistence not enabled.");
+    QPID_LOG(info, "Can't append content. Persistence not enabled.");
 }
 
 void NullMessageStore::loadContent(const PersistableMessage&, string&, uint64_t, uint32_t)
@@ -102,18 +101,16 @@
 void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
 {
     msg.enqueueComplete(); 
-    QPID_LOG(info, "Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled.");
+    QPID_LOG(info, "Message is not durably recorded on '" << queue.getName() << "'. Persistence not enabled.");
 }
 
-void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
+void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue&)
 {
     msg.dequeueComplete();
-    QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled.");
 }
 
-void NullMessageStore::flush(const qpid::broker::PersistableQueue& queue)
+void NullMessageStore::flush(const qpid::broker::PersistableQueue&)
 {
-    QPID_LOG(info, "Can't flush. Persistence not enabled queue-" << queue.getName());
 }
 
 u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& )

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Thu Nov  8 06:05:38 2007
@@ -38,7 +38,7 @@
 public:
     NullMessageStore(bool warn = false);
 
-	virtual bool init(const std::string& dir, const bool async, const bool force);
+    virtual bool init(const std::string& dir, const bool async, const bool force);
     virtual std::auto_ptr<TransactionContext> begin();
     virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
     virtual void prepare(TPCTransactionContext& txn);
@@ -63,7 +63,7 @@
     virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
     virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
     virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
-	virtual void flush(const qpid::broker::PersistableQueue& queue);
+    virtual void flush(const qpid::broker::PersistableQueue& queue);
     ~NullMessageStore(){}
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h Thu Nov  8 06:05:38 2007
@@ -111,6 +111,12 @@
     }
 };
 
+class MarkLastSegment
+{
+public:
+    void operator()(AMQFrame& f) const { f.setEof(true); }
+};
+
 }
 }