You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/05/25 20:20:51 UTC
svn commit: r778464 - in /qpid/trunk/qpid/cpp/src/qpid:
broker/QueueEvents.cpp broker/QueueEvents.h cluster/Connection.h
cluster/Event.cpp cluster/Event.h cluster/Multicaster.cpp
cluster/Multicaster.h cluster/PollableQueue.h sys/PollableQueue.h
Author: aconway
Date: Mon May 25 18:20:50 2009
New Revision: 778464
URL: http://svn.apache.org/viewvc?rev=778464&view=rev
Log:
PollableQueue optimization - replace deque with vector.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp Mon May 25 18:20:50 2009
@@ -66,15 +66,14 @@
}
}
-void QueueEvents::handle(EventQueue::Queue& events)
-{
+QueueEvents::EventQueue::Batch::const_iterator
+QueueEvents::handle(const EventQueue::Batch& events) {
qpid::sys::Mutex::ScopedLock l(lock);
- while (!events.empty()) {
- for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) {
- i->second(events.front());
- }
- events.pop_front();
+ for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) {
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
+ j->second(*i);
}
+ return events.end();
}
void QueueEvents::shutdown()
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h Mon May 25 18:20:50 2009
@@ -74,7 +74,7 @@
volatile bool enabled;
qpid::sys::Mutex lock;//protect listeners from concurrent access
- void handle(EventQueue::Queue& e);
+ EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e);
};
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon May 25 18:20:50 2009
@@ -61,8 +61,6 @@
{
public:
- typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
-
/** Local connection. */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink);
/** Shadow connection. */
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Mon May 25 18:20:50 2009
@@ -87,7 +87,7 @@
return control(framing::AMQFrame(body), cid);
}
-iovec Event::toIovec() {
+iovec Event::toIovec() const {
encodeHeader();
iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
return iov;
@@ -103,8 +103,8 @@
}
// Encode my header in my buffer.
-void Event::encodeHeader () {
- Buffer b(getStore(), HEADER_SIZE);
+void Event::encodeHeader () const {
+ Buffer b(const_cast<char*>(getStore()), HEADER_SIZE);
encode(b);
assert(b.getPosition() == HEADER_SIZE);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Mon May 25 18:20:50 2009
@@ -54,7 +54,7 @@
/** Size of payload data, excluding header. */
size_t getSize() const { return size; }
/** Size of header + payload. */
- size_t getStoreSize() { return size + HEADER_SIZE; }
+ size_t getStoreSize() const { return size + HEADER_SIZE; }
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
@@ -99,10 +99,10 @@
operator framing::Buffer() const;
- iovec toIovec();
+ iovec toIovec() const;
private:
- void encodeHeader();
+ void encodeHeader() const;
RefCountedBuffer::pointer store;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Mon May 25 18:20:50 2009
@@ -71,9 +71,9 @@
}
-void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
+Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
try {
- PollableEventQueue::Queue::iterator i = values.begin();
+ PollableEventQueue::Batch::const_iterator i = values.begin();
while( i != values.end()) {
iovec iov = i->toIovec();
if (!cpg.mcast(&iov, 1)) {
@@ -82,12 +82,13 @@
}
++i;
}
- values.erase(values.begin(), i); // Erase sent events.
+ return i;
}
catch (const std::exception& e) {
QPID_LOG(critical, "Multicast error: " << e.what());
queue.stop();
onError();
+ return values.end();
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Mon May 25 18:20:50 2009
@@ -28,6 +28,7 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/LatencyTracker.h"
#include <boost/shared_ptr.hpp>
+#include <deque>
namespace qpid {
@@ -63,7 +64,7 @@
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
- void sendMcast(PollableEventQueue::Queue& );
+ PollableEventQueue::Batch::const_iterator sendMcast(const PollableEventQueue::Batch& );
sys::Mutex lock;
boost::function<void()> onError;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Mon May 25 18:20:50 2009
@@ -37,24 +37,27 @@
typedef boost::function<void (const T&)> Callback;
typedef boost::function<void()> ErrorCallback;
- PollableQueue(Callback f, ErrorCallback err, const std::string& msg, const boost::shared_ptr<sys::Poller>& poller)
- : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller),
+ PollableQueue(Callback f, ErrorCallback err, const std::string& msg,
+ const boost::shared_ptr<sys::Poller>& poller)
+ : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
+ poller),
callback(f), error(err), message(msg) {}
- void handleBatch(typename sys::PollableQueue<T>::Queue& values) {
+ typename sys::PollableQueue<T>::Batch::const_iterator
+ handleBatch(const typename sys::PollableQueue<T>::Batch& values) {
try {
- typename sys::PollableQueue<T>::Queue::iterator i = values.begin();
+ typename sys::PollableQueue<T>::Batch::const_iterator i = values.begin();
while (i != values.end() && !this->isStopped()) {
callback(*i);
++i;
}
- values.erase(values.begin(), i);
+ return i;
}
catch (const std::exception& e) {
QPID_LOG(error, message << ": " << e.what());
- values.clear();
this->stop();
error();
+ return values.end();
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Mon May 25 18:20:50 2009
@@ -28,7 +28,7 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <algorithm>
-#include <deque>
+#include <vector>
namespace qpid {
namespace sys {
@@ -44,16 +44,18 @@
template <class T>
class PollableQueue {
public:
- typedef std::deque<T> Queue;
+ typedef std::vector<T> Batch;
typedef T value_type;
/**
* Callback to process a batch of items from the queue.
*
- * @param values Queue of values to process. Any items remaining
+ * @param batch Queue of values to process. Any items remaining
* on return from Callback are put back on the queue.
+ * @return iterator pointing to the first un-processed item in batch.
+ * Items from this point up to batch.end() are put back on the queue.
*/
- typedef boost::function<void (Queue&)> Callback;
+ typedef boost::function<typename Batch::const_iterator (const Batch& batch)> Callback;
/**
* Constructor; sets necessary parameters.
@@ -99,7 +101,7 @@
mutable sys::Monitor lock;
Callback callback;
PollableCondition condition;
- Queue queue, batch;
+ Batch queue, batch;
Thread dispatcher;
bool stopped;
};
@@ -141,17 +143,18 @@
}
template <class T> void PollableQueue<T>::process() {
+ // Called with lock held
while (!stopped && !queue.empty()) {
assert(batch.empty());
batch.swap(queue);
+ typename Batch::const_iterator putBack;
{
ScopedUnlock u(lock); // Allow concurrent push to queue.
- callback(batch);
- }
- if (!batch.empty()) {
- queue.insert(queue.begin(), batch.begin(), batch.end()); // put back unprocessed items.
- batch.clear();
+ putBack = callback(batch);
}
+ // put back unprocessed items.
+ queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end()));
+ batch.clear();
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org