You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/03/08 16:04:07 UTC

svn commit: r1079385 - in /qpid/trunk/qpid/cpp/src: qpid/broker/AsyncCompletion.h qpid/broker/PersistableMessage.h qpid/broker/QueueFlowLimit.cpp qpid/broker/SessionState.cpp qpid/broker/SessionState.h tests/MessageUtils.h tests/QueueTest.cpp

Author: kgiusti
Date: Tue Mar  8 15:04:07 2011
New Revision: 1079385

URL: http://svn.apache.org/viewvc?rev=1079385&view=rev
Log:
QPID-3073: refactor to eliminate locks, malloc, and map insert/remove in receive path.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/tests/MessageUtils.h
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h?rev=1079385&r1=1079384&r2=1079385&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h Tue Mar  8 15:04:07 2011
@@ -22,6 +22,8 @@
  *
  */
 
+#include <boost/intrusive_ptr.hpp>
+
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Mutex.h"
@@ -77,6 +79,22 @@ namespace broker {
 
 class AsyncCompletion
 {
+ public:
+
+    /** Supplied by the Initiator to the end() method, allows for a callback
+     * when all outstanding completers are done.  If the callback cannot be
+     * made during the end() call, the clone() method must supply a copy of
+     * this callback object that persists after end() returns.  The cloned
+     * callback object will be used by the last completer thread, and
+     * released when the callback returns.
+     */
+    class Callback : public RefCounted
+    {
+  public:
+        virtual void completed(bool) = 0;
+        virtual boost::intrusive_ptr<Callback> clone() = 0;
+    };
+
  private:
     mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;
     mutable qpid::sys::Monitor callbackLock;
@@ -85,14 +103,17 @@ class AsyncCompletion
     void invokeCallback(bool sync) {
         qpid::sys::Mutex::ScopedLock l(callbackLock);
         if (active) {
-            inCallback = true;
-            {
-                qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
-                completed(sync);
+            if (callback) {
+                inCallback = true;
+                {
+                    qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
+                    callback->completed(sync);
+                }
+                inCallback = false;
+                callback.reset();
+                callbackLock.notifyAll();
             }
-            inCallback = false;
             active = false;
-            callbackLock.notifyAll();
         }
     }
 
@@ -100,17 +121,17 @@ class AsyncCompletion
     /** Invoked when all completers have signalled that they have completed
      * (via calls to finishCompleter()). bool == true if called via end()
      */
-    virtual void completed(bool) = 0;
+    boost::intrusive_ptr<Callback> callback;
 
  public:
     AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {};
     virtual ~AsyncCompletion() { cancel(); }
 
+
     /** True when all outstanding operations have compeleted
      */
     bool isDone()
     {
-        qpid::sys::Mutex::ScopedLock l(callbackLock);
         return !active;
     }
 
@@ -135,17 +156,32 @@ class AsyncCompletion
      */
     void begin()
     {
-        qpid::sys::Mutex::ScopedLock l(callbackLock);
         ++completionsNeeded;
     }
 
     /** called by initiator after all potential completers have called
      * startCompleter().
      */
-    void end()
+    void end(Callback& cb)
     {
         assert(completionsNeeded.get() > 0);    // ensure begin() has been called!
+        // the following only "decrements" the count if it is 1.  This means
+        // there are no more outstanding completers and we are done.
+        if (completionsNeeded.boolCompareAndSwap(1, 0)) {
+            // done!  Complete immediately
+            cb.completed(true);
+            return;
+        }
+
+        // the compare-and-swap did not succeed.  This means there are
+        // outstanding completers pending (count > 1).  Get a persistent
+        // Callback object to use when the last completer is done.
+        // Decrement after setting up the callback ensures that pending
+        // completers cannot touch the callback until it is ready.
+        callback = cb.clone();
         if (--completionsNeeded == 0) {
+            // note that a completer may have completed during the
+            // callback setup or decrement:
             invokeCallback(true);
         }
     }
@@ -156,14 +192,9 @@ class AsyncCompletion
     virtual void cancel() {
         qpid::sys::Mutex::ScopedLock l(callbackLock);
         while (inCallback) callbackLock.wait();
+        callback.reset();
         active = false;
     }
-
-    /** may be called by Initiator after all completers have been added but
-     * prior to calling end().  Allows initiator to determine if it _really_
-     * needs to wait for pending Completers (e.g. count > 1).
-     */
-    //uint32_t getPendingCompleters() { return completionsNeeded.get(); }
 };
 
 }}  // qpid::broker::

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1079385&r1=1079384&r2=1079385&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Tue Mar  8 15:04:07 2011
@@ -56,7 +56,7 @@ class PersistableMessage : public Persis
      * operations have completed, the transfer of this message from the client
      * may be considered complete.
      */
