You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/02/12 12:43:51 UTC

svn commit: r743694 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Queue.cpp qpid/broker/Queue.h qpid/broker/SemanticState.cpp qpid/broker/SemanticState.h tests/ClientSessionTest.cpp

Author: gsim
Date: Thu Feb 12 11:43:51 2009
New Revision: 743694

URL: http://svn.apache.org/viewvc?rev=743694&view=rev
Log:
QPID-1660: If selected consumer can't take a message, ensure others are notified of message availability.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=743694&r1=743693&r2=743694&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 12 11:43:51 2009
@@ -256,10 +256,30 @@
     return false;
 }
 
+void Queue::notifyListener()
+{
+    QueueListeners::NotificationSet set;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        if (messages.size()) {
+            listeners.populate(set);
+        }
+    }
+    set.notify();
+}
+
 bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     if (c->preAcquires()) {
-        return consumeNextMessage(m, c);
+        switch (consumeNextMessage(m, c)) {
+          case CONSUMED:
+            return true;
+          case CANT_CONSUME:
+            notifyListener();//let someone else try
+          case NO_MESSAGES:
+          default:
+            return false;
+        }        
     } else {
         return browseNextMessage(m, c);
     }
@@ -291,14 +311,14 @@
     }
 }
 
-bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     while (true) {
         Mutex::ScopedLock locker(messageLock);
         if (messages.empty()) { 
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
             listeners.addListener(c);
-            return false;
+            return NO_MESSAGES;
         } else {
             QueuedMessage msg = getFront();
             if (msg.payload->hasExpired()) {
@@ -311,16 +331,16 @@
                 if (c->accept(msg.payload)) {            
                     m = msg;
                     popMsg(msg);
-                    return true;
+                    return CONSUMED;
                 } else {
                     //message(s) are available but consumer hasn't got enough credit
                     QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
-                    return false;
+                    return CANT_CONSUME;
                 }
             } else {
                 //consumer will never want this message
                 QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
-                return false;
+                return CANT_CONSUME;
             } 
         }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=743694&r1=743693&r2=743694&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Feb 12 11:43:51 2009
@@ -68,6 +68,7 @@
 
             typedef std::deque<QueuedMessage> Messages;
             typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
+            enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
 
             const string name;
             const bool autodelete;
@@ -104,8 +105,9 @@
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
             bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
             bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
-            bool consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+            ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
             bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+            void notifyListener();
 
             void removeListener(Consumer::shared_ptr);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=743694&r1=743693&r2=743694&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Feb 12 11:43:51 2009
@@ -527,9 +527,19 @@
     }
 }
 
+bool SemanticState::ConsumerImpl::haveCredit()
+{
+    if (msgCredit) {
+        return true;
+    } else {
+        blocked = true;
+        return false;
+    }
+}
+
 void SemanticState::ConsumerImpl::flush()
 {
-    while(queue->dispatch(shared_from_this()))
+    while(haveCredit() && queue->dispatch(shared_from_this()))
         ;
     stop();
 }
@@ -587,7 +597,7 @@
 
 bool SemanticState::ConsumerImpl::doOutput()
 {
-    return queue->dispatch(shared_from_this());
+    return haveCredit() && queue->dispatch(shared_from_this());
 }
 
 void SemanticState::ConsumerImpl::enableNotify()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=743694&r1=743693&r2=743694&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Feb 12 11:43:51 2009
@@ -82,6 +82,7 @@
 
         bool checkCredit(boost::intrusive_ptr<Message>& msg);
         void allocateCredit(boost::intrusive_ptr<Message>& msg);
+        bool haveCredit();
 
       public:
         typedef boost::shared_ptr<ConsumerImpl> shared_ptr;

Modified: qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=743694&r1=743693&r2=743694&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Thu Feb 12 11:43:51 2009
@@ -488,6 +488,39 @@
     BOOST_CHECK(!q.get(got));
 }
 
+QPID_AUTO_TEST_CASE(testReliableDispatch) {
+    ClientSessionFixture fix;
+    std::string queue("a-queue");
+    fix.session.queueDeclare(arg::queue=queue, arg::autoDelete=true);
+
+    ConnectionSettings settings;
+    settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT);
+
+    Connection c1;
+    c1.open(settings);
+    Session s1 = c1.newSession();
+    SubscriptionManager subs1(s1);
+    LocalQueue q1;
+    subs1.subscribe(q1, queue, FlowControl());//first subscriber has no credit
+
+    Connection c2;
+    c2.open(settings);
+    Session s2 = c2.newSession();
+    SubscriptionManager subs2(s2);
+    LocalQueue q2;
+    subs2.subscribe(q2, queue);//second subscriber has credit
+
+    fix.session.messageTransfer(arg::content=Message("my-message", queue));
+
+    //check that the second consumer gets the message
+    Message got;
+    BOOST_CHECK(q2.get(got, 1*TIME_SEC));
+    BOOST_CHECK_EQUAL("my-message", got.getData());
+
+    c1.close();
+    c2.close();
+}
+
 
 QPID_AUTO_TEST_SUITE_END()
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org