You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/07 23:51:45 UTC

svn commit: r1068199 - in /qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker: AsyncCompletion.h QueueFlowLimit.cpp QueueFlowLimit.h SessionState.cpp SessionState.h

Author: kgiusti
Date: Mon Feb  7 22:51:45 2011
New Revision: 1068199

URL: http://svn.apache.org/viewvc?rev=1068199&view=rev
Log:
QPID-2935: clean up SessionState changes, fix races

Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h?rev=1068199&r1=1068198&r2=1068199&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h Mon Feb  7 22:51:45 2011
@@ -92,9 +92,12 @@ namespace qpid {
                 qpid::sys::Mutex::ScopedLock l(callbackLock);
                 inCallback = true;
                 if (handler) {
-                    qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
-                    (*handler)(sync);
-                    handler.reset();
+                    boost::shared_ptr<CompletionHandler> tmp;
+                    tmp.swap(handler);
+                    {
+                        qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
+                        (*tmp)(sync);
+                    }
                 }
                 inCallback = false;
                 callbackLock.notifyAll();
@@ -142,7 +145,6 @@ namespace qpid {
             /** called by initiator after all potential completers have called
              * startCompleter().
              */
-            //void end(CompletionHandler::shared_ptr& _handler)
             void end(boost::shared_ptr<CompletionHandler> _handler)
             {
                 assert(completionsNeeded.get() > 0);    // ensure begin() has been called!

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1068199&r1=1068198&r2=1068199&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Feb  7 22:51:45 2011
@@ -116,7 +116,7 @@ bool QueueFlowLimit::consume(const Queue
 
     if (!msg.payload) return false;
 
-    sys::Mutex::ScopedLock l(pendingFlowLock);
+    sys::Mutex::ScopedLock l(indexLock);
 
     ++count;
     size += msg.payload->contentSize();
@@ -153,7 +153,7 @@ bool QueueFlowLimit::replenish(const Que
 
     if (!msg.payload) return false;
 
-    sys::Mutex::ScopedLock l(pendingFlowLock);
+    sys::Mutex::ScopedLock l(indexLock);
 
     if (count > 0) {
         --count;
@@ -190,15 +190,6 @@ bool QueueFlowLimit::replenish(const Que
             if (itr != index.end()) {       // this msg is flow controlled, release it:
                 (*itr)->getReceiveCompletion().finishCompleter();
                 index.erase(itr);
-                //// stupid: (hopefully this is the next pending msg)
-                //std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(),
-                //                                                                 pendingFlow.end(),
-                //                                                                 msg.payload);
-                //if (itr2 == pendingFlow.end()) {
-                //    QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position);
-                //} else {
-                //    pendingFlow.erase(itr2);
-                //}
             }
         }
     }
@@ -274,14 +265,3 @@ std::ostream& operator<<(std::ostream& o
     }
 }
 
-/**
- * TBD:
- * - Is there a direct way to determine if QM is on pendingFlow list?
- * - Rate limit the granting of flow.
- * - What about LVQ?  A newer msg may replace the older one.
- * - What about queueing during a recovery?
- * - What about queue purge?
- * - What about message move?
- * - How do we treat orphaned messages?
- * -- Xfer a message to an alternate exchange - do we ack?
- */

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1068199&r1=1068198&r2=1068199&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Mon Feb  7 22:51:45 2011
@@ -86,9 +86,7 @@ class QueueFlowLimit
  protected:
     // msgs waiting for flow to become available.
     std::set< boost::intrusive_ptr<Message> > index;
-    // KAG: is this necessary?  Not if we release all pending when level < low (?)
-    // std::list< boost::intrusive_ptr<Message> > pendingFlow;     // ordered, oldest @front
-    qpid::sys::Mutex pendingFlowLock;
+    qpid::sys::Mutex indexLock;
 
     QueueFlowLimit(Queue *queue,
                    uint32_t flowStopCount, uint32_t flowResumeCount,

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1068199&r1=1068198&r2=1068199&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Feb  7 22:51:45 2011
@@ -60,7 +60,8 @@ SessionState::SessionState(
       adapter(semanticState),
       msgBuilder(&broker.getStore()),
       mgmtObject(0),
-      rateFlowcontrol(0)
+      rateFlowcontrol(0),
+      scheduledRcvMsgs(new IncompleteRcvMsg::deque)
 {
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
@@ -95,16 +96,19 @@ SessionState::~SessionState() {
         flowControlTimer->cancel();
 
     // clean up any outstanding incomplete receive messages
-
     qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
-    while (!incompleteRcvMsgs.empty()) {
-        boost::shared_ptr<IncompleteRcvMsg> ref(incompleteRcvMsgs.front());
-        incompleteRcvMsgs.pop_front();
+    std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> copy(incompleteRcvMsgs);
+    incompleteRcvMsgs.clear();
+    while (!copy.empty()) {
+        boost::shared_ptr<IncompleteRcvMsg> ref(copy.begin()->second);
+        copy.erase(copy.begin());
         {
+            // note: need to drop lock, as callback may attempt to take it.
             qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock);
             ref->cancel();
         }
     }
+    scheduledRcvMsgs->clear();  // no need to lock - shared with IO thread.
 }
 
 AMQP_ClientProxy& SessionState::getProxy() {
@@ -264,21 +268,7 @@ void SessionState::handleContent(AMQFram
         msg->getReceiveCompletion().begin();
         semanticState.handle(msg);
         msgBuilder.end();
-        if (msg->getReceiveCompletion().getPendingCompleters() == 1) {
-            // There are no other pending receive completers (just this SessionState).
-            // Mark the message as completed.
-            completeRcvMsg( msg );
-        } else {
-            // There are outstanding receive completers.  Save the message until
-            // they are all done.
-            QPID_LOG(debug, getId() << ": delaying completion of msg seq=" << msg->getCommandId());
-            boost::shared_ptr<IncompleteRcvMsg> pendingMsg(new IncompleteRcvMsg(*this, msg));
-            {
-                qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
-                incompleteRcvMsgs.push_back(pendingMsg);
-            }
-            msg->getReceiveCompletion().end( pendingMsg );   // allows others to complete
-        }
+        msg->getReceiveCompletion().end( createPendingMsg(msg) );   // allows msg to complete
     }
 
     // Handle producer session flow control
@@ -453,40 +443,45 @@ void SessionState::addPendingExecutionSy
 void SessionState::IncompleteRcvMsg::operator() (bool sync)
 {
     QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync);
-    boost::shared_ptr<IncompleteRcvMsg> tmp;
-    {
-        qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock);
-        for (std::list< boost::shared_ptr<IncompleteRcvMsg> >::iterator i = session->incompleteRcvMsgs.begin();
-             i != session->incompleteRcvMsgs.end(); ++i) {
-            if (i->get() == this) {
-                tmp.swap(*i);
-                session->incompleteRcvMsgs.remove(*i);
-                break;
-            }
-        }
-    }
 
-    if (session->isAttached()) {
-        if (sync) {
-            QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId());
-            session->completeRcvMsg(msg);
-        } else {    // potentially called from a different thread
-            QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId());
-            session->getConnection().requestIOProcessing(boost::bind(&SessionState::IncompleteRcvMsg::scheduledCompleter, tmp));
+    qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock);
+    std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr>::iterator i = session->incompleteRcvMsgs.find(this);
+    if (i != session->incompleteRcvMsgs.end()) {
+        boost::shared_ptr<IncompleteRcvMsg> tmp(i->second);
+        session->incompleteRcvMsgs.erase(i);
+
+        if (session->isAttached()) {
+            if (sync) {
+                qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteRcvMsgsLock);
+                QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId());
+                session->completeRcvMsg(msg);
+                return;
+            } else {    // potentially called from a different thread
+                QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId());
+                session->scheduledRcvMsgs->push_back(tmp);
+                if (session->scheduledRcvMsgs->size() == 1) {
+                    session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
+                                                                             session->scheduledRcvMsgs));
+                }
+            }
         }
     }
 }
 
 