-    boost::shared_ptr<AsyncCompletion> ingressCompletion;
+    AsyncCompletion ingressCompletion;
 
     /**
      * Tracks the number of outstanding asynchronous dequeue
@@ -115,12 +115,11 @@ class PersistableMessage : public Persis
     virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
 
     /** track the progress of a message received by the broker - see ingressCompletion above */
-    QPID_BROKER_EXTERN bool isIngressComplete() { return !ingressCompletion || ingressCompletion->isDone(); }
-    QPID_BROKER_EXTERN boost::shared_ptr<AsyncCompletion>& getIngressCompletion() { return ingressCompletion; }
-    QPID_BROKER_EXTERN void setIngressCompletion(boost::shared_ptr<AsyncCompletion>& ic) { ingressCompletion = ic; }
+    QPID_BROKER_EXTERN bool isIngressComplete() { return ingressCompletion.isDone(); }
+    QPID_BROKER_EXTERN AsyncCompletion& getIngressCompletion() { return ingressCompletion; }
 
-    QPID_BROKER_EXTERN void enqueueStart() { if (ingressCompletion) ingressCompletion->startCompleter(); }
-    QPID_BROKER_EXTERN void enqueueComplete() { if (ingressCompletion) ingressCompletion->finishCompleter(); }
+    QPID_BROKER_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); }
+    QPID_BROKER_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); }
 
     QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
                                          MessageStore* _store);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1079385&r1=1079384&r2=1079385&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Tue Mar  8 15:04:07 2011
@@ -144,11 +144,6 @@ void QueueFlowLimit::enqueued(const Queu
         }
     }
 
-    /** @todo KAG: - REMOVE ONCE STABLE */
-    if (index.find(msg.payload) != index.end()) {
-        QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position);
-    }
-
     if (flowStopped || !index.empty()) {
         // ignore flow control if we are populating the queue due to cluster replication:
         if (broker && broker->isClusterUpdatee()) {
@@ -156,7 +151,7 @@ void QueueFlowLimit::enqueued(const Queu
             return;
         }
         QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
-        msg.payload->getIngressCompletion()->startCompleter();    // don't complete until flow resumes
+        msg.payload->getIngressCompletion().startCompleter();    // don't complete until flow resumes
         index.insert(msg.payload);
     }
 }
@@ -196,14 +191,14 @@ void QueueFlowLimit::dequeued(const Queu
             // flow enabled - release all pending msgs
             while (!index.empty()) {
                 std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
-                (*itr)->getIngressCompletion()->finishCompleter();
+                (*itr)->getIngressCompletion().finishCompleter();
                 index.erase(itr);
             }
         } else {
             // even if flow controlled, we must release this msg as it is being dequeued
             std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
             if (itr != index.end()) {       // this msg is flow controlled, release it:
-                (*itr)->getIngressCompletion()->finishCompleter();
+                (*itr)->getIngressCompletion().finishCompleter();
                 index.erase(itr);
             }
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1079385&r1=1079384&r2=1079385&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Mar  8 15:04:07 2011
@@ -62,7 +62,7 @@ SessionState::SessionState(
       msgBuilder(&broker.getStore()),
       mgmtObject(0),
       rateFlowcontrol(0),
-      scheduledCompleterContext(new ScheduledCompleterContext(this))
+      asyncCommandCompleter(new AsyncCommandCompleter(this))
 {
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
@@ -102,25 +102,7 @@ SessionState::~SessionState() {
     if (flowControlTimer)
         flowControlTimer->cancel();
 
-    // clean up any outstanding incomplete commands
-    {
-        qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
-        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
-        incompleteCmds.clear();
-        while (!copy.empty()) {
-            boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
-            copy.erase(copy.begin());
-            {
-                // note: need to drop lock, as callback may attempt to take it.
-                qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
-                ref->cancel();
-            }
-        }
-    }
-
-    // At this point, we are guaranteed no further completion callbacks will be
-    // made.  Cancel any outstanding scheduledCompleter calls...
-    scheduledCompleterContext->cancel();
+    asyncCommandCompleter->cancel();
 }
 
 AMQP_ClientProxy& SessionState::getProxy() {
@@ -276,13 +258,11 @@ void SessionState::handleContent(AMQFram
             msg->getFrames().append(header);
         }
         msg->setPublisher(&getConnection());
-
-        boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg)));
-        msg->setIngressCompletion( ac );
-        ac->begin();
+        msg->getIngressCompletion().begin();
         semanticState.handle(msg);
         msgBuilder.end();
-        ac->end();  // allows msg to complete xfer
+        IncompleteIngressMsgXfer xfer(this, msg);
+        msg->getIngressCompletion().end(xfer);  // allows msg to complete xfer
     }
 
     // Handle producer session flow control
