You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2011/04/04 16:25:25 UTC

svn commit: r1088634 - in /qpid/branches/0.10/qpid/cpp/src/qpid/broker: Queue.cpp RecoveredDequeue.cpp RecoveredEnqueue.cpp RecoveryManagerImpl.cpp TxPublish.cpp

Author: gsim
Date: Mon Apr  4 14:25:25 2011
New Revision: 1088634

URL: http://svn.apache.org/viewvc?rev=1088634&view=rev
Log:
QPID-3174: remove unnecessary enqueueComplete() calls (merge of r1087868 and r1088539)

Modified:
    qpid/branches/0.10/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
    qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
    qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/branches/0.10/qpid/cpp/src/qpid/broker/TxPublish.cpp

Modified: qpid/branches/0.10/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.10/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1088634&r1=1088633&r2=1088634&view=diff
==============================================================================
--- qpid/branches/0.10/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/0.10/qpid/cpp/src/qpid/broker/Queue.cpp Mon Apr  4 14:25:25 2011
@@ -183,7 +183,6 @@ void Queue::recover(boost::intrusive_ptr
         // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
         msg->addToSyncList(shared_from_this(), store); 
     }
-    msg->enqueueComplete(); // mark the message as enqueued
 
     if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
         //content has not been loaded, need to ensure that lazy loading mode is set:
@@ -210,7 +209,6 @@ void Queue::requeue(const QueuedMessage&
     {    
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return;
-        msg.payload->enqueueComplete(); // mark the message as enqueued
         messages->reinsert(msg);
         listeners.populate(copy);
 
@@ -632,7 +630,9 @@ bool Queue::enqueue(TransactionContext* 
     }
 
     if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
-        msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+        // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
+        // when it considers the message stored.
+        msg->enqueueAsync(shared_from_this(), store);
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
         store->enqueue(ctxt, pmsg, *this);
         return true;

Modified: qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp?rev=1088634&r1=1088633&r2=1088634&view=diff
==============================================================================
--- qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp (original)
+++ qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp Mon Apr  4 14:25:25 2011
@@ -43,7 +43,6 @@ void RecoveredDequeue::commit() throw()
 
 void RecoveredDequeue::rollback() throw()
 {
-    msg->enqueueComplete();
     queue->process(msg);
 }
 

Modified: qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp?rev=1088634&r1=1088633&r2=1088634&view=diff
==============================================================================
--- qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp (original)
+++ qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp Mon Apr  4 14:25:25 2011
@@ -36,7 +36,6 @@ bool RecoveredEnqueue::prepare(Transacti
 }
 
 void RecoveredEnqueue::commit() throw(){
-    msg->enqueueComplete();
     queue->process(msg);
 }
 

Modified: qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1088634&r1=1088633&r2=1088634&view=diff
==============================================================================
--- qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/0.10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Mon Apr  4 14:25:25 2011
@@ -252,7 +252,6 @@ void RecoverableMessageImpl::dequeue(Dtx
 
 void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
 {
-    msg->enqueueComplete(); // recoved nmessage to enqueued in store already
     buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
 }
 

Modified: qpid/branches/0.10/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.10/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=1088634&r1=1088633&r2=1088634&view=diff
==============================================================================
--- qpid/branches/0.10/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/branches/0.10/qpid/cpp/src/qpid/broker/TxPublish.cpp Mon Apr  4 14:25:25 2011
@@ -90,14 +90,7 @@ void TxPublish::deliverTo(const boost::s
 
 void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
 {
-    if (!queue->enqueue(ctxt, msg)){
-        /**
-         * if not store then mark message for ack and deleivery once
-         * commit happens, as async IO will never set it when no store
-         * exists
-         */
-	msg->enqueueComplete();
-    }
+    queue->enqueue(ctxt, msg);
 }
 
 TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org