You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/06/03 22:10:18 UTC

svn commit: r1131182 - in /qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker: AsyncCompletion.h SemanticState.cpp SessionContext.h SessionState.cpp SessionState.h

Author: kgiusti
Date: Fri Jun  3 20:10:17 2011
New Revision: 1131182

URL: http://svn.apache.org/viewvc?rev=1131182&view=rev
Log:
checkpoint - various fixes

Modified:
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h?rev=1131182&r1=1131181&r2=1131182&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h Fri Jun  3 20:10:17 2011
@@ -89,7 +89,7 @@ class AsyncCompletion : private boost::n
      * callback object will be used by the last completer thread, and
      * released when the callback returns.
      */
-    class Callback : virtual public RefCounted
+    class Callback : public RefCounted
     {
   public:
         virtual void completed(bool) = 0;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1131182&r1=1131181&r2=1131182&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Jun  3 20:10:17 2011
@@ -824,13 +824,14 @@ namespace {
             pending.erase(i);
 
             if (ready && pending.empty()) {
-                framing::Invoker::Result r;
+                framing::Invoker::Result r;   // message.accept does not return result data
                 Mutex::ScopedUnlock ul(lock);
                 completed( r );
             }
         }
 
-        /** allow the Message.Accept to complete */
+        /** allow the Message.Accept to complete - do this only after all
+         * deliveryIds have been added() */
         void enable()
         {
             Mutex::ScopedLock l(lock);
@@ -854,14 +855,14 @@ namespace {
         boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
     public:
         DequeueDone( const DeliveryId & _id,
-                     boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd )
+                     const boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd )
             : id(_id), cmd(_cmd) {}
         void operator()() { cmd->complete( id ); }
     };
 
 
     /** factory to create the above callback - passed to queue's dequeue
-        method, only called if dequeue is async! */
+        method, only used if dequeue is asynchronous! */
     boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state,
                                                            const DeliveryId& id,
                                                            boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd )
@@ -873,10 +874,18 @@ namespace {
         boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) );
         return x;
     }
