You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2008/09/30 18:04:29 UTC

svn commit: r700516 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/Queue.cpp tests/QueueTest.cpp

Author: cctrieloff
Date: Tue Sep 30 09:04:29 2008
New Revision: 700516

URL: http://svn.apache.org/viewvc?rev=700516&view=rev
Log:
QPID-1306

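Cleaner optimistic-consume check, pointed out by gsim: when optimisticConsume is set, the "messages not ready to dispatch" test no longer waits on isEnqueueComplete(), and persistent messages always go through enqueueAsync()/dequeueAsync() instead of being marked enqueue/dequeue-complete early.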


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=700516&r1=700515&r2=700516&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep 30 09:04:29 2008
@@ -244,7 +244,7 @@
             return false;
         } else {
             QueuedMessage msg = messages.front();
-            if (store && !msg.payload->isEnqueueComplete()) { 
+            if (!optimisticConsume && store && !msg.payload->isEnqueueComplete()) { 
                 QPID_LOG(debug, "Messages not ready to dispatch on queue '" << name << "'");
                 addListener(c);
                 return false;
@@ -482,11 +482,7 @@
     }
 
     if (msg->isPersistent() && store) {
-        if (optimisticConsume){
-		    msg->enqueueComplete(); // (optimistic) allow consume before written to disk
-        } else {
-		    msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
-		}
+	    msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
 		boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
         store->enqueue(ctxt, pmsg, *this);
         return true;
@@ -503,11 +499,7 @@
         dequeued(msg);
     }
     if (msg.payload->isPersistent() && store) {
-        if (optimisticConsume) {
-		    msg.payload->dequeueComplete();
-        } else {
-            msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
-        }
+        msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
 		boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
         store->dequeue(ctxt, pmsg, *this);
         return true;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=700516&r1=700515&r2=700516&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Sep 30 09:04:29 2008
@@ -301,27 +301,23 @@
 	queue->setLastNodeFailure();
 	
     intrusive_ptr<Message> msg1 = message("e", "A");
-    intrusive_ptr<Message> msg2 = message("e", "B");
-    intrusive_ptr<Message> msg3 = message("e", "C");
 	msg1->forcePersistent();
-	msg2->forcePersistent();
-	msg3->forcePersistent();
 
-	//enqueue 2 messages
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-	
 	//change mode
 	args.setInt("qpid.optimistic_consume", 1);
     queue->configure(args);
 	
 	//enqueue 1 message
-    queue->deliver(msg3);
+    queue->deliver(msg1);
 	
-	//check all have persistent ids.
-    BOOST_CHECK(!msg1->isEnqueueComplete());
-    BOOST_CHECK(!msg2->isEnqueueComplete());
-    BOOST_CHECK(msg3->isEnqueueComplete());
+    TestConsumer::shared_ptr consumer(new TestConsumer());
+    queue->consume(consumer);
+    queue->dispatch(consumer);
+    if (!consumer->received)
+        sleep(2);
+
+    BOOST_CHECK_EQUAL(msg1.get(), consumer->last.get());
+    BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
 
 }