You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/09/23 20:42:46 UTC

svn commit: r698275 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/Broker.cpp cpp/src/qpid/broker/Broker.h cpp/src/qpid/broker/Queue.cpp cpp/src/qpid/broker/Queue.h specs/management-schema.xml

Author: tross
Date: Tue Sep 23 11:42:45 2008
New Revision: 698275

URL: http://svn.apache.org/viewvc?rev=698275&view=rev
Log:
QPID-1290 - Patch from William Henry

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=698275&r1=698274&r2=698275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Sep 23 11:42:45 2008
@@ -32,6 +32,7 @@
 
 #include "qmf/org/apache/qpid/broker/Package.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
 #include "qpid/management/ManagementExchange.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/AMQFrame.h"
@@ -349,6 +350,15 @@
         status = Manageable::STATUS_OK;
         break;
       }
+    case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : {
+        _qmf::ArgsBrokerQueueMoveMessages& moveArgs=
+            dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
+	if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
+	  status = Manageable::STATUS_OK;
+	else
+	  return Manageable::STATUS_INVALID_PARAMETER;
+        break;
+      }
    default:
         status = Manageable::STATUS_NOT_IMPLEMENTED;
         break;
@@ -397,6 +407,22 @@
     connect(addr.host, addr.port, false, failed, f);
 }
 
+uint32_t Broker::queueMoveMessages( 
+     const std::string& srcQueue, 
+     const std::string& destQueue,
+     uint32_t  qty)
+{
+  Queue::shared_ptr src_queue = queues.find(srcQueue);
+  if (!src_queue)
+    return 0;
+  Queue::shared_ptr dest_queue = queues.find(destQueue);
+  if (!dest_queue)
+    return 0;
+
+  return src_queue->move(dest_queue, qty);
+}
+
+
 boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=698275&r1=698274&r2=698275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Sep 23 11:42:45 2008
@@ -175,6 +175,13 @@
                  boost::function2<void, int, std::string> failed,
                  sys::ConnectionCodec::Factory* =0);
 
+    /** Move messages from one queue to another.
+        A zero quantity means to move all messages
+    */
+    uint32_t queueMoveMessages( const std::string& srcQueue, 
+			    const std::string& destQueue,
+			    uint32_t  qty); 
+
     // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
     // For the present just return the first ProtocolFactory registered.
     boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=698275&r1=698274&r2=698275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep 23 11:42:45 2008
@@ -405,10 +405,25 @@
 
     uint32_t count = 0;
     // Either purge them all or just the some (purge_count) while the queue isn't empty.
-    while((!purge_request || purge_count--) && !messages.empty()) 
-    {
+    while((!purge_request || purge_count--) && !messages.empty()) {
         popAndDequeue();
-	count++;
+        count++;
+    }
+    return count;
+}
+
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
+    Mutex::ScopedLock locker(messageLock);
+    uint32_t move_count = qty; // only comes into play if  qty >0 
+    uint32_t count = 0; // count how many were moved for returning
+
+    while((!qty || move_count--) && !messages.empty()) {
+        QueuedMessage qmsg = messages.front();
+        boost::intrusive_ptr<Message> msg = qmsg.payload;
+        destq->deliver(msg); // deliver message to the destination queue
+        messages.pop_front();
+        dequeue(0, qmsg);
+        count++;
     }
     return count;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=698275&r1=698274&r2=698275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Sep 23 11:42:45 2008
@@ -161,6 +161,9 @@
 
             uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages 
 
+	    //move qty # of messages to destination Queue destq
+	    uint32_t move(const Queue::shared_ptr destq, uint32_t qty); 
+
             uint32_t getMessageCount() const;
             uint32_t getConsumerCount() const;
             inline const string& getName() const { return name; }

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=698275&r1=698274&r2=698275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Tue Sep 23 11:42:45 2008
@@ -96,6 +96,13 @@
     <event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker">
       <arg name="remoteAddress" type="sstr"/>
     </event>
+
+    <method name="queueMoveMessages" desc="Move messages from one queue to another">
+      <arg name="srcQueue"          dir="I" type="sstr" desc="Source queue"/>
+      <arg name="destQueue"         dir="I" type="sstr" desc="Destination queue"/>
+      <arg name="qty"               dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/>
+    </method>
+
   </class>
 
   <!--