@@ -451,110 +431,94 @@ void SessionState::addPendingExecutionSy
 }
 
 
-/** factory for creating IncompleteIngressMsgXfer objects which
- * can be references from Messages as ingress AsyncCompletion objects.
+/** factory for creating a reference-counted IncompleteIngressMsgXfer object
+ * which will be attached to a message that will be completed asynchronously.
  */
-boost::shared_ptr<SessionState::IncompleteIngressMsgXfer>
-SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg)
+boost::intrusive_ptr<AsyncCompletion::Callback>
+SessionState::IncompleteIngressMsgXfer::clone()
 {
-    SequenceNumber id = msg->getCommandId();
-    boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg));
-    qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
-    incompleteCmds[id] = cmd;
-    return cmd;
+    boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
+    return cb;
 }
 
 
-/** Invoked by the asynchronous completer associated with
- * a received msg that is pending Completion.  May be invoked
- * by the SessionState directly (sync == true), or some external
- * entity (!sync).
+/** Invoked by the asynchronous completer associated with a received
+ * msg that is pending Completion.  May be invoked by the IO thread
+ * (sync == true), or some external thread (!sync).
  */
 void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
 {
     if (!sync) {
         /** note well: this path may execute in any thread.  It is safe to access
-         * the session, as the SessionState destructor will cancel all outstanding
-         * callbacks before getting destroyed (so we'll never get here).
+         * the scheduledCompleterContext, since *this has a shared pointer to it.
+         * but not session or msg!
          */
+        session = 0; msg = 0;
         QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
-        if (session->scheduledCompleterContext->scheduleCompletion(id))
-            session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
-                                                                     session->scheduledCompleterContext));
-    } else {  // command is being completed in IO thread.
-        // this path runs only on the IO thread.
-        qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
-        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
-        cmd = session->incompleteCmds.find(id);
-        if (cmd != session->incompleteCmds.end()) {
-            boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
-            session->incompleteCmds.erase(cmd);
-
-            if (session->isAttached()) {
-                QPID_LOG(debug, ": receive completed for msg seq=" << id);
-                qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
-                session->completeRcvMsg(id, requiresAccept, requiresSync);
-                return;
-            }
+        completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
+    } else {
+        // this path runs directly from the ac->end() call in handleContent() above,
+        // so *session and *msg are definately valid.
+        if (session->isAttached()) {
+            QPID_LOG(debug, ": receive completed for msg seq=" << id);
+            session->completeRcvMsg(id, requiresAccept, requiresSync);
         }
     }
+    completerContext.reset();  // ??? KAG optional ???
 }
 
 
-/** Scheduled from incomplete command's completed callback, safely completes all
- * completed commands in the IO Thread.  Guaranteed not to be running at the same
- * time as the message receive code.
+/** Scheduled from an asynchronous command's completed callback to run on
+ * the IO thread.
  */
-void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt)
+void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
 {
     ctxt->completeCommands();
 }
 
 
-/** mark a command (sequence) as completed, return True if caller should
- * schedule a call to completeCommands()
+/** mark an ingress Message.Transfer command as completed.
+ * This method must be thread safe - it may run on any thread.
  */
-bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd)
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
-
-    completedCmds.push_back(cmd);
-    return (completedCmds.size() == 1);
+void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
+                                                                bool requiresAccept,
+                                                                bool requiresSync)
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+
+    if (session) {
+        MessageInfo msg(cmd, requiresAccept, requiresSync);
+        completedMsgs.push_back(msg);
+        if (completedMsgs.size() == 1) {
+            session->getConnection().requestIOProcessing(boost::bind(&schedule,
+                                                                     session->asyncCommandCompleter));
+        }
+    }
 }
 
 
-/** Cause the session to complete all completed commands */
-void SessionState::ScheduledCompleterContext::completeCommands()
+/** Cause the session to complete all completed commands.
+ * Executes on the IO thread.
+ */
+void SessionState::AsyncCommandCompleter::completeCommands()
 {
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
 
     // when session is destroyed, it clears the session pointer via cancel().
-    if (!session) return;
-
-    while (!completedCmds.empty()) {
-        SequenceNumber id = completedCmds.front();
-        completedCmds.pop_front();
-        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
-        {
-            qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock);
-
-            cmd = session->incompleteCmds.find(id);
-            if (cmd !=session->incompleteCmds.end()) {
-                boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
-                {
-                    qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock);
-                    tmp->do_completion();   // retakes incompleteCmdslock
-                }
-            }
+    if (session && session->isAttached()) {
+        for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
+             msg != completedMsgs.end(); ++msg) {
+            session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
         }
     }
+    completedMsgs.clear();
 }
 
 
 /** cancel any pending calls to scheduleComplete */
-void SessionState::ScheduledCompleterContext::cancel()
+void SessionState::AsyncCommandCompleter::cancel()
 {
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
     session = 0;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1079385&r1=1079384&r2=1079385&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Mar  8 15:04:07 2011
@@ -38,6 +38,7 @@
 
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
 
 #include <set>
 #include <vector>
@@ -176,79 +177,84 @@ class SessionState : public qpid::Sessio
     std::queue<SequenceNumber> pendingExecutionSyncs;
     bool currentCommandComplete;
 
-    /** Abstract class that represents a command that is pending
-     * completion.
+    /** This class provides a context for completing asynchronous commands in a thread
+     * safe manner.  Asynchronous commands save their completion state in this class.
+     * This class then schedules the completeCommands() method in the IO thread.
+     * While running in the IO thread, completeCommands() may safely complete all
+     * saved commands without the risk of colliding with other operations on this
+     * SessionState.
      */
-    class IncompleteCommandContext : public AsyncCompletion
+    class AsyncCommandCompleter : public RefCounted {
+    private:
+        SessionState *session;
+        qpid::sys::Mutex completerLock;
+
+        // special-case message.transfer commands for optimization
+        struct MessageInfo {
+            SequenceNumber cmd; // message.transfer command id
+            bool requiresAccept;
+            bool requiresSync;
+        MessageInfo(SequenceNumber c, bool a, bool s)
+        : cmd(c), requiresAccept(a), requiresSync(s) {}
+        };
+        std::vector<MessageInfo> completedMsgs;
+
+        /** complete all pending commands, runs in IO thread */
+        void completeCommands();
+
+        /** for scheduling a run of "completeCommands()" on the IO thread */
+        static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
+
+  public:
+        AsyncCommandCompleter(SessionState *s) : session(s) {};
+        ~AsyncCommandCompleter() {};
+
+        /** schedule the completion of an ingress message.transfer command */
+        void scheduleMsgCompletion(SequenceNumber cmd,
+                                   bool requiresAccept,
+                                   bool requiresSync);
+        void cancel();  // called by SessionState destructor.
+    };
+    boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
+
+    /** Abstract class that represents a single asynchronous command that is
+     * pending completion.
+     */
+    class AsyncCommandContext : public AsyncCompletion::Callback
     {
      public:
-        IncompleteCommandContext( SessionState *ss, SequenceNumber _id )
-          : id(_id), session(ss) {}
-        virtual ~IncompleteCommandContext() {}
-
-        /* allows manual invokation of completion, used by IO thread to
-         * complete a command that was originally finished on a different
-         * thread.
-         */
-        void do_completion() { completed(true); }
+        AsyncCommandContext( SessionState *ss, SequenceNumber _id )
+          : id(_id), completerContext(ss->asyncCommandCompleter) {}
+        virtual ~AsyncCommandContext() {}
 
      protected:
         SequenceNumber id;
-        SessionState    *session;
+        boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
     };
 
     /** incomplete Message.transfer commands - inbound to broker from client
      */
-    class IncompleteIngressMsgXfer : public SessionState::IncompleteCommandContext
+    class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
     {
      public:
         IncompleteIngressMsgXfer( SessionState *ss,
-                                  SequenceNumber _id,
-                                  boost::intrusive_ptr<Message> msg )
-          : IncompleteCommandContext(ss, _id),
-          requiresAccept(msg->requiresAccept()),
-          requiresSync(msg->getFrames().getMethod()->isSync()) {};
+                                  boost::intrusive_ptr<Message> m )
+          : AsyncCommandContext(ss, m->getCommandId()),
+            session(ss),
+            msg(m.get()),
+            requiresAccept(msg->requiresAccept()),
+            requiresSync(msg->getFrames().getMethod()->isSync()) {};
         virtual ~IncompleteIngressMsgXfer() {};
 
-     protected:
         virtual void completed(bool);
+        virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
 
      private:
-        /** meta-info required to complete the message */
+        SessionState *session;  // only valid if sync == true
+        Message *msg;           // only valid if sync == true
         bool requiresAccept;
-        bool requiresSync;  // method's isSync() flag
+        bool requiresSync;
     };
