You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/11/06 23:41:07 UTC

svn commit: r711998 - in /incubator/qpid/trunk/qpid/cpp: rubygen/framing.0-10/ src/qpid/broker/ src/qpid/sys/ src/tests/

Author: aconway
Date: Thu Nov  6 14:40:57 2008
New Revision: 711998

URL: http://svn.apache.org/viewvc?rev=711998&view=rev
Log:

Add Message callbacks for async completion.
Add unit test for async completion.
Add a sync parameter to generated session functions; it defaults to the previous behaviour but allows greater control.

Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/Session.rb
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/Session.rb?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/Session.rb Thu Nov  6 14:40:57 2008
@@ -23,11 +23,12 @@
 require 'cppgen'
 
 class CppGen
-  def session_methods
+  def session_methods(sync_default)
     excludes = ["connection", "session", "file", "stream"]
     gen_methods=@amqp.methods_on(@chassis).reject { |m|
       excludes.include? m.parent.name or m.body_name.include?("010")
     }
+    gen_methods.each { |m| m.set_sync_default(sync_default) }
   end
 
   
@@ -70,6 +71,8 @@
       genl "return *this;"
     }
   end
+
+  def sync_default() !@async end
 end
 
 class ContentField               # For extra content parameters
@@ -80,13 +83,23 @@
   def doc() "Message content"; end
 end
 
+class SyncField               # For extra sync parameters
+  def initialize(default_value) @default_value=default_value ? "true" : "false" end
+  def cppname() "sync"  end
+  def signature() "bool sync" end
+  def sig_default() signature+"="+@default_value end
+  def unpack() "p[arg::sync|#{@default_value}]"; end
+  def doc() "If true the broker will respond with completion status as soon as possible."; end
+end
+
 class AmqpField
   def unpack() "p[arg::#{cppname}|#{default_value}]"; end
   def sig_default() signature+"="+default_value; end
 end
 
 class AmqpMethod
-  def fields_c() content ? fields+[ContentField.new] : fields end
+  def set_sync_default(sync_default) @sync_default=sync_default end
+  def fields_c() result = fields + (content ? [ContentField.new] : []) + [SyncField.new(@sync_default)] end
   def param_names_c() fields_c.map { |f| f.cppname} end
   def signature_c()  fields_c.map { |f| f.signature }; end
   def sig_c_default()  fields_c.map { |f| f.sig_default }; end