+
+    /** predicate to process unacked delivery records */
+    bool acceptDelivery( SemanticState *state,
+                         boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd,
+                         DeliveryRecord dr )
+    {
+        Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), cmd);
+        return dr.accept((TransactionContext*) 0, &f);
+    }
 }
 
 void SemanticState::accepted(const SequenceSet& commands) {
-    QPID_LOG(error, "SemanticState::accepted (" << commands << ")");
     assertClusterSafe();
     if (txBuffer.get()) {
         //in transactional mode, don't dequeue or remove, just
@@ -900,26 +909,21 @@ void SemanticState::accepted(const Seque
             unacked.erase(removed, unacked.end());
         }
     } else {
-        /** @todo KAG - the following code removes the command from unacked
-            even if the dequeue has not completed.  note that the command will
-            still not complete until all dequeues complete. I'm doing this to
-            avoid having to lock the unacked list, which would be necessary if
-            we remove when the dequeue completes. Is this ok? */
+        /** @todo KAG - the following code removes the message from unacked
+            list even if the dequeue has not yet completed.  Note that the
+            entire command will still not complete until all dequeues
+            complete. I'm doing this to avoid having to lock the unacked list,
+            which would be necessary if we remove when the dequeue
+            completes. Is this ok? */
         boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
-        DeliveryRecords::iterator i;
-        DeliveryRecords undone;
-        for (i = unacked.begin(); i < unacked.end(); ++i) {
-            if (i->coveredBy(&commands)) {
-                Queue::DequeueDoneCallbackFactory f = boost::bind(factory, this, i->getId(), cmd);
-                if (i->accept((TransactionContext*) 0, &f) == false) {
-                    undone.push_back(*i);
-                }
-            }
-        }
-        if (undone.empty())
-            unacked.clear();
-        else
-            unacked.swap(undone);
+        DeliveryRecords::iterator removed =
+          remove_if(unacked.begin(), unacked.end(),
+                    isInSequenceSetAnd(commands,
+                                       bind(acceptDelivery,
+                                            this,
+                                            cmd,
+                                            _1)));
+        unacked.erase(removed, unacked.end());
 
         if (cmd) {
             boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd));

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1131182&r1=1131181&r2=1131182&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h Fri Jun  3 20:10:17 2011
@@ -59,7 +59,7 @@ class SessionContext : public OwnershipT
 
     // class for commands that need to complete asynchronously
     friend class AsyncCommandContext;
-    class AsyncCommandContext : virtual public RefCounted
+    class AsyncCommandContext : public RefCounted
     {
      private:
         framing::SequenceNumber id;
@@ -71,11 +71,11 @@ class SessionContext : public OwnershipT
         AsyncCommandContext() : id(0), requiresAccept(false), syncBitSet(false) {}
         virtual ~AsyncCommandContext() {}
 
-        framing::SequenceNumber getId() { return id; }
+        framing::SequenceNumber getId() const { return id; }
         void setId(const framing::SequenceNumber seq) { id = seq; }
-        bool getRequiresAccept() { return requiresAccept; }
+        bool getRequiresAccept() const { return requiresAccept; }
         void setRequiresAccept(const bool a) { requiresAccept = a; }
-        bool getSyncBitSet() { return syncBitSet; }
+        bool getSyncBitSet() const { return syncBitSet; }
         void setSyncBitSet(const bool s) { syncBitSet = s; }
         void setManager(SessionContext::AsyncCommandManager *m) { manager.reset(m); }
 
@@ -95,7 +95,7 @@ class SessionContext : public OwnershipT
     class AsyncCommandManager : public RefCounted
     {
      public:
-        virtual void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&,
+        virtual void completePendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&,
                                             const framing::Invoker::Result&) = 0;
     };
  };

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1131182&r1=1131181&r2=1131182&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Jun  3 20:10:17 2011
@@ -208,6 +208,7 @@ void SessionState::handleCommand(framing
 {
     currentCommandComplete = true;      // assumed, can be overridden by invoker method (this sucks).
     syncCurrentCommand = method->isSync();
+    currentCommandId = id;
     acceptRequired = false;
     Invoker::Result invocation = invoke(adapter, *method);
     if (!invocation.wasHandled()) {
@@ -263,6 +264,7 @@ void SessionState::handleContent(AMQFram
         currentCommandComplete = true;      // assumed
         syncCurrentCommand = msg->getFrames().getMethod()->isSync();
         acceptRequired = msg->requiresAccept();
+        currentCommandId = msg->getCommandId();
         semanticState.handle(msg);
         msgBuilder.end();
         IncompleteIngressMsgXfer xfer(this, msg);
@@ -428,21 +430,23 @@ framing::AMQP_ClientProxy& SessionState:
 // (called via the invoker() in handleCommand() above)
 void SessionState::addPendingExecutionSync()
 {
-    SequenceNumber syncCommandId = receiverGetCurrent();
-    if (receiverGetIncomplete().front() < syncCommandId) {
+    if (receiverGetIncomplete().front() < currentCommandId) {
         currentCommandComplete = false;
-        pendingExecutionSyncs.push(syncCommandId);
+        pendingExecutionSyncs.push(currentCommandId);
         asyncCommandManager->flushPendingCommands();
-        QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
+        QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << currentCommandId);
     }
 }
 
 
 void SessionState::registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd)
 {
-    /** @todo KAG: ensure this is invoked during handleCommand() context! */
+    /** @todo KAG: ensure this is invoked during handleCommand()/handleContent() context! */
     currentCommandComplete = false;
-    asyncCommandManager->addPendingCommand( aCmd, receiverGetCurrent(), acceptRequired, syncCurrentCommand );
+    asyncCommandManager->addPendingCommand( aCmd, currentCommandId, acceptRequired, syncCurrentCommand );
+    if (syncCurrentCommand) {   // if client wants ack, force command to complete asap.
+        aCmd->flush();
+    }
 }
 
 
@@ -460,14 +464,10 @@ SessionState::IncompleteIngressMsgXfer::
     boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
 
     // this routine is *only* invoked when the message needs to be asynchronously completed.  Otherwise, ::completed()
-    // will be invoked directly.
-    pending = true;
-    boost::intrusive_ptr<SessionContext::AsyncCommandContext>ctxt(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cb));
+    // will be invoked directly.  Thus, let the SessionState know this command is not going to complete immediately:
+    pendingCmdCtxt = boost::intrusive_ptr<CommandContext>(new CommandContext(msg));
+    boost::intrusive_ptr<qpid::broker::SessionContext::AsyncCommandContext> ctxt(pendingCmdCtxt);
     session->registerAsyncCommand(ctxt);
-    if (ctxt->getSyncBitSet()) {
-        // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
-        msg->flush();
-    }
     return cb;
 }
 
@@ -478,35 +478,27 @@ SessionState::IncompleteIngressMsgXfer::
  */
 void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
 {
-    if (!sync) {
-        /** note well: this path may execute in any thread.  It is safe to access
-         * the scheduledCompleterContext, since *this has a shared pointer to it.
-         * but not session!
-         */
+    if (pendingCmdCtxt) {
         session = 0;
-        QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << getId());
-        completed(framing::Invoker::Result());
+        QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << pendingCmdCtxt->getId());
+        pendingCmdCtxt->completed(framing::Invoker::Result());
+        pendingCmdCtxt.reset();
     } else {
+        // Since "clone()" above was _not_ called, this -better- be sync!
+        if (!sync) assert(false);
         // this path runs directly from the ac->end() call in handleContent() above,
         // so *session is definately valid.
         if (session->isAttached()) {
-            QPID_LOG(debug, ": receive completed for msg seq=" << getId());
-            session->completeCommand(getId(), framing::Invoker::Result(), getRequiresAccept(), getSyncBitSet());
-        }
-        if (pending) {
-            boost::intrusive_ptr<AsyncCommandContext> p(this);
-            session->cancelAsyncCommand(p);
+            QPID_LOG(debug, ": receive completed for msg seq=" << session->currentCommandId);
+            session->completeCommand(session->currentCommandId,
+                                     framing::Invoker::Result(),    // dummy
+                                     session->acceptRequired,
+                                     session->syncCurrentCommand);
         }
     }
 }
 
 
