You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/09/08 13:13:47 UTC
svn commit: r693053 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
qpid/sys/ tests/
Author: gsim
Date: Mon Sep 8 04:13:38 2008
New Revision: 693053
URL: http://svn.apache.org/viewvc?rev=693053&view=rev
Log:
QPID-1264: initial fix for fanout, direct and headers exchanges (fix for remaining types to follow)
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Sep 8 04:13:38 2008
@@ -542,6 +542,7 @@
qpid/sys/AtomicValue_gcc.h \
qpid/sys/AtomicValue_mutex.h \
qpid/sys/BlockingQueue.h \
+ qpid/sys/CopyOnWriteArray.h \
qpid/sys/Condition.h \
qpid/sys/ConnectionCodec.h \
qpid/sys/ConnectionInputHandler.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Mon Sep 8 04:13:38 2008
@@ -42,41 +42,22 @@
}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
- RWlock::ScopedWlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = queues.begin(); i != queues.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i == queues.end()) {
- Binding::shared_ptr binding (new Binding (routingKey, queue, this));
- bindings[routingKey].push_back(binding);
+ Mutex::ScopedLock l(lock);
+ Binding::shared_ptr b(new Binding (routingKey, queue, this));
+ if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
return true;
- } else{
+ } else {
return false;
}
}
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- RWlock::ScopedWlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = queues.begin(); i != queues.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i < queues.end()) {
- queues.erase(i);
- if (queues.empty()) {
- bindings.erase(routingKey);
- }
+ Mutex::ScopedLock l(lock);
+ if (bindings[routingKey].remove_if(MatchQueue(queue))) {
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
@@ -88,16 +69,20 @@
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
- RWlock::ScopedRlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
+ Queues::ConstPtr p;
+ {
+ Mutex::ScopedLock l(lock);
+ p = bindings[routingKey].snapshot();
+ }
int count(0);
- for(i = queues.begin(); i != queues.end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
- }
+ if (p) {
+ for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+ }
if(!count){
QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
@@ -105,8 +90,7 @@
mgmtExchange->inc_msgDrops ();
mgmtExchange->inc_byteDrops (msg.contentSize ());
}
- }
- else {
+ } else {
if (mgmtExchange != 0) {
mgmtExchange->inc_msgRoutes (count);
mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
@@ -122,8 +106,7 @@
bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
- std::vector<Binding::shared_ptr>::iterator j;
-
+ Mutex::ScopedLock l(lock);
if (routingKey) {
Bindings::iterator i = bindings.find(*routingKey);
@@ -131,17 +114,17 @@
return false;
if (!queue)
return true;
- for (j = i->second.begin(); j != i->second.end(); j++)
- if ((*j)->queue == queue)
- return true;
+
+ Queues::ConstPtr p = i->second.snapshot();
+ return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end();
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
- for (j = i->second.begin(); j != i->second.end(); j++)
- if ((*j)->queue == queue)
- return true;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+ Queues::ConstPtr p = i->second.snapshot();
+ if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true;
+ }
return false;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Mon Sep 8 04:13:38 2008
@@ -25,16 +25,17 @@
#include <vector>
#include "Exchange.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/CopyOnWriteArray.h"
+#include "qpid/sys/Mutex.h"
#include "Queue.h"
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
- typedef std::vector<Binding::shared_ptr> Queues;
+ typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues;
typedef std::map<string, Queues> Bindings;
Bindings bindings;
- qpid::sys::RWlock lock;
+ qpid::sys::Mutex lock;
public:
static const std::string typeName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Sep 8 04:13:38 2008
@@ -153,3 +153,11 @@
{
return Manageable::STATUS_UNKNOWN_METHOD;
}
+
+
+// Remembers the queue that candidate bindings will be compared against.
+Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q)
+    : queue(q)
+{}
+
+// Predicate call: true when binding b refers to the remembered queue.
+bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b)
+{
+    const bool sameQueue = (b->queue == queue);
+    return sameQueue;
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon Sep 8 04:13:38 2008
@@ -62,6 +62,12 @@
management::ManagementObject* GetManagementObject () const;
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args);
};
+    /**
+     * Unary predicate over bindings: yields true for a binding whose
+     * queue equals the queue supplied at construction.
+     */
+    struct MatchQueue
+    {
+        MatchQueue(Queue::shared_ptr q);
+        bool operator()(Exchange::Binding::shared_ptr b);
+
+        // The queue every candidate binding is compared with.
+        const Queue::shared_ptr queue;
+    };
management::Exchange* mgmtExchange;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Mon Sep 8 04:13:38 2008
@@ -40,18 +40,10 @@
mgmtExchange->set_type (typeName);
}
-bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
- RWlock::ScopedWlock locker(lock);
- std::vector<Binding::shared_ptr>::iterator i;
-
- // Add if not already present.
- for (i = bindings.begin (); i != bindings.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i == bindings.end()) {
- Binding::shared_ptr binding (new Binding ("", queue, this));
- bindings.push_back(binding);
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
+{
+ Binding::shared_ptr binding (new Binding ("", queue, this));
+ if (bindings.add_unless(binding, MatchQueue(queue))) {
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -62,16 +54,9 @@
}
}
-bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
- RWlock::ScopedWlock locker(lock);
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = bindings.begin (); i != bindings.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i != bindings.end()) {
- bindings.erase(i);
+bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
+{
+ if (bindings.remove_if(MatchQueue(queue))) {
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
@@ -83,10 +68,10 @@
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
- RWlock::ScopedRlock locker(lock);
uint32_t count(0);
- for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){
+ BindingsArray::ConstPtr p = bindings.snapshot();
+ for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
msg.deliverTo((*i)->queue);
if ((*i)->mgmtBinding != 0)
(*i)->mgmtBinding->inc_msgMatched ();
@@ -111,13 +96,8 @@
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
{
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = bindings.begin (); i != bindings.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- return i != bindings.end();
+ BindingsArray::ConstPtr ptr = bindings.snapshot();
+ return ptr && std::find_if(ptr->begin(), ptr->end(), MatchQueue(queue)) != ptr->end();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Mon Sep 8 04:13:38 2008
@@ -25,16 +25,15 @@
#include <vector>
#include "Exchange.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/CopyOnWriteArray.h"
#include "Queue.h"
namespace qpid {
namespace broker {
class FanOutExchange : public virtual Exchange {
- std::vector<Binding::shared_ptr> bindings;
- qpid::sys::RWlock lock;
-
+ typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> BindingsArray;
+ BindingsArray bindings;
public:
static const std::string typeName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Mon Sep 8 04:13:38 2008
@@ -73,22 +73,12 @@
}
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){
- RWlock::ScopedWlock locker(lock);
std::string what = getMatch(args);
if (what != all && what != any)
throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
- Bindings::iterator i;
-
- for (i = bindings.begin(); i != bindings.end(); i++)
- if (i->first == *args && i->second->queue == queue)
- break;
-
- if (i == bindings.end()) {
- Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
- HeaderMap headerMap(*args, binding);
-
- bindings.push_back(headerMap);
+ Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
+ if (bindings.add_unless(binding, MatchArgs(queue, args))) {
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -99,21 +89,8 @@
}
}
-bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){
- RWlock::ScopedWlock locker(lock);
- Bindings::iterator i;
- for (i = bindings.begin(); i != bindings.end(); i++) {
- if (bindingKey.empty() && args) {
- if (i->first == *args && i->second->queue == queue)
- break;
- } else {
- if (i->second->key == bindingKey && i->second->queue == queue)
- break;
- }
- }
-
- if (i != bindings.end()) {
- bindings.erase(i);
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){
+ if (bindings.remove_if(MatchKey(queue, bindingKey))) {
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
@@ -128,13 +105,13 @@
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
if (!args) return;//can't match if there were no headers passed in
- RWlock::ScopedRlock locker(lock);
uint32_t count(0);
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) {
- if (match(i->first, *args)) msg.deliverTo(i->second->queue);
- if (i->second->mgmtBinding != 0)
- i->second->mgmtBinding->inc_msgMatched ();
+ Bindings::ConstPtr p = bindings.snapshot();
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) {
+ if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
}
if (mgmtExchange != 0)
@@ -157,8 +134,9 @@
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
{
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) {
+ Bindings::ConstPtr p = bindings.snapshot();
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
+ if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) {
return true;
}
}
@@ -227,5 +205,15 @@
return true;
}
+// Stores the queue and a non-owning pointer to the binding arguments to look for.
+HeadersExchange::MatchArgs::MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a)
+    : queue(q), args(a)
+{}
+
+// True when binding b is on the stored queue and its arguments equal *args.
+// args is dereferenced here, so it must be non-null whenever the predicate runs.
+bool HeadersExchange::MatchArgs::operator()(Exchange::Binding::shared_ptr b)
+{
+    if (!(b->queue == queue)) return false;
+    return b->args == *args;
+}
+// Stores the queue and the binding key to look for.
+HeadersExchange::MatchKey::MatchKey(Queue::shared_ptr q, const std::string& k)
+    : queue(q), key(k)
+{}
+
+// True when binding b is on the stored queue and was made with the stored key.
+bool HeadersExchange::MatchKey::operator()(Exchange::Binding::shared_ptr b)
+{
+    if (!(b->queue == queue)) return false;
+    return b->key == key;
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Mon Sep 8 04:13:38 2008
@@ -24,7 +24,8 @@
#include <vector>
#include "Exchange.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/CopyOnWriteArray.h"
+#include "qpid/sys/Mutex.h"
#include "Queue.h"
namespace qpid {
@@ -33,10 +34,25 @@
class HeadersExchange : public virtual Exchange {
typedef std::pair<qpid::framing::FieldTable, Binding::shared_ptr> HeaderMap;
- typedef std::vector<HeaderMap> Bindings;
+ typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Bindings;
+
+    /**
+     * Unary predicate over bindings: yields true for a binding that is
+     * on the given queue and whose arguments equal the given table.
+     */
+    struct MatchArgs
+    {
+        MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a);
+        bool operator()(Exchange::Binding::shared_ptr b);
+
+        const Queue::shared_ptr queue;
+        // Not owned; the table must stay alive for as long as the predicate is used.
+        const qpid::framing::FieldTable* args;
+    };
+    /**
+     * Unary predicate over bindings: yields true for a binding that is
+     * on the given queue and was made with the given binding key.
+     */
+    struct MatchKey
+    {
+        const Queue::shared_ptr queue;
+        // Held by value rather than by reference so the predicate stays
+        // valid even if it outlives the string it was constructed from
+        // (for example when built from a temporary).
+        const std::string key;
+        MatchKey(Queue::shared_ptr q, const std::string& k);
+        bool operator()(Exchange::Binding::shared_ptr b);
+    };
Bindings bindings;
- qpid::sys::RWlock lock;
+ qpid::sys::Mutex lock;
static std::string getMatch(const framing::FieldTable* args);
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h?rev=693053&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h Mon Sep 8 04:13:38 2008
@@ -0,0 +1,126 @@
+#ifndef QPID_SYS_COPYONWRITEARRAY_H
+#define QPID_SYS_COPYONWRITEARRAY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Mutex.h"
+#include <algorithm>
+#include <vector>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * An array whose contents are replaced rather than modified in place:
+ * every add/remove builds a fresh copy of the underlying vector while
+ * holding the internal mutex and then swaps that copy in.
+ *
+ * Readers call snapshot(), which holds the mutex only long enough to
+ * copy a shared pointer. Iteration then runs over that snapshot with
+ * no lock held and does not observe later additions or removals.
+ */
+template <class T>
+class CopyOnWriteArray
+{
+public:
+    typedef boost::shared_ptr<const std::vector<T> > ConstPtr;
+
+    CopyOnWriteArray() {}
+
+    /**
+     * Shares the source's current contents; no elements are copied.
+     * The source's mutex is held while its pointer is read so the
+     * read cannot race with a writer on the source.
+     */
+    CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.current()) {}
+
+    /** Appends t unconditionally. */
+    void add(const T& t)
+    {
+        Mutex::ScopedLock l(lock);
+        append(t);
+    }
+
+    /**
+     * Removes the first element equal to t.
+     * @return true if an element was removed, false if none matched.
+     */
+    bool remove(const T& t)
+    {
+        Mutex::ScopedLock l(lock);
+        if (!array) return false;
+        typename std::vector<T>::iterator i = std::find(array->begin(), array->end(), t);
+        if (i == array->end()) return false;
+        // Copy only once a match is known, then erase the same position in the copy.
+        ArrayPtr copy(new std::vector<T>(*array));
+        copy->erase(copy->begin() + (i - array->begin()));
+        array = copy;
+        return true;
+    }
+
+    /**
+     * Appends t unless some existing element satisfies f.
+     * @return true if t was added, false if a match already existed.
+     */
+    template <class F>
+    bool add_unless(const T& t, F f)
+    {
+        Mutex::ScopedLock l(lock);
+        if (array && std::find_if(array->begin(), array->end(), f) != array->end()) {
+            return false;
+        }
+        append(t);
+        return true;
+    }
+
+    /**
+     * Removes every element that satisfies f.
+     * @return true if at least one element was removed.
+     */
+    template <class F>
+    bool remove_if(F f)
+    {
+        Mutex::ScopedLock l(lock);
+        if (array && std::find_if(array->begin(), array->end(), f) != array->end()) {
+            ArrayPtr copy(new std::vector<T>(*array));
+            copy->erase(std::remove_if(copy->begin(), copy->end(), f), copy->end());
+            array = copy;
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Applies f to each element of the contents as they were when the
+     * call started; the mutex is not held while f runs.
+     * @return f, as returned by std::for_each (unchanged if empty).
+     */
+    template <class F>
+    F for_each(F f)
+    {
+        ArrayPtr a = current();
+        if (!a) return f;
+        return std::for_each(a->begin(), a->end(), f);
+    }
+
+    /**
+     * Returns the contents at the time of the call.
+     * The result is never null: an array that has never been added to
+     * yields an empty vector, so callers may dereference it directly.
+     */
+    ConstPtr snapshot() const
+    {
+        ConstPtr a = current();
+        if (!a) a.reset(new std::vector<T>());
+        return a;
+    }
+
+private:
+    typedef boost::shared_ptr< std::vector<T> > ArrayPtr;
+    // mutable so that const readers (snapshot, copy construction) can lock.
+    mutable Mutex lock;
+    // Null until the first successful add.
+    ArrayPtr array;
+
+    /** Reads the current array pointer under the mutex. */
+    ArrayPtr current() const
+    {
+        Mutex::ScopedLock l(lock);
+        return array;
+    }
+
+    /** Replaces the array with a copy that has t appended; caller must hold the mutex. */
+    void append(const T& t)
+    {
+        ArrayPtr copy(array ? new std::vector<T>(*array) : new std::vector<T>());
+        copy->push_back(t);
+        array = copy;
+    }
+};
+
+}}
+
+
+
+#endif /*!QPID_SYS_COPYONWRITEARRAY_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Mon Sep 8 04:13:38 2008
@@ -76,9 +76,9 @@
string k3("xyz");
FanOutExchange fanout("fanout");
- fanout.bind(a, "", 0);
- fanout.bind(b, "", 0);
- fanout.bind(c, "", 0);
+ BOOST_CHECK(fanout.bind(a, "", 0));
+ BOOST_CHECK(fanout.bind(b, "", 0));
+ BOOST_CHECK(fanout.bind(c, "", 0));
BOOST_CHECK(fanout.isBound(a, 0, 0));
BOOST_CHECK(fanout.isBound(b, 0, 0));
@@ -86,10 +86,10 @@
BOOST_CHECK(!fanout.isBound(d, 0, 0));
DirectExchange direct("direct");
- direct.bind(a, k1, 0);
- direct.bind(a, k3, 0);
- direct.bind(b, k2, 0);
- direct.bind(c, k1, 0);
+ BOOST_CHECK(direct.bind(a, k1, 0));
+ BOOST_CHECK(direct.bind(a, k3, 0));
+ BOOST_CHECK(direct.bind(b, k2, 0));
+ BOOST_CHECK(direct.bind(c, k1, 0));
BOOST_CHECK(direct.isBound(a, 0, 0));
BOOST_CHECK(direct.isBound(a, &k1, 0));
@@ -104,10 +104,10 @@
BOOST_CHECK(!direct.isBound(d, &k3, 0));
TopicExchange topic("topic");
- topic.bind(a, k1, 0);
- topic.bind(a, k3, 0);
- topic.bind(b, k2, 0);
- topic.bind(c, k1, 0);
+ BOOST_CHECK(topic.bind(a, k1, 0));
+ BOOST_CHECK(topic.bind(a, k3, 0));
+ BOOST_CHECK(topic.bind(b, k2, 0));
+ BOOST_CHECK(topic.bind(c, k1, 0));
BOOST_CHECK(topic.isBound(a, 0, 0));
BOOST_CHECK(topic.isBound(a, &k1, 0));