@@ -134,7 +147,7 @@
         cpp_class(@classname, "public #{@version_base}") {
           public
           decl_ctor_opeq()
-          session_methods.each { |m|
+          session_methods(sync_default).each { |m|
             genl
             doxygen(m)
             args=m.sig_c_default.join(", ") 
@@ -148,14 +161,14 @@
       include "qpid/framing/all_method_bodies.h"
       namespace(@namespace) {
         genl "using namespace framing;"
-        session_methods.each { |m|
+        session_methods(sync_default).each { |m|
           genl
           sig=m.signature_c.join(", ")
           func="#{@classname}::#{m.session_function}"
           scope("#{m.return_type(@async)} #{func}(#{sig}) {") {
             args=(["ProtocolVersion(#{@amqp.major},#{@amqp.minor})"]+m.param_names).join(", ")
             genl "#{m.body_name} body(#{args});";
-            genl "body.setSync(#{@async ? 'false':'true'});"
+            genl "body.setSync(sync);"
             sendargs="body"
             sendargs << ", content" if m.content
             async_retval="#{m.return_type(true)}(impl->send(#{sendargs}), impl)"
@@ -200,7 +213,7 @@
   end
 
   def generate()
-    keyword_methods=session_methods.reject { |m| m.fields_c.empty? }
+    keyword_methods=session_methods(sync_default).reject { |m| m.fields_c.empty? }
     max_arity = keyword_methods.map{ |m| m.fields_c.size }.max
 
     h_file("qpid/client/arg.h") {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Nov  6 14:40:57 2008
@@ -194,16 +194,12 @@
         (*i)->earlyInitialize(*this);
 
     // If no plugin store module registered itself, set up the null store.
-    if (store == 0)
+    if (store.get() == 0)
         setStore (new NullMessageStore (false));
 
-    queues.setStore     (store);
-    dtxManager.setStore (store);
-    links.setStore      (store);
-
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
     
-    if (store != 0) {
+    if (store.get() != 0) {
         RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, 
                                       conf.stagingThreshold);
         store->recover(recoverer);
@@ -247,7 +243,7 @@
 
 void Broker::declareStandardExchange(const std::string& name, const std::string& type)
 {
-    bool storeEnabled = store != NULL;
+    bool storeEnabled = store.get() != NULL;
     std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
     if (status.second && storeEnabled) {
         store->create(*status.first, framing::FieldTable ());
@@ -269,9 +265,10 @@
 
 void Broker::setStore (MessageStore* _store)
 {
-    assert (store == 0 && _store != 0);
-    if (store == 0 && _store != 0)
-        store = new MessageStoreModule (_store);
+    store.reset(new MessageStoreModule (_store));
+    queues.setStore     (store.get());
+    dtxManager.setStore (store.get());
+    links.setStore      (store.get());
 }
 
 void Broker::run() {
@@ -304,7 +301,6 @@
 Broker::~Broker() {
     shutdown();
     finalize();                 // Finalize any plugins.
-    delete store;    
     if (config.auth)
         SaslAuthenticator::fini();
     QPID_LOG(notice, "Shut down");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Nov  6 14:40:57 2008
@@ -112,7 +112,7 @@
     Options config;
     management::ManagementAgent::Singleton managementAgentSingleton;
     ProtocolFactoryMap protocolFactories;
-    MessageStore* store;
+    std::auto_ptr<MessageStore> store;
     AclModule* acl;
     DataDir dataDir;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Thu Nov  6 14:40:57 2008
@@ -167,6 +167,5 @@
 
 void DtxManager::setStore (TransactionalStore* _store)
 {
-    assert (store == 0 && _store != 0);
     store = _store;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Thu Nov  6 14:40:57 2008
@@ -180,7 +180,6 @@
 
 void LinkRegistry::setStore (MessageStore* _store)
 {
-    assert (store == 0 && _store != 0);
     store = _store;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Nov  6 14:40:57 2008
@@ -45,9 +45,10 @@
 
 TransferAdapter Message::TRANSFER;
 
-  Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false),
-                                             staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
-                                             expiration(FAR_FUTURE) {}
+Message::Message(const framing::SequenceNumber& id) :
+    frames(id), persistenceId(0), redelivered(false), loaded(false),
+    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
+    expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {}
 
 Message::~Message()
 {
@@ -268,7 +269,7 @@
 
 namespace 
 {
-    const std::string X_QPID_TRACE("x-qpid.trace");
+const std::string X_QPID_TRACE("x-qpid.trace");
 }
 
 bool Message::isExcluded(const std::vector<std::string>& excludes) const
@@ -341,4 +342,22 @@
     replacement[qfor] = msg;
 }
 
+void Message::allEnqueuesComplete() {
+    MessageCallback* cb = 0;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        swap(cb, enqueueCallback);
+    }
+    if (cb && *cb) (*cb)(*this);
+}
+
+void Message::allDequeuesComplete() {
+    MessageCallback* cb = 0;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        swap(cb, dequeueCallback);
+    }
+    if (cb && *cb) (*cb)(*this);
+}
+
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Thu Nov  6 14:40:57 2008
@@ -22,15 +22,15 @@
  *
  */
 
-#include <string>
-#include <vector>
-#include <boost/shared_ptr.hpp>
-#include <boost/variant.hpp>
 #include "PersistableMessage.h"
 #include "MessageAdapter.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Time.h"
+#include "qpid/shared_ptr.h"
+#include <boost/function.hpp>
+#include <string>
+#include <vector>
 
 namespace qpid {
 	
@@ -48,6 +48,8 @@
 
 class Message : public PersistableMessage {
 public:
+    typedef boost::function<void (Message&)> MessageCallback;
+    
     Message(const framing::SequenceNumber& id = framing::SequenceNumber());
     ~Message();
         
@@ -142,7 +144,19 @@
     boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
     void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
 
+    /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */
+    void setEnqueueCompleteCallback(const MessageCallback* cb);
+
+    /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
+    void setDequeueCompleteCallback(const MessageCallback& cb);
+
   private:
+    typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
+
+    MessageAdapter& getAdapter() const;
+    void allEnqueuesComplete();
+    void allDequeuesComplete();
+
     mutable sys::Mutex lock;
     framing::FrameSet frames;
     mutable boost::shared_ptr<Exchange> exchange;
@@ -157,11 +171,10 @@
 
     static TransferAdapter TRANSFER;
 
-    MessageAdapter& getAdapter() const;
-	typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
-
     mutable Replacement replacement;
     mutable boost::intrusive_ptr<Message> empty;
+    MessageCallback* enqueueCallback;
+    MessageCallback* dequeueCallback;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Thu Nov  6 14:40:57 2008
@@ -87,6 +87,7 @@
         }
     }
     if (notify) {
+        allEnqueuesComplete();
         sys::ScopedLock<sys::Mutex> l(storeLock);
         if (store) {
             for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
@@ -118,13 +119,17 @@
 }
     
 void PersistableMessage::dequeueComplete() { 
-
-    sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
-    if (asyncDequeueCounter > 0) {
-        if (--asyncDequeueCounter == 0) {
-            asyncDequeueLock.notify();
+    bool notify = false;
+    {
+        sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+        if (asyncDequeueCounter > 0) {
+            if (--asyncDequeueCounter == 0) {
+                notify = true;
+                asyncDequeueLock.notify();
+            }
         }
     }
+    if (notify) allDequeuesComplete();
 }
 
 void PersistableMessage::waitForDequeueComplete() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Thu Nov  6 14:40:57 2008
@@ -68,10 +68,15 @@
     syncList synclist;
 
   protected:
-    MessageStore* store;
-    
+    /** Called when all enqueues are complete for this message. */
+    virtual void allEnqueuesComplete() = 0;
+    /** Called when all dequeues are complete for this message. */
+    virtual void allDequeuesComplete() = 0;
+
     void setContentReleased();
 
+    MessageStore* store;
+
   public:
     typedef boost::shared_ptr<PersistableMessage> shared_ptr;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Thu Nov  6 14:40:57 2008
@@ -85,7 +85,6 @@
 
 void QueueRegistry::setStore (MessageStore* _store)
 {
-    assert (store == 0 && _store != 0);
     store = _store;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h Thu Nov  6 14:40:57 2008
@@ -66,10 +66,11 @@
         return true;
     }
 
-    T pop() {
+    T pop(Duration timeout=TIME_INFINITE) {
         T result;
-        bool ok = pop(result);
-        assert(ok); (void) ok;  // Infinite wait.
+        bool ok = pop(result, timeout);
+        if (!ok)
+            throw Exception("Timed out waiting on a blocking queue");
         return result;
     }
         

Added: incubator/qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp?rev=711998&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp Thu Nov  6 14:40:57 2008
@@ -0,0 +1,100 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "BrokerFixture.h"
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/sys/BlockingQueue.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/sys/Time.h"
+
+using namespace std;
+using namespace qpid;
+using namespace client;
+using namespace framing;
+
+namespace qpid { namespace broker {
+class TransactionContext;
+class PersistableQueue;
+}}
+
+using broker::PersistableMessage;
+using broker::NullMessageStore;
+using broker::TransactionContext;
+using broker::PersistableQueue;
+using sys::TIME_SEC;
+using boost::intrusive_ptr;
+
+/** @file Unit tests for async completion.
+ * Using a dummy store, verify that the broker indicates async completion of
+ * message enqueues at the correct time.
+ */
+
+class AsyncCompletionMessageStore : public NullMessageStore {
+  public:
+    sys::BlockingQueue<boost::intrusive_ptr<PersistableMessage> > enqueued;
+    
+    AsyncCompletionMessageStore() : NullMessageStore() {}
+    ~AsyncCompletionMessageStore(){}
+
+    void enqueue(TransactionContext*,
+                 const boost::intrusive_ptr<PersistableMessage>& msg,
+                 const PersistableQueue& )
+    {
+        enqueued.push(msg);
+    }
+};
+
+QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite)
+
+QPID_AUTO_TEST_CASE(testWaitTillComplete) {
+    AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore;
+    SessionFixture fix;
+    fix.broker->setStore(store); // Broker will delete store.
+    AsyncSession s = fix.session;
+
+    static const int count = 3;
+
+    s.queueDeclare("q", arg::durable=true);
+    Completion transfers[count];
+    for (int i = 0; i < count; ++i) {
+        Message msg(boost::lexical_cast<string>(i), "q");
+        msg.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+        transfers[i] = s.messageTransfer(arg::content=msg);
+    }
+
+    // Get hold of the broker-side messages. 
+    typedef vector<intrusive_ptr<PersistableMessage> > BrokerMessages;
+    BrokerMessages enqueued;
+    for (int j = 0; j < count; ++j) 
+        enqueued.push_back(store->enqueued.pop(TIME_SEC));
+
+    // Send a sync, make sure it does not complete till all messages are complete.
+    // In reverse order for fun.
+    Completion sync = s.executionSync(arg::sync=true);
+    for (int k = count-1; k >= 0; --k) {
+        BOOST_CHECK(!transfers[k].isComplete()); // Should not be complete yet.
+        BOOST_CHECK(!sync.isComplete()); // Should not be complete yet.
+        enqueued[k]->enqueueComplete();
+    }
+    sync.wait();                // Should complete now, all messages are completed.
+}
+
+QPID_AUTO_TEST_SUITE_END()

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=711998&r1=711997&r2=711998&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Nov  6 14:40:57 2008
@@ -35,6 +35,7 @@
 	exception_test.cpp \
 	RefCounted.cpp \
 	SessionState.cpp Blob.cpp logging.cpp \
+	AsyncCompletion.cpp \
 	Url.cpp Uuid.cpp \
 	Shlib.cpp FieldValue.cpp FieldTable.cpp Array.cpp \
 	QueueOptionsTest.cpp \