You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/01/29 23:58:23 UTC

svn commit: r904654 - in /qpid/trunk/qpid: cpp/src/qpid/broker/Queue.cpp cpp/src/qpid/broker/Queue.h python/tests_0-10/management.py specs/management-schema.xml

Author: tross
Date: Fri Jan 29 22:58:22 2010
New Revision: 904654

URL: http://svn.apache.org/viewvc?rev=904654&view=rev
Log:
QPID-2365 - Reroute messages from a queue feature

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/python/tests_0-10/management.py
    qpid/trunk/qpid/specs/management-schema.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=904654&r1=904653&r2=904654&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Jan 29 22:58:22 2010
@@ -37,6 +37,7 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
+#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
 
 #include <iostream>
 #include <algorithm>
@@ -518,17 +519,43 @@
  * purge_request == 0 then purge all messages
  *               == N then purge N messages from queue
  * Sometimes purge_request == 1 to unblock the top of queue
+ *
+ * The dest exchange may be supplied to re-route messages through the exchange.
+ * It is safe to re-route messages such that they arrive back on the same queue,
+ * even if the queue is ordered by priority.
  */
-uint32_t Queue::purge(const uint32_t purge_request){
+uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
+{
     Mutex::ScopedLock locker(messageLock);
     uint32_t purge_count = purge_request; // only comes into play if  >0 
+    std::deque<DeliverableMessage> rerouteQueue;
 
     uint32_t count = 0;
     // Either purge them all or just the some (purge_count) while the queue isn't empty.
     while((!purge_request || purge_count--) && !messages.empty()) {
+        if (dest.get()) {
+            //
+            // If there is a destination exchange, stage the messages onto a reroute queue
+            // so they don't wind up getting purged more than once.
+            //
+            DeliverableMessage msg(getFront().payload);
+            rerouteQueue.push_back(msg);
+        }
         popAndDequeue();
         count++;
     }
+
+    //
+    // Re-route purged messages into the destination exchange.  Note that there's no need
+    // to test dest.get() here because if it is NULL, the rerouteQueue will be empty.
+    //
+    while (!rerouteQueue.empty()) {
+        DeliverableMessage msg(rerouteQueue.front());
+        rerouteQueue.pop_front();
+        dest->route(msg, msg.getMessage().getRoutingKey(),
+                    msg.getMessage().getApplicationHeaders());
+    }
+
     return count;
 }
 
@@ -1038,18 +1065,40 @@
     return (ManagementObject*) mgmtObject;
 }
 
-Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string&)
+Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext)
 {
     Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
 
     QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
 
-    switch (methodId)
-    {
-      case _qmf::Queue::METHOD_PURGE :
-        _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args;
-        purge (iargs.i_request);
-        status = Manageable::STATUS_OK;
+    switch (methodId) {
+    case _qmf::Queue::METHOD_PURGE :
+        {
+            _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
+            purge(purgeArgs.i_request);
+            status = Manageable::STATUS_OK;
+        }
+        break;
+
+    case _qmf::Queue::METHOD_REROUTE :
+        {
+            _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args;
+            boost::shared_ptr<Exchange> dest;
+            if (rerouteArgs.i_useAltExchange)
+                dest = alternateExchange;
+            else {
+                try {
+                    dest = broker->getExchanges().get(rerouteArgs.i_exchange);
+                } catch(const std::exception&) {
+                    status = Manageable::STATUS_PARAMETER_INVALID;
+                    etext = "Exchange not found";
+                    break;
+                }
+            }
+
+            purge(rerouteArgs.i_request, dest);
+            status = Manageable::STATUS_OK;
+        }
         break;
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=904654&r1=904653&r2=904654&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Jan 29 22:58:22 2010
@@ -211,7 +211,7 @@
                                             bool exclusive = false);
             QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
 
-            uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages 
+            uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages 
             QPID_BROKER_EXTERN void purgeExpired();
 
             //move qty # of messages to destination Queue destq

Modified: qpid/trunk/qpid/python/tests_0-10/management.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/tests_0-10/management.py?rev=904654&r1=904653&r2=904654&view=diff
==============================================================================
--- qpid/trunk/qpid/python/tests_0-10/management.py (original)
+++ qpid/trunk/qpid/python/tests_0-10/management.py Fri Jan 29 22:58:22 2010
@@ -241,6 +241,75 @@
         pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
         self.assertEqual (pq.msgDepth,0)
 
+    def test_reroute_queue(self):
+        """
+        Test ability to reroute messages from the head of a queue.
+        Need to test moving all, 1 (top message) and N messages.
+        """
+        self.startQmf()
+        session = self.session
+        "Set up test queue"
+        session.exchange_declare(exchange="alt.direct1", type="direct")
+        session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key")
+        session.exchange_declare(exchange="alt.direct2", type="direct")
+        session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key")
+        session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1")
+        session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key")
+
+        twenty = range(1,21)
+        props = session.delivery_properties(routing_key="routing_key")
+        for count in twenty:
+            body = "Reroute Message %d" % count
+            msg = Message(props, body)
+            session.message_transfer(destination="amq.direct", message=msg)
+
+        pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
+
+        "Reroute top message from reroute-queue to alternate exchange"
+        result = pq.reroute(1, True, "")
+        self.assertEqual(result.status, 0) 
+        pq.update()
+        aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
+        self.assertEqual(pq.msgDepth,19)
+        self.assertEqual(aq.msgDepth,1)
+
+        "Reroute top 9 messages from reroute-queue to alt.direct2"
+        result = pq.reroute(9, False, "alt.direct2")
+        self.assertEqual(result.status, 0) 
+        pq.update()
+        aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
+        self.assertEqual(pq.msgDepth,10)
+        self.assertEqual(aq.msgDepth,9)
+
+        "Reroute using a non-existent exchange"
+        result = pq.reroute(0, False, "amq.nosuchexchange")
+        self.assertEqual(result.status, 4)
+
+        "Reroute all messages from reroute-queue"
+        result = pq.reroute(0, False, "alt.direct2")
+        self.assertEqual(result.status, 0) 
+        pq.update()
+        aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
+        self.assertEqual(pq.msgDepth,0)
+        self.assertEqual(aq.msgDepth,19)
+
+        "Make more messages"
+        twenty = range(1,21)
+        props = session.delivery_properties(routing_key="routing_key")
+        for count in twenty:
+            body = "Reroute Message %d" % count
+            msg = Message(props, body)
+            session.message_transfer(destination="amq.direct", message=msg)
+
+        "Reroute onto the same queue"
+        result = pq.reroute(0, False, "amq.direct")
+        self.assertEqual(result.status, 0) 
+        pq.update()
+        self.assertEqual(pq.msgDepth,20)
+        
+
     def test_methods_async (self):
         """
         """

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=904654&r1=904653&r2=904654&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Fri Jan 29 22:58:22 2010
@@ -155,7 +155,13 @@
     <statistic name="messageLatency"      type="mmaTime"  unit="nanosecond"  desc="Broker latency through this queue"/>
 
     <method name="purge" desc="Discard all or some messages on a queue">
-      <arg name="request"          dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
+      <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
+    </method>
+
+    <method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange">
+      <arg name="request"        dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
+      <arg name="useAltExchange" dir="I" type="bool"   desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/>
+      <arg name="exchange"       dir="I" type="sstr"   desc="Name of the exchange to route the messages through"/>
     </method>
   </class>
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org