You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/09/23 20:42:46 UTC
svn commit: r698275 - in /incubator/qpid/trunk/qpid:
cpp/src/qpid/broker/Broker.cpp cpp/src/qpid/broker/Broker.h
cpp/src/qpid/broker/Queue.cpp cpp/src/qpid/broker/Queue.h
specs/management-schema.xml
Author: tross
Date: Tue Sep 23 11:42:45 2008
New Revision: 698275
URL: http://svn.apache.org/viewvc?rev=698275&view=rev
Log:
QPID-1290 - Patch from William Henry
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/specs/management-schema.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=698275&r1=698274&r2=698275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Sep 23 11:42:45 2008
@@ -32,6 +32,7 @@
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qpid/management/ManagementExchange.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQFrame.h"
@@ -349,6 +350,15 @@
status = Manageable::STATUS_OK;
break;
}
+ // Management request: move messages from one named queue to another.
+ // The generic args object is downcast to the method-specific argument
+ // type; a dynamic_cast to a reference throws std::bad_cast on mismatch.
+ case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : {
+ _qmf::ArgsBrokerQueueMoveMessages& moveArgs=
+ dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
+ // queueMoveMessages() returns the number of messages moved, so a zero
+ // result is reported as STATUS_INVALID_PARAMETER.
+ // NOTE(review): zero is returned both when a queue name is unknown and
+ // when the source queue simply had nothing to move, so an empty source
+ // queue is also reported as an invalid parameter.
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
+ status = Manageable::STATUS_OK;
+ else
+ // NOTE(review): returns directly instead of setting status like the
+ // neighbouring cases; confirm nothing after the switch must still run.
+ return Manageable::STATUS_INVALID_PARAMETER;
+ break;
+ }
default:
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
@@ -397,6 +407,22 @@
connect(addr.host, addr.port, false, failed, f);
}
+/**
+ * Move messages between two queues identified by name.
+ *
+ * @param srcQueue  name of the queue to take messages from
+ * @param destQueue name of the queue that receives the messages
+ * @param qty       maximum number of messages to move; 0 means all
+ * @return the count reported by Queue::move(), or 0 when either name
+ *         does not resolve to a queue in the registry
+ */
+uint32_t Broker::queueMoveMessages(
+    const std::string& srcQueue,
+    const std::string& destQueue,
+    uint32_t qty)
+{
+    // The source is resolved first; the destination is only looked up
+    // once the source is known to exist.
+    Queue::shared_ptr source = queues.find(srcQueue);
+    if (source) {
+        Queue::shared_ptr target = queues.find(destQueue);
+        if (target)
+            return source->move(target, qty);
+    }
+    return 0;
+}
+
+
boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; }
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=698275&r1=698274&r2=698275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Sep 23 11:42:45 2008
@@ -175,6 +175,13 @@
boost::function2<void, int, std::string> failed,
sys::ConnectionCodec::Factory* =0);
+ /** Move messages from one queue to another.
+ A zero quantity means to move all messages.
+ @param srcQueue name of the queue to take messages from
+ @param destQueue name of the queue to deliver the messages to
+ @param qty maximum number of messages to move; 0 moves all of them
+ @return the number of messages moved; 0 if either queue name is
+ not found
+ */
+ uint32_t queueMoveMessages( const std::string& srcQueue,
+ const std::string& destQueue,
+ uint32_t qty);
+
// TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
// For the present just return the first ProtocolFactory registered.
boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=698275&r1=698274&r2=698275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep 23 11:42:45 2008
@@ -405,10 +405,25 @@
uint32_t count = 0;
// Either purge them all or just some of them (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages.empty())
- {
+ while((!purge_request || purge_count--) && !messages.empty()) {
popAndDequeue();
- count++;
+ count++;
+ }
+ return count;
+}
+
+/**
+ * Move messages from the front of this queue to another queue.
+ *
+ * Each message is delivered to destq and then removed and dequeued from
+ * this queue, all while this queue's messageLock is held.
+ *
+ * @param destq queue that receives the messages
+ * @param qty   maximum number of messages to move; 0 means move every
+ *              message currently on this queue
+ * @return the number of messages actually moved; 0 if destq is null or
+ *         refers to this queue
+ */
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
+    // Reject a missing destination, and refuse to move a queue onto
+    // itself: presumably deliver() appends to the destination, so with
+    // qty == 0 a self-move would keep refilling the queue being drained
+    // and the loop below would never see it empty.
+    if (!destq || destq.get() == this)
+        return 0;
+
+    Mutex::ScopedLock locker(messageLock);
+    uint32_t move_count = qty; // only consulted when qty > 0
+    uint32_t count = 0;        // messages moved so far, returned to caller
+
+    // qty == 0: run until the queue is empty.
+    // qty  > 0: stop after qty messages or when the queue empties first.
+    while((!qty || move_count--) && !messages.empty()) {
+        QueuedMessage qmsg = messages.front();
+        boost::intrusive_ptr<Message> msg = qmsg.payload;
+        destq->deliver(msg); // hand over before removing it from this queue
+        messages.pop_front();
+        dequeue(0, qmsg);
+        count++;
}
return count;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=698275&r1=698274&r2=698275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Sep 23 11:42:45 2008
@@ -161,6 +161,9 @@
uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
+ // Move up to qty messages from the front of this queue to destq;
+ // a qty of 0 moves every message. Returns the number of messages moved.
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+
uint32_t getMessageCount() const;
uint32_t getConsumerCount() const;
inline const string& getName() const { return name; }
Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=698275&r1=698274&r2=698275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Tue Sep 23 11:42:45 2008
@@ -96,6 +96,13 @@
<event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker">
<arg name="remoteAddress" type="sstr"/>
</event>
+
+ <method name="queueMoveMessages" desc="Move messages from one queue to another">
+ <arg name="srcQueue" dir="I" type="sstr" desc="Source queue"/>
+ <arg name="destQueue" dir="I" type="sstr" desc="Destination queue"/>
+ <arg name="qty" dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/>
+ </method>
+
</class>
<!--