You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/06/20 20:04:05 UTC

svn commit: r1137726 - in /qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h SemanticState.cpp

Author: kgiusti
Date: Mon Jun 20 18:04:05 2011
New Revision: 1137726

URL: http://svn.apache.org/viewvc?rev=1137726&view=rev
Log:
QPID-3079: update with review feedback

Modified:
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1137726&r1=1137725&r2=1137726&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp Mon Jun 20 18:04:05 2011
@@ -1263,14 +1263,12 @@ void Queue::DequeueCompletion::dequeueDo
     assert(completionsNeeded.get() > 0);
     if (--completionsNeeded == 0) {
         assert(cb);
-        (*cb)(ctxt);
-        ctxt.reset();
+        cb();
     }
 }
 
-void Queue::DequeueCompletion::registerCallback( callback *f, boost::intrusive_ptr<RefCounted>& _ctxt )
+void Queue::DequeueCompletion::registerCallback( boost::function<void()> f )
 {
     cb = f;
-    ctxt = _ctxt;
     dequeueDone();  // invoke callback if dequeue already done.
 }

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h?rev=1137726&r1=1137725&r2=1137726&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h Mon Jun 20 18:04:05 2011
@@ -289,21 +289,14 @@ class Queue : public boost::enable_share
     class DequeueCompletion : public RefCounted
     {
     public:
-        typedef void callback( boost::intrusive_ptr<RefCounted>& ctxt );
-
         DequeueCompletion()
-          : completionsNeeded(2),  // one for register call, another for done call
-          cb(0) {}
-
+          : completionsNeeded(2) {}// one for register call, another for done call
         void dequeueDone();
-        void registerCallback( callback *f, boost::intrusive_ptr<RefCounted>& _ctxt );
+        void registerCallback(boost::function<void()> f);
 
     private:
         mutable qpid::sys::AtomicValue<int> completionsNeeded;
-        callback *cb;
-        boost::intrusive_ptr<RefCounted> ctxt;
-        friend class Queue;
-
+        boost::function<void()> cb;
     };
     QPID_BROKER_EXTERN boost::intrusive_ptr<DequeueCompletion> dequeue(TransactionContext* ctxt, const QueuedMessage& msg);
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1137726&r1=1137725&r2=1137726&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Jun 20 18:04:05 2011
@@ -783,30 +783,24 @@ namespace {
         std::vector<boost::shared_ptr<Queue> > queues;    // for flush()
         SemanticState& state;
 
-        /** completes this command. Note: may run in *any* thread */
-        void complete()
+    public:
+        AsyncMessageAcceptCmd(SemanticState& _state)
+            : pending(1), state(_state) {}
+
+        /** signal a dequeue is done. Note: may be run in *any* thread */
+        void dequeueDone()
         {
             Mutex::ScopedLock l(lock);
             assert(pending);
             if (--pending == 0) {
                 framing::Invoker::Result r;   // message.accept does not return result data
+                queues.clear();
                 Mutex::ScopedUnlock ul(lock);
                 QPID_LOG(trace, "Completing async message.accept cmd=" << getId());
                 completed( r );
             }
         }
 
-    public:
-        AsyncMessageAcceptCmd(SemanticState& _state)
-            : pending(1), state(_state) {}
-
-        /** signal this dequeue done. Note: may be run in *any* thread */
-        static void dequeueDone( boost::intrusive_ptr<RefCounted>& ctxt )
-        {
-            boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd(boost::static_pointer_cast<AsyncMessageAcceptCmd>(ctxt));
-            cmd->complete();
-        }
-
         /** called from session to urge pending dequeues to complete ASAP, done
             as a result of an execution.sync */
         void flush()
@@ -894,8 +888,7 @@ void SemanticState::accepted(const Seque
                 if (async) {
                     if (!cmd) cmd = boost::intrusive_ptr<AsyncMessageAcceptCmd>(new AsyncMessageAcceptCmd(*this));
                     cmd->add(i->getQueue());
-                    boost::intrusive_ptr<qpid::RefCounted> rc(boost::static_pointer_cast<RefCounted>(cmd));
-                    async->registerCallback(&AsyncMessageAcceptCmd::dequeueDone, rc);
+                    async->registerCallback(boost::bind(&AsyncMessageAcceptCmd::dequeueDone, cmd));
                 }
                 if (i->isRedundant())
                     i = unacked.erase(i);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org