You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/02/12 12:43:51 UTC
svn commit: r743694 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Queue.cpp
qpid/broker/Queue.h qpid/broker/SemanticState.cpp
qpid/broker/SemanticState.h tests/ClientSessionTest.cpp
Author: gsim
Date: Thu Feb 12 11:43:51 2009
New Revision: 743694
URL: http://svn.apache.org/viewvc?rev=743694&view=rev
Log:
QPID-1660: If selected consumer can't take a message, ensure others are notified of message availability.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=743694&r1=743693&r2=743694&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 12 11:43:51 2009
@@ -256,10 +256,30 @@
return false;
}
+void Queue::notifyListener() // collects listeners into a notification set when messages are queued, then fires it
+{
+ QueueListeners::NotificationSet set;
+ {
+ Mutex::ScopedLock locker(messageLock); // locker is scoped to this inner block only
+ if (messages.size()) { // nothing is collected when the queue is empty
+ listeners.populate(set);
+ }
+ }
+ set.notify(); // invoked after the locker's scope has closed
+}
+
bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
if (c->preAcquires()) {
- return consumeNextMessage(m, c);
+ switch (consumeNextMessage(m, c)) {
+ case CONSUMED:
+ return true;
+ case CANT_CONSUME:
+ notifyListener();//let someone else try, then deliberately fall through to return false
+ case NO_MESSAGES:
+ default:
+ return false;
+ }
} else {
return browseNextMessage(m, c);
}
@@ -291,14 +311,14 @@
}
}
-bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
- return false;
+ return NO_MESSAGES;
} else {
QueuedMessage msg = getFront();
if (msg.payload->hasExpired()) {
@@ -311,16 +331,16 @@
if (c->accept(msg.payload)) {
m = msg;
popMsg(msg);
- return true;
+ return CONSUMED;
} else {
//message(s) are available but consumer hasn't got enough credit
QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
- return false;
+ return CANT_CONSUME;
}
} else {
//consumer will never want this message
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- return false;
+ return CANT_CONSUME;
}
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=743694&r1=743693&r2=743694&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Feb 12 11:43:51 2009
@@ -68,6 +68,7 @@
typedef std::deque<QueuedMessage> Messages;
typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
+ enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; // result of consumeNextMessage(): queue empty / message present but not handed to this consumer / message handed over
const string name;
const bool autodelete;
@@ -104,8 +105,9 @@
void setPolicy(std::auto_ptr<QueuePolicy> policy);
bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
- bool consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ void notifyListener();
void removeListener(Consumer::shared_ptr);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=743694&r1=743693&r2=743694&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Feb 12 11:43:51 2009
@@ -527,9 +527,19 @@
}
}
+bool SemanticState::ConsumerImpl::haveCredit() // true when msgCredit is non-zero; otherwise marks the consumer blocked and returns false
+{
+ if (msgCredit) {
+ return true;
+ } else {
+ blocked = true; // side effect happens only on the no-credit path
+ return false;
+ }
+}
+
void SemanticState::ConsumerImpl::flush()
{
-while(queue->dispatch(shared_from_this()))
+while(haveCredit() && queue->dispatch(shared_from_this())) // haveCredit() is evaluated first, so the loop ends without a dispatch attempt once it returns false
;
stop();
}
@@ -587,7 +597,7 @@
bool SemanticState::ConsumerImpl::doOutput()
{
-return queue->dispatch(shared_from_this());
+return haveCredit() && queue->dispatch(shared_from_this()); // short-circuits: dispatch is not attempted when haveCredit() is false
}
void SemanticState::ConsumerImpl::enableNotify()
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=743694&r1=743693&r2=743694&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Feb 12 11:43:51 2009
@@ -82,6 +82,7 @@
bool checkCredit(boost::intrusive_ptr<Message>& msg);
void allocateCredit(boost::intrusive_ptr<Message>& msg);
+ bool haveCredit();
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
Modified: qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=743694&r1=743693&r2=743694&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Thu Feb 12 11:43:51 2009
@@ -488,6 +488,39 @@
BOOST_CHECK(!q.get(got));
}
+QPID_AUTO_TEST_CASE(testReliableDispatch) { // QPID-1660: two subscribers share one queue; the message must reach the second one
+ ClientSessionFixture fix;
+ std::string queue("a-queue");
+ fix.session.queueDeclare(arg::queue=queue, arg::autoDelete=true);
+
+ ConnectionSettings settings;
+ settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); // both extra connections target the fixture's broker
+
+ Connection c1;
+ c1.open(settings);
+ Session s1 = c1.newSession();
+ SubscriptionManager subs1(s1);
+ LocalQueue q1;
+ subs1.subscribe(q1, queue, FlowControl());//first subscriber has no credit
+
+ Connection c2;
+ c2.open(settings);
+ Session s2 = c2.newSession();
+ SubscriptionManager subs2(s2);
+ LocalQueue q2;
+ subs2.subscribe(q2, queue);//second subscriber has credit
+
+ fix.session.messageTransfer(arg::content=Message("my-message", queue)); // single message sent through the fixture's own session
+
+ //check that the second consumer gets the message
+ Message got;
+ BOOST_CHECK(q2.get(got, 1*TIME_SEC)); // second argument is presumably a timeout - confirm against LocalQueue::get
+ BOOST_CHECK_EQUAL("my-message", got.getData()); // NOTE(review): q1 is never inspected, so the test does not assert that the first subscriber received nothing
+
+ c1.close();
+ c2.close();
+}
+
QPID_AUTO_TEST_SUITE_END()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org