-/** Scheduled from IncompleteRcvMsg callback, completes the message receive
- * asynchronously
+/** Scheduled from IncompleteRcvMsg callback, completes all pending message
+ * receives asynchronously.
  */
-void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<SessionState::IncompleteRcvMsg> iMsg)
+void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<deque> msgs)
 {
-    QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId());
-    if (iMsg->session && iMsg->session->isAttached()) {
-        QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId());
-        iMsg->session->completeRcvMsg(iMsg->msg);
+    while (!msgs->empty()) {
+        boost::shared_ptr<IncompleteRcvMsg> iMsg = msgs->front();
+        msgs->pop_front();
+        QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId());
+        if (iMsg->session && iMsg->session->isAttached()) {
+            QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId());
+            iMsg->session->completeRcvMsg(iMsg->msg);
+        }
     }
 }
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h?rev=1068199&r1=1068198&r2=1068199&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h Mon Feb  7 22:51:45 2011
@@ -131,6 +131,9 @@ class SessionState : public qpid::Sessio
 
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
     void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
+
+    // indicate that the given ingress msg has been completely received by the
+    // broker, and the msg's message.transfer command can be considered completed.
     void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg);
 
     void handleIn(framing::AMQFrame& frame);
@@ -169,21 +172,38 @@ class SessionState : public qpid::Sessio
     std::queue<SequenceNumber> pendingExecutionSyncs;
     bool currentCommandComplete;
 
+    // A list of ingress messages whose message.transfer command is pending
+    // completion.  These messages are awaiting some set of asynchronous
+    // operations to complete (e.g. store, flow-control, etc.) before
+    // the message.transfer can be completed.
     class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler
     {
   public:
         IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message> _msg)
           : session(&_session), msg(_msg) {}
-        virtual void operator() (bool sync);
+        virtual void operator() (bool sync);    // invoked when msg is completed.
         void cancel();   // cancel pending incomplete callback [operator() above].
 
+        typedef boost::shared_ptr<IncompleteRcvMsg>  shared_ptr;
+        typedef std::deque<shared_ptr> deque;
+
   private:
         SessionState *session;
         boost::intrusive_ptr<Message> msg;
-        static void scheduledCompleter( boost::shared_ptr<IncompleteRcvMsg> incompleteMsg );
+
+        static void scheduledCompleter(boost::shared_ptr<deque>);
     };
-    std::list< boost::shared_ptr<IncompleteRcvMsg> > incompleteRcvMsgs;
+    std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> incompleteRcvMsgs;  // msgs pending completion
     qpid::sys::Mutex incompleteRcvMsgsLock;
+    boost::shared_ptr<IncompleteRcvMsg> createPendingMsg(boost::intrusive_ptr<Message>& msg) {
+        boost::shared_ptr<IncompleteRcvMsg> pending(new IncompleteRcvMsg(*this, msg));
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(incompleteRcvMsgsLock);
+        incompleteRcvMsgs[pending.get()] = pending;
+        return pending;
+    }
+
+    // holds msgs waiting for IO thread to run scheduledCompleter()
+    boost::shared_ptr<IncompleteRcvMsg::deque> scheduledRcvMsgs;
 
     friend class SessionManager;
     friend class IncompleteRcvMsg;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org