You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/09/08 13:13:47 UTC

svn commit: r693053 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/sys/ tests/

Author: gsim
Date: Mon Sep  8 04:13:38 2008
New Revision: 693053

URL: http://svn.apache.org/viewvc?rev=693053&view=rev
Log:
QPID-1264: initial fix for fanout, direct and headers exchanges (fix for remaining types to follow)


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Sep  8 04:13:38 2008
@@ -542,6 +542,7 @@
   qpid/sys/AtomicValue_gcc.h \
   qpid/sys/AtomicValue_mutex.h \
   qpid/sys/BlockingQueue.h \
+  qpid/sys/CopyOnWriteArray.h \
   qpid/sys/Condition.h \
   qpid/sys/ConnectionCodec.h \
   qpid/sys/ConnectionInputHandler.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Mon Sep  8 04:13:38 2008
@@ -42,41 +42,22 @@
 }
 
 bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
-    RWlock::ScopedWlock l(lock);
-    std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
-    std::vector<Binding::shared_ptr>::iterator i;
-
-    for (i = queues.begin(); i != queues.end(); i++)
-        if ((*i)->queue == queue)
-            break;
-
-    if (i == queues.end()) {
-        Binding::shared_ptr binding (new Binding (routingKey, queue, this));
-        bindings[routingKey].push_back(binding);
+    Mutex::ScopedLock l(lock);
+    Binding::shared_ptr b(new Binding (routingKey, queue, this));
+    if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
         if (mgmtExchange != 0) {
             mgmtExchange->inc_bindingCount();
             ((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
         }
         return true;
-    } else{
+    } else {
         return false;
     }
 }
 
 bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
-    RWlock::ScopedWlock l(lock);
-    std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
-    std::vector<Binding::shared_ptr>::iterator i;
-
-    for (i = queues.begin(); i != queues.end(); i++)
-        if ((*i)->queue == queue)
-            break;
-
-    if (i < queues.end()) {
-        queues.erase(i);
-        if (queues.empty()) {
-            bindings.erase(routingKey);
-        }
+    Mutex::ScopedLock l(lock);
+    if (bindings[routingKey].remove_if(MatchQueue(queue))) {
         if (mgmtExchange != 0) {
             mgmtExchange->dec_bindingCount();
             ((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
@@ -88,16 +69,20 @@
 }
 
 void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
-    RWlock::ScopedRlock l(lock);
-    std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
-    std::vector<Binding::shared_ptr>::iterator i;
+    Queues::ConstPtr p;
+    {
+        Mutex::ScopedLock l(lock);
+        p = bindings[routingKey].snapshot();
+    }
     int count(0);
 
-    for(i = queues.begin(); i != queues.end(); i++, count++) {
-        msg.deliverTo((*i)->queue);
-        if ((*i)->mgmtBinding != 0)
-            (*i)->mgmtBinding->inc_msgMatched ();
-     }
+    if (p) {
+        for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
+            msg.deliverTo((*i)->queue);
+            if ((*i)->mgmtBinding != 0)
+                (*i)->mgmtBinding->inc_msgMatched ();
+        }
+    }
 
     if(!count){
         QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
@@ -105,8 +90,7 @@
             mgmtExchange->inc_msgDrops  ();
             mgmtExchange->inc_byteDrops (msg.contentSize ());
         }
-    }
-    else {
+    } else {
         if (mgmtExchange != 0) {
             mgmtExchange->inc_msgRoutes  (count);
             mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
@@ -122,8 +106,7 @@
 
 bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
 {
-    std::vector<Binding::shared_ptr>::iterator j;
-
+    Mutex::ScopedLock l(lock);
     if (routingKey) {
         Bindings::iterator i = bindings.find(*routingKey);
 
@@ -131,17 +114,17 @@
             return false;
         if (!queue)
             return true;
-        for (j = i->second.begin(); j != i->second.end(); j++)
-            if ((*j)->queue == queue)
-                return true;
+
+        Queues::ConstPtr p = i->second.snapshot();
+        return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end();
     } else if (!queue) {
         //if no queue or routing key is specified, just report whether any bindings exist
         return bindings.size() > 0;
     } else {
-        for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
-            for (j = i->second.begin(); j != i->second.end(); j++)
-                if ((*j)->queue == queue)
-                    return true;
+        for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+            Queues::ConstPtr p = i->second.snapshot();
+            if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true;
+        }
         return false;
     }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Mon Sep  8 04:13:38 2008
@@ -25,16 +25,17 @@
 #include <vector>
 #include "Exchange.h"
 #include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/CopyOnWriteArray.h"
+#include "qpid/sys/Mutex.h"
 #include "Queue.h"
 
 namespace qpid {
 namespace broker {
     class DirectExchange : public virtual Exchange{
-        typedef std::vector<Binding::shared_ptr> Queues;
+        typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues;
         typedef std::map<string, Queues> Bindings;
         Bindings bindings;
-        qpid::sys::RWlock lock;
+        qpid::sys::Mutex lock;
 
     public:
         static const std::string typeName;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Sep  8 04:13:38 2008
@@ -153,3 +153,11 @@
 {
     return Manageable::STATUS_UNKNOWN_METHOD;
 }
+
+
+Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {}
+
+bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b)
+{
+    return b->queue == queue;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon Sep  8 04:13:38 2008
@@ -62,6 +62,12 @@
                 management::ManagementObject* GetManagementObject () const;
                 management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args);
             };
+            struct MatchQueue
+            {
+                const Queue::shared_ptr queue;        
+                MatchQueue(Queue::shared_ptr q);
+                bool operator()(Exchange::Binding::shared_ptr b);
+            };
 
             management::Exchange* mgmtExchange;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Mon Sep  8 04:13:38 2008
@@ -40,18 +40,10 @@
         mgmtExchange->set_type (typeName);
 }
 
-bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
-    RWlock::ScopedWlock locker(lock);
-    std::vector<Binding::shared_ptr>::iterator i;
-
-    // Add if not already present.
-    for (i = bindings.begin (); i != bindings.end(); i++)
-        if ((*i)->queue == queue)
-            break;
-
-    if (i == bindings.end()) {
-        Binding::shared_ptr binding (new Binding ("", queue, this));
-        bindings.push_back(binding);
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
+{
+    Binding::shared_ptr binding (new Binding ("", queue, this));
+    if (bindings.add_unless(binding, MatchQueue(queue))) {
         if (mgmtExchange != 0) {
             mgmtExchange->inc_bindingCount();
             ((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -62,16 +54,9 @@
     }
 }
 
-bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
-    RWlock::ScopedWlock locker(lock);
-    std::vector<Binding::shared_ptr>::iterator i;
-
-    for (i = bindings.begin (); i != bindings.end(); i++)
-        if ((*i)->queue == queue)
-            break;
-
-    if (i != bindings.end()) {
-        bindings.erase(i);
+bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
+{
+    if (bindings.remove_if(MatchQueue(queue))) {
         if (mgmtExchange != 0) {
             mgmtExchange->dec_bindingCount();
             ((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
@@ -83,10 +68,10 @@
 }
 
 void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
-    RWlock::ScopedRlock locker(lock);
     uint32_t count(0);
 
-    for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){
+    BindingsArray::ConstPtr p = bindings.snapshot();
+    for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
         msg.deliverTo((*i)->queue);
         if ((*i)->mgmtBinding != 0)
             (*i)->mgmtBinding->inc_msgMatched ();
@@ -111,13 +96,8 @@
 
 bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
 {
-    std::vector<Binding::shared_ptr>::iterator i;
-
-    for (i = bindings.begin (); i != bindings.end(); i++)
-        if ((*i)->queue == queue)
-            break;
-
-    return i != bindings.end();
+    BindingsArray::ConstPtr ptr = bindings.snapshot();
+    return ptr && std::find_if(ptr->begin(), ptr->end(), MatchQueue(queue)) != ptr->end();
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Mon Sep  8 04:13:38 2008
@@ -25,16 +25,15 @@
 #include <vector>
 #include "Exchange.h"
 #include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/CopyOnWriteArray.h"
 #include "Queue.h"
 
 namespace qpid {
 namespace broker {
 
 class FanOutExchange : public virtual Exchange {
-    std::vector<Binding::shared_ptr> bindings;
-    qpid::sys::RWlock lock;
-
+    typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> BindingsArray;
+    BindingsArray bindings;
   public:
     static const std::string typeName;
         

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Mon Sep  8 04:13:38 2008
@@ -73,22 +73,12 @@
 }
 
 bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){
-    RWlock::ScopedWlock locker(lock);
     std::string what = getMatch(args);
     if (what != all && what != any)
         throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
 
-    Bindings::iterator i;
-
-    for (i = bindings.begin(); i != bindings.end(); i++)
-        if (i->first == *args && i->second->queue == queue)
-            break;
-
-    if (i == bindings.end()) {
-        Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
-        HeaderMap headerMap(*args, binding);
-
-        bindings.push_back(headerMap);
+    Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
+    if (bindings.add_unless(binding, MatchArgs(queue, args))) {
         if (mgmtExchange != 0) {
             mgmtExchange->inc_bindingCount();
             ((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -99,21 +89,8 @@
     }
 }
 
-bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){
-    RWlock::ScopedWlock locker(lock);
-    Bindings::iterator i;
-    for (i = bindings.begin(); i != bindings.end(); i++) {
-        if (bindingKey.empty() && args) {
-            if (i->first == *args && i->second->queue == queue)
-                break;
-        } else {
-            if (i->second->key == bindingKey && i->second->queue == queue)
-                break;
-        }
-    }
-
-    if (i != bindings.end()) {
-        bindings.erase(i);
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){
+    if (bindings.remove_if(MatchKey(queue, bindingKey))) {
         if (mgmtExchange != 0) {
             mgmtExchange->dec_bindingCount();
             ((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
@@ -128,13 +105,13 @@
 void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
     if (!args) return;//can't match if there were no headers passed in
 
-    RWlock::ScopedRlock locker(lock);
     uint32_t count(0);
 
-    for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) {
-        if (match(i->first, *args)) msg.deliverTo(i->second->queue);
-        if (i->second->mgmtBinding != 0)
-            i->second->mgmtBinding->inc_msgMatched ();
+    Bindings::ConstPtr p = bindings.snapshot();
+    for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) {
+        if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
+        if ((*i)->mgmtBinding != 0)
+            (*i)->mgmtBinding->inc_msgMatched ();
     }
 
     if (mgmtExchange != 0)
@@ -157,8 +134,9 @@
 
 bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
 {
-    for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
-        if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) {
+    Bindings::ConstPtr p = bindings.snapshot();
+    for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
+        if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) {
             return true;
         }
     }
@@ -227,5 +205,15 @@
     return true;
 }
 
+HeadersExchange::MatchArgs::MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a) : queue(q), args(a) {}
+bool HeadersExchange::MatchArgs::operator()(Exchange::Binding::shared_ptr b)
+{
+    return b->queue == queue && b->args == *args;
+}
 
+HeadersExchange::MatchKey::MatchKey(Queue::shared_ptr q, const std::string& k) : queue(q), key(k) {}
 
+bool HeadersExchange::MatchKey::operator()(Exchange::Binding::shared_ptr b)
+{
+    return b->queue == queue && b->key == key;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Mon Sep  8 04:13:38 2008
@@ -24,7 +24,8 @@
 #include <vector>
 #include "Exchange.h"
 #include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/CopyOnWriteArray.h"
+#include "qpid/sys/Mutex.h"
 #include "Queue.h"
 
 namespace qpid {
@@ -33,10 +34,25 @@
 
 class HeadersExchange : public virtual Exchange {    
     typedef std::pair<qpid::framing::FieldTable, Binding::shared_ptr> HeaderMap;
-    typedef std::vector<HeaderMap> Bindings;
+    typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Bindings;
+
+    struct MatchArgs
+    {
+        const Queue::shared_ptr queue;        
+        const qpid::framing::FieldTable* args;
+        MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a);
+        bool operator()(Exchange::Binding::shared_ptr b);        
+    };
+    struct MatchKey
+    {
+        const Queue::shared_ptr queue;        
+        const std::string& key;
+        MatchKey(Queue::shared_ptr q, const std::string& k);
+        bool operator()(Exchange::Binding::shared_ptr b);        
+    };
 
     Bindings bindings;
-    qpid::sys::RWlock lock;
+    qpid::sys::Mutex lock;
 
     static std::string getMatch(const framing::FieldTable* args);
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h?rev=693053&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h Mon Sep  8 04:13:38 2008
@@ -0,0 +1,126 @@
+#ifndef QPID_SYS_COPYONWRITEARRAY_H
+#define QPID_SYS_COPYONWRITEARRAY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Mutex.h"
+#include <algorithm>
+#include <vector>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * An array that copies its storage on every add/remove so that readers can
+ * iterate over an immutable snapshot without holding the lock.
+ * Invariant: 'array' is never null, so snapshot() always returns a valid
+ * (possibly empty) vector.
+ */
+template <class T>
+class CopyOnWriteArray
+{
+public:
+    typedef boost::shared_ptr<const std::vector<T> > ConstPtr;
+
+    // Start with an empty vector so that snapshot() never returns null;
+    // several callers dereference the snapshot without checking it.
+    CopyOnWriteArray() : array(new std::vector<T>()) {}
+    // NOTE(review): reads c.array without taking c.lock -- confirm copies are
+    // only made while no other thread can modify 'c'.
+    CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {}
+
+    /** Appends t to a fresh copy of the contents and publishes that copy. */
+    void add(T& t)
+    {
+        Mutex::ScopedLock l(lock);
+        ArrayPtr copy(new std::vector<T>(*array));
+        copy->push_back(t);
+        array = copy;
+    }
+
+    /** Removes the first element equal to t; returns false if none matches. */
+    bool remove(T& t)
+    {
+        Mutex::ScopedLock l(lock);
+        if (std::find(array->begin(), array->end(), t) == array->end()) return false;
+        ArrayPtr copy(new std::vector<T>(*array));
+        copy->erase(std::find(copy->begin(), copy->end(), t));
+        array = copy;
+        return true;
+    }
+
+    /** Appends t unless an element already satisfies f; returns true if added. */
+    template <class F>
+    bool add_unless(T& t, F f)
+    {
+        Mutex::ScopedLock l(lock);
+        // Qualified std::find_if: do not rely on ADL to locate the algorithm.
+        if (std::find_if(array->begin(), array->end(), f) != array->end()) return false;
+        ArrayPtr copy(new std::vector<T>(*array));
+        copy->push_back(t);
+        array = copy;
+        return true;
+    }
+
+    /** Removes every element satisfying f; returns true if any was removed. */
+    template <class F>
+    bool remove_if(F f)
+    {
+        Mutex::ScopedLock l(lock);
+        if (std::find_if(array->begin(), array->end(), f) == array->end()) return false;
+        ArrayPtr copy(new std::vector<T>(*array));
+        copy->erase(std::remove_if(copy->begin(), copy->end(), f), copy->end());
+        array = copy;
+        return true;
+    }
+
+    /** Applies f to each element of the current contents, outside the lock. */
+    template <class F>
+    F for_each(F f)
+    {
+        ArrayPtr a;
+        {
+            Mutex::ScopedLock l(lock);
+            a = array;
+        }
+        return std::for_each(a->begin(), a->end(), f);
+    }
+
+    /** Returns the current contents: immutable, never null, possibly empty. */
+    ConstPtr snapshot()
+    {
+        Mutex::ScopedLock l(lock);
+        return array;
+    }
+
+private:
+    typedef boost::shared_ptr< std::vector<T> > ArrayPtr;
+    Mutex lock;      // guards the 'array' pointer
+    ArrayPtr array;  // never null; add/remove publish a modified copy
+};
+
+}}
+
+
+
+#endif  /*!QPID_SYS_COPYONWRITEARRAY_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=693053&r1=693052&r2=693053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Mon Sep  8 04:13:38 2008
@@ -76,9 +76,9 @@
     string k3("xyz");
 
     FanOutExchange fanout("fanout");
-    fanout.bind(a, "", 0);
-    fanout.bind(b, "", 0);
-    fanout.bind(c, "", 0);
+    BOOST_CHECK(fanout.bind(a, "", 0));
+    BOOST_CHECK(fanout.bind(b, "", 0));
+    BOOST_CHECK(fanout.bind(c, "", 0));
 
     BOOST_CHECK(fanout.isBound(a, 0, 0));
     BOOST_CHECK(fanout.isBound(b, 0, 0));
@@ -86,10 +86,10 @@
     BOOST_CHECK(!fanout.isBound(d, 0, 0));
 
     DirectExchange direct("direct");
-    direct.bind(a, k1, 0);
-    direct.bind(a, k3, 0);
-    direct.bind(b, k2, 0);
-    direct.bind(c, k1, 0);
+    BOOST_CHECK(direct.bind(a, k1, 0));
+    BOOST_CHECK(direct.bind(a, k3, 0));
+    BOOST_CHECK(direct.bind(b, k2, 0));
+    BOOST_CHECK(direct.bind(c, k1, 0));
 
     BOOST_CHECK(direct.isBound(a, 0, 0));
     BOOST_CHECK(direct.isBound(a, &k1, 0));
@@ -104,10 +104,10 @@
     BOOST_CHECK(!direct.isBound(d, &k3, 0));
 
     TopicExchange topic("topic");
-    topic.bind(a, k1, 0);
-    topic.bind(a, k3, 0);
-    topic.bind(b, k2, 0);
-    topic.bind(c, k1, 0);
+    BOOST_CHECK(topic.bind(a, k1, 0));
+    BOOST_CHECK(topic.bind(a, k3, 0));
+    BOOST_CHECK(topic.bind(b, k2, 0));
+    BOOST_CHECK(topic.bind(c, k1, 0));
 
     BOOST_CHECK(topic.isBound(a, 0, 0));
     BOOST_CHECK(topic.isBound(a, &k1, 0));