You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/07/17 10:58:57 UTC

svn commit: r1504058 - in /qpid/trunk/qpid/cpp/src: qpid/broker/LossyQueue.cpp qpid/broker/Queue.cpp qpid/broker/Queue.h tests/MessagingSessionTests.cpp

Author: gsim
Date: Wed Jul 17 08:58:57 2013
New Revision: 1504058

URL: http://svn.apache.org/r1504058
Log:
QPID-4993: reroute dropped messages in ring queue if alternate exchange is specified

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp?rev=1504058&r1=1504057&r2=1504058&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp Wed Jul 17 08:58:57 2013
@@ -52,7 +52,7 @@ bool LossyQueue::checkDepth(const QueueD
         QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize());
         qpid::sys::Mutex::ScopedUnlock u(messageLock);
         //TODO: arguably we should try and purge expired messages first but that is potentially expensive
-        if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), MessageFunctor(), PURGE, false)) {
+        if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), boost::bind(&reroute, alternateExchange, _1), PURGE, false)) {
             if (mgmtObject) {
                 mgmtObject->inc_discardsRing(1);
                 if (brokerMgmtObject)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1504058&r1=1504057&r2=1504058&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jul 17 08:58:57 2013
@@ -677,17 +677,6 @@ namespace {
         return new MessageFilter();
     }
 
-    bool reroute(boost::shared_ptr<Exchange> e, const Message& m)
-    {
-        if (e) {
-            DeliverableMessage d(m, 0);
-            d.getMessage().clearTrace();
-            e->routeWithAlternate(d);
-            return true;
-        } else {
-            return false;
-        }
-    }
     void moveTo(boost::shared_ptr<Queue> q, Message& m)
     {
         if (q) {
@@ -1684,6 +1673,19 @@ void Queue::setMgmtRedirectState( std::s
         mgmtObject->set_redirectSource(isSrc);
     }
 }
+
+bool Queue::reroute(boost::shared_ptr<Exchange> e, const Message& m)
+{
+    if (e) {
+        DeliverableMessage d(m, 0);
+        d.getMessage().clearTrace();
+        e->routeWithAlternate(d);
+        return true;
+    } else {
+        return false;
+    }
+}
+
 Queue::QueueUsers::QueueUsers() : consumers(0), browsers(0), others(0), controller(false) {}
 void Queue::QueueUsers::addConsumer() { ++consumers; }
 void Queue::QueueUsers::addBrowser() { ++browsers; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1504058&r1=1504057&r2=1504058&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Jul 17 08:58:57 2013
@@ -501,6 +501,9 @@ class Queue : public boost::enable_share
     QPID_BROKER_EXTERN bool isRedirectSource() const { return redirectSource; }
     QPID_BROKER_EXTERN void setMgmtRedirectState( std::string peer, bool enabled, bool isSrc );
 
+    //utility function
+    static bool reroute(boost::shared_ptr<Exchange> e, const Message& m);
+
   friend class QueueFactory;
 };
 }

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1504058&r1=1504057&r2=1504058&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Wed Jul 17 08:58:57 2013
@@ -1292,7 +1292,6 @@ QPID_AUTO_TEST_CASE(testSimpleRequestRes
 QPID_AUTO_TEST_CASE(testSelfDestructQueue)
 {
     MessagingFixture fix;
-    //create receiver on temp queue for responses (using shorthand for temp queue)
     Session other = fix.connection.createSession();
     Receiver r1 = other.createReceiver("amq.fanout; {link:{reliability:at-least-once, x-declare:{arguments:{qpid.max_count:10,qpid.policy_type:self-destruct}}}}");
     Receiver r2 = fix.session.createReceiver("amq.fanout");
@@ -1315,6 +1314,24 @@ QPID_AUTO_TEST_CASE(testSelfDestructQueu
     }
 }
 
+QPID_AUTO_TEST_CASE(testReroutingRingQueue)
+{
+    MessagingFixture fix;
+    Receiver r1 = fix.session.createReceiver("my-queue; {create:always, node:{x-declare:{alternate-exchange:amq.fanout, auto-delete:True, arguments:{qpid.max_count:10,qpid.policy_type:ring}}}}");
+    Receiver r2 = fix.session.createReceiver("amq.fanout");
+
+    Sender s = fix.session.createSender("my-queue");
+    for (uint i = 0; i < 20; ++i) {
+        s.send(Message((boost::format("MSG_%1%") % (i+1)).str()));
+    }
+    for (uint i = 10; i < 20; ++i) {
+        BOOST_CHECK_EQUAL(r1.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());
+    }
+    for (uint i = 0; i < 10; ++i) {
+        BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str());
+    }
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org