-    /** creates a command context suitable for use as an AsyncCompletion in a message */
-    boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> createIngressMsgXferContext( boost::intrusive_ptr<Message> msg);
-
-    /* A list of commands that are pending completion.  These commands are
-     * awaiting some set of asynchronous operations to finish (eg: store,
-     * flow-control, etc). before the command can be completed to the client
-     */
-    std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds;
-    qpid::sys::Mutex incompleteCmdsLock;  // locks above container
-
-    /** This context is shared between the SessionState and scheduledCompleter,
-     * holds the sequence numbers of all commands that have completed asynchronously.
-     */
-    class ScheduledCompleterContext {
-    private:
-        std::list<SequenceNumber> completedCmds;
-        // ordering: take this lock first, then incompleteCmdsLock
-        qpid::sys::Mutex completedCmdsLock;
-        SessionState *session;
-    public:
-        ScheduledCompleterContext(SessionState *s) : session(s) {};
-        bool scheduleCompletion(SequenceNumber cmd);
-        void completeCommands();
-        void cancel();
-    };
-    boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext;
-
-    /** The following method runs the in IO thread and completes commands that
-     * where finished asynchronously.
-     */
-    static void scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>);
 
     friend class SessionManager;
 };

Modified: qpid/trunk/qpid/cpp/src/tests/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageUtils.h?rev=1079385&r1=1079384&r2=1079385&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessageUtils.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessageUtils.h Tue Mar  8 15:04:07 2011
@@ -20,7 +20,6 @@
  */
 
 #include "qpid/broker/Message.h"
-#include "qpid/broker/AsyncCompletion.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/Uuid.h"
@@ -29,17 +28,6 @@ using namespace qpid;
 using namespace broker;
 using namespace framing;
 
-namespace {
-    class DummyCompletion : public AsyncCompletion
-    {
-  public:
-        DummyCompletion() {}
-        virtual ~DummyCompletion() {}
-  protected:
-        void completed(bool) {}
-    };
-}
-
 namespace qpid {
 namespace tests {
 
@@ -62,8 +50,6 @@ struct MessageUtils
         msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
         if (durable)
             msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
-        boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
-        msg->setIngressCompletion(dc);
         return msg;
     }
 

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1079385&r1=1079384&r2=1079385&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Mar  8 15:04:07 2011
@@ -88,8 +88,6 @@ intrusive_ptr<Message> create_message(st
     msg->getFrames().append(method);
     msg->getFrames().append(header);
     msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
-    boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
-    msg->setIngressCompletion(dc);
     return msg;
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org