You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2008/09/30 18:04:29 UTC
svn commit: r700516 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/Queue.cpp tests/QueueTest.cpp
Author: cctrieloff
Date: Tue Sep 30 09:04:29 2008
New Revision: 700516
URL: http://svn.apache.org/viewvc?rev=700516&view=rev
Log:
QPID-1306
Cleaner consume check, pointed out by gsim.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=700516&r1=700515&r2=700516&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep 30 09:04:29 2008
@@ -244,7 +244,7 @@
return false;
} else {
QueuedMessage msg = messages.front();
- if (store && !msg.payload->isEnqueueComplete()) {
+ if (!optimisticConsume && store && !msg.payload->isEnqueueComplete()) {
QPID_LOG(debug, "Messages not ready to dispatch on queue '" << name << "'");
addListener(c);
return false;
@@ -482,11 +482,7 @@
}
if (msg->isPersistent() && store) {
- if (optimisticConsume){
- msg->enqueueComplete(); // (optimistic) allow consume before written to disk
- } else {
- msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- }
+ msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
@@ -503,11 +499,7 @@
dequeued(msg);
}
if (msg.payload->isPersistent() && store) {
- if (optimisticConsume) {
- msg.payload->dequeueComplete();
- } else {
- msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- }
+ msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
return true;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=700516&r1=700515&r2=700516&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Sep 30 09:04:29 2008
@@ -301,27 +301,23 @@
queue->setLastNodeFailure();
intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "B");
- intrusive_ptr<Message> msg3 = message("e", "C");
msg1->forcePersistent();
- msg2->forcePersistent();
- msg3->forcePersistent();
- //enqueue 2 messages
- queue->deliver(msg1);
- queue->deliver(msg2);
-
//change mode
args.setInt("qpid.optimistic_consume", 1);
queue->configure(args);
//enqueue 1 message
- queue->deliver(msg3);
+ queue->deliver(msg1);
- //check all have persistent ids.
- BOOST_CHECK(!msg1->isEnqueueComplete());
- BOOST_CHECK(!msg2->isEnqueueComplete());
- BOOST_CHECK(msg3->isEnqueueComplete());
+ TestConsumer::shared_ptr consumer(new TestConsumer());
+ queue->consume(consumer);
+ queue->dispatch(consumer);
+ if (!consumer->received)
+ sleep(2);
+
+ BOOST_CHECK_EQUAL(msg1.get(), consumer->last.get());
+ BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
}