-void SessionState::IncompleteIngressMsgXfer::flush()
-{
-    msg->flush();
-}
-
-
 /** Scheduled from an asynchronous command's completed callback to run on
  * the IO thread.
  */
@@ -516,9 +508,9 @@ void SessionState::AsyncCommandManager::
 }
 
 
-void SessionState::AsyncCommandManager::addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd,
-                                                          framing::SequenceNumber seq,
-                                                          bool acceptRequired, bool syncBitSet)
+void SessionState::AsyncCommandManager::addPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>& cmd,
+                                                          const framing::SequenceNumber& seq,
+                                                          const bool acceptRequired, const bool syncBitSet)
 {
     cmd->setId(seq);
     cmd->setRequiresAccept(acceptRequired);
@@ -531,7 +523,7 @@ void SessionState::AsyncCommandManager::
 }
 
 
-void SessionState::AsyncCommandManager::cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd)
+void SessionState::AsyncCommandManager::cancelPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>& cmd)
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
     pendingCommands.erase(cmd->getId());
@@ -559,7 +551,7 @@ void SessionState::AsyncCommandManager::
 /** mark a pending command as completed.
  * This method must be thread safe - it may run on any thread.
  */
-void SessionState::AsyncCommandManager::completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd,
+void SessionState::AsyncCommandManager::completePendingCommand(const boost::intrusive_ptr<AsyncCommandContext>& cmd,
                                                                const framing::Invoker::Result& result)
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h?rev=1131182&r1=1131181&r2=1131182&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h Fri Jun  3 20:10:17 2011
@@ -186,10 +186,11 @@ class SessionState : public qpid::Sessio
     // sequence numbers of received Execution.Sync commands that are pending completion.
     std::queue<SequenceNumber> pendingExecutionSyncs;
 
-    // true if command completes during call to handleCommand()
-    bool currentCommandComplete;
-    bool syncCurrentCommand;
-    bool acceptRequired;
+    // flags that reflect the state of the currently executing received command:
+    bool currentCommandComplete;    // true if the current command completed synchronously
+    SequenceNumber currentCommandId;
+    bool syncCurrentCommand;        // true if sync-bit set in current command headers
+    bool acceptRequired;            // true if current ingress message.transfer requires accept
 
  protected:
     /** This class provides a context for completing asynchronous commands in a thread
@@ -231,24 +232,16 @@ class SessionState : public qpid::Sessio
         AsyncCommandManager(SessionState *s) : session(s), isAttached(s->isAttached()) {};
         ~AsyncCommandManager() {};
 
-        /** track a message pending ingress completion */
-        //void addPendingMessage(boost::intrusive_ptr<Message> m);
-        //void deletePendingMessage(SequenceNumber id);
-        //void flushPendingMessages();
-        /** schedule the processing of a completed ingress message.transfer command */
-        //void scheduleMsgCompletion(SequenceNumber cmd,
-        //                           bool requiresAccept,
-        //                           bool requiresSync);
         void cancel();  // called by SessionState destructor.
         void attached();  // called by SessionState on attach()
         void detached();  // called by SessionState on detach()
 
-        /** called by async command handlers */
-        void addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&,
-                               framing::SequenceNumber, bool, bool);
-        void cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&);
+        /** for managing asynchronous commands */
+        void addPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&,
+                               const framing::SequenceNumber&, const bool, const bool);
+        void cancelPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&);
         void flushPendingCommands();
-        void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&, const framing::Invoker::Result&);
+        void completePendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&, const framing::Invoker::Result&);
     };
     boost::intrusive_ptr<AsyncCommandManager> asyncCommandManager;
 
@@ -256,28 +249,32 @@ class SessionState : public qpid::Sessio
 
     /** incomplete Message.transfer commands - inbound to broker from client
      */
-    class IncompleteIngressMsgXfer : public AsyncCommandContext,
-                                     public AsyncCompletion::Callback
+    friend class IncompleteIngressMsgXfer;
+    class IncompleteIngressMsgXfer : public AsyncCompletion::Callback
     {
      public:
         IncompleteIngressMsgXfer( SessionState *ss,
                                   boost::intrusive_ptr<Message> m )
-          : session(ss),
-          msg(m),
-          pending(false) {}
+          : session(ss), msg(m) {};
         virtual ~IncompleteIngressMsgXfer() {};
 
-        // async completion calls
         virtual void completed(bool);
         virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
 
-        // async cmd calls
-        virtual void flush();
-
      private:
+        /** @todo KAG COMMENT ME */
+        class CommandContext : public AsyncCommandContext
+        {
+            boost::intrusive_ptr<Message> msg;
+         public:
+            CommandContext(boost::intrusive_ptr<Message> _m)
+              : msg(_m) {}
+            virtual void flush() { msg->flush(); }
+        };
+
         SessionState *session;  // only valid if sync flag in callback is true
         boost::intrusive_ptr<Message> msg;
-        bool pending;   // true if msg saved on pending list...
+        boost::intrusive_ptr<CommandContext> pendingCmdCtxt;
     };
 
     friend class SessionManager;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org