You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/03/11 00:11:10 UTC

svn commit: r752300 [2/12] - in /qpid/branches/qpid-1673/qpid: cpp/ cpp/examples/ cpp/examples/direct/ cpp/examples/failover/ cpp/examples/fanout/ cpp/examples/pub-sub/ cpp/examples/qmf-console/ cpp/examples/request-response/ cpp/examples/tradedemo/ cp...

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Mar 10 23:10:57 2009
@@ -102,8 +102,8 @@
 
 Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
                    Manageable* parent)
-    : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), 
-      sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
+    : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), 
+      args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
 {
     if (parent != 0)
     {
@@ -275,3 +275,7 @@
 {
     return b->queue == queue;
 }
+
+void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
+    msg->getProperties<DeliveryProperties>()->setExchange(getName());
+}

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Exchange.h Tue Mar 10 23:10:57 2009
@@ -59,12 +59,12 @@
 private:
     const std::string name;
     const bool durable;
-    mutable qpid::framing::FieldTable args;
     boost::shared_ptr<Exchange> alternate;
     uint32_t alternateUsers;
     mutable uint64_t persistenceId;
 
 protected:
+    mutable qpid::framing::FieldTable args;
     bool sequence;
     mutable qpid::sys::Mutex sequenceLock;
     int64_t sequenceNo;
@@ -140,13 +140,14 @@
     virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
     virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
     virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
+    virtual void setProperties(const boost::intrusive_ptr<Message>&);
     virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
-
+    
     //PersistableExchange:
     void setPersistenceId(uint64_t id) const;
     uint64_t getPersistenceId() const { return persistenceId; }
     uint32_t encodedSize() const;
-    QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const; 
+    QPID_BROKER_EXTERN virtual void encode(framing::Buffer& buffer) const; 
 
     static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Tue Mar 10 23:10:57 2009
@@ -105,33 +105,42 @@
 
 
 void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
-    if (!args) return;//can't match if there were no headers passed in
+    if (!args) {
+        //can't match if there were no headers passed in
+        if (mgmtExchange != 0) {
+            mgmtExchange->inc_msgReceives();
+            mgmtExchange->inc_byteReceives(msg.contentSize());
+            mgmtExchange->inc_msgDrops();
+            mgmtExchange->inc_byteDrops(msg.contentSize());
+        }
+        return;
+    }
+
     PreRoute pr(msg, this);
 
     uint32_t count(0);
 
     Bindings::ConstPtr p = bindings.snapshot();
     if (p.get()){
-        for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) {
-            if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
-            if ((*i)->mgmtBinding != 0)
-                (*i)->mgmtBinding->inc_msgMatched ();
+        for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
+            if (match((*i)->args, *args)) {
+                msg.deliverTo((*i)->queue);
+                count++;
+                if ((*i)->mgmtBinding != 0)
+                    (*i)->mgmtBinding->inc_msgMatched();
+            }
         }
     }
 
-    if (mgmtExchange != 0)
-    {
-        mgmtExchange->inc_msgReceives  ();
-        mgmtExchange->inc_byteReceives (msg.contentSize ());
-        if (count == 0)
-        {
-            mgmtExchange->inc_msgDrops  ();
-            mgmtExchange->inc_byteDrops (msg.contentSize ());
-        }
-        else
-        {
-            mgmtExchange->inc_msgRoutes  (count);
-            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+    if (mgmtExchange != 0) {
+        mgmtExchange->inc_msgReceives();
+        mgmtExchange->inc_byteReceives(msg.contentSize());
+        if (count == 0) {
+            mgmtExchange->inc_msgDrops();
+            mgmtExchange->inc_byteDrops(msg.contentSize());
+        } else {
+            mgmtExchange->inc_msgRoutes(count);
+            mgmtExchange->inc_byteRoutes(count * msg.contentSize());
         }
     }
 }

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.cpp Tue Mar 10 23:10:57 2009
@@ -158,7 +158,7 @@
     }
 
     for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-        (*i)->cancel();
+        (*i)->closed();
         created.push_back(*i);
     }
     active.clear();
@@ -217,21 +217,27 @@
 
 void Link::cancel(Bridge::shared_ptr bridge)
 {
-    Mutex::ScopedLock mutex(lock);
-
-    for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
-        if ((*i).get() == bridge.get()) {
-            created.erase(i);
-            break;
+    {
+        Mutex::ScopedLock mutex(lock);
+        
+        for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+            if ((*i).get() == bridge.get()) {
+                created.erase(i);
+                break;
+            }
         }
-    }
-    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-        if ((*i).get() == bridge.get()) {
-            bridge->cancel();
-            active.erase(i);
-            break;
+        for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+            if ((*i).get() == bridge.get()) {
+                cancellations.push_back(bridge);
+                bridge->closed();
+                active.erase(i);
+                break;
+            }
         }
     }
+    if (!cancellations.empty()) {
+        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+    }
 }
 
 void Link::ioThreadProcessing()
@@ -242,7 +248,7 @@
         return;
     QPID_LOG(debug, "Link::ioThreadProcessing()");
 
-    //process any pending creates
+    //process any pending creates and/or cancellations
     if (!created.empty()) {
         for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
             active.push_back(*i);
@@ -250,6 +256,13 @@
         }
         created.clear();
     }
+    if (!cancellations.empty()) {
+        for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) {
+            active.push_back(*i);
+            (*i)->cancel(*connection);
+        }
+        cancellations.clear();
+    }
 }
 
 void Link::setConnection(Connection* c)
@@ -284,7 +297,7 @@
             }
         }
     }
-    else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
+    else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Link.h Tue Mar 10 23:10:57 2009
@@ -67,6 +67,7 @@
             typedef std::vector<Bridge::shared_ptr> Bridges;
             Bridges created;   // Bridges pending creation
             Bridges active;    // Bridges active
+            Bridges cancellations;    // Bridges pending cancellation
             uint channelCounter;
             Connection* connection;
             management::ManagementAgent* agent;

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Mar 10 23:10:57 2009
@@ -19,19 +19,24 @@
  *
  */
 #include "LinkRegistry.h"
+#include "Connection.h"
 #include "qpid/log/Statement.h"
 #include <iostream>
+#include <boost/format.hpp>
 
 using namespace qpid::broker;
 using namespace qpid::sys;
 using std::pair;
 using std::stringstream;
 using boost::intrusive_ptr;
+using boost::format;
+using boost::str;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
 #define LINK_MAINT_INTERVAL 2
 
-LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false)
+LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false), 
+                                               realm(broker ? broker->getOptions().realm : "")
 {
     timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
 }
@@ -241,6 +246,7 @@
     {
         l->second->established();
         l->second->setConnection(c);
+        c->setUserId(str(format("%1%@%2%") % l->second->getUsername() % realm));
     }
 }
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/LinkRegistry.h Tue Mar 10 23:10:57 2009
@@ -66,6 +66,7 @@
         MessageStore* store;
         bool passive;
         bool passiveChanged;
+        std::string realm;
 
         void periodicMaintenance ();
         bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.cpp Tue Mar 10 23:10:57 2009
@@ -382,4 +382,9 @@
 void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; }
 void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; }
 
+framing::FieldTable& Message::getOrInsertHeaders()
+{
+    return getProperties<MessageProperties>()->getApplicationHeaders();
+}
+
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Message.h Tue Mar 10 23:10:57 2009
@@ -73,6 +73,7 @@
     QPID_BROKER_EXTERN std::string getExchangeName() const;
     bool isImmediate() const;
     QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
+    framing::FieldTable& getOrInsertHeaders();
     QPID_BROKER_EXTERN bool isPersistent();
     bool requiresAccept();
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.cpp Tue Mar 10 23:10:57 2009
@@ -93,7 +93,8 @@
     policyExceeded(false),
     mgmtObject(0),
     eventMode(0),
-    eventMgr(0)
+    eventMgr(0),
+    insertSeqNo(0)
 {
     if (parent != 0)
     {
@@ -176,7 +177,7 @@
 
 
 void Queue::recover(boost::intrusive_ptr<Message>& msg){
-    push(msg);
+    push(msg, true);
     msg->enqueueComplete(); // mark the message as enqueued
     mgntEnqStats(msg);
 
@@ -545,12 +546,13 @@
     ++dequeueTracker;
 }
 
-void Queue::push(boost::intrusive_ptr<Message>& msg){
+void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
     QueueListeners::NotificationSet copy;
     {
         Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
         if (policy.get()) policy->tryEnqueue(qm);
+        if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
          
         LVQ::iterator i;
         const framing::FieldTable* ft = msg->getApplicationHeaders();
@@ -566,14 +568,21 @@
                 boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
                 if (!old) old = i->second;
                 i->second->setReplacementMessage(msg,this);
-                dequeued(QueuedMessage(qm.queue, old, qm.position));
+                if (isRecovery) {
+                    //can't issue new requests for the store until
+                    //recovery is complete
+                    pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
+                } else {
+                    dequeue(0, QueuedMessage(qm.queue, old, qm.position));
+                }
             }		 
         }else {
             messages.push_back(qm);
             listeners.populate(copy);
         }
-        if (eventMode && eventMgr) {
-            eventMgr->enqueued(qm);
+        if (eventMode) {
+            if (eventMgr) eventMgr->enqueued(qm);
+            else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
         }
     }
     copy.notify();
@@ -664,7 +673,7 @@
         msg->addTraceId(traceId);
     }
 
-    if (msg->isPersistent() && store && !lastValueQueue) {
+    if (msg->isPersistent() && store) {
         msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
         store->enqueue(ctxt, pmsg, *this);
@@ -676,14 +685,14 @@
 // return true if store exists, 
 bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
 {
-    if (policy.get() && !policy->isEnqueued(msg)) return false;
     {
         Mutex::ScopedLock locker(messageLock);
+        if (policy.get() && !policy->isEnqueued(msg)) return false;
         if (!ctxt) { 
             dequeued(msg);
         }
     }
-    if (msg.payload->isPersistent() && store && !lastValueQueue) {
+    if (msg.payload->isPersistent() && store) {
         msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
         store->dequeue(ctxt, pmsg, *this);
@@ -976,3 +985,16 @@
 {
     eventMgr = &mgr;
 }
+
+void Queue::recoveryComplete()
+{
+    //process any pending dequeues
+    for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+    pendingDequeues.clear();
+}
+
+void Queue::insertSequenceNumbers(const std::string& key)
+{
+    seqNoKey = key;
+    insertSeqNo = !seqNoKey.empty();
+}

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/Queue.h Tue Mar 10 23:10:57 2009
@@ -87,6 +87,7 @@
             std::vector<std::string> traceExclude;
             QueueListeners listeners;
             Messages messages;
+            Messages pendingDequeues;//used to avoid dequeuing during recovery
             LVQ lvq;
             mutable qpid::sys::Mutex consumerLock;
             mutable qpid::sys::Mutex messageLock;
@@ -102,8 +103,10 @@
             RateTracker dequeueTracker;
             int eventMode;
             QueueEvents* eventMgr;
+            bool insertSeqNo;
+            std::string seqNoKey;
 
-            void push(boost::intrusive_ptr<Message>& msg);
+            void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
             bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
             bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
@@ -298,6 +301,11 @@
             void setPosition(framing::SequenceNumber pos);
             int getEventMode();
             void setQueueEventManager(QueueEvents&);
+            void insertSequenceNumbers(const std::string& key);
+            /**
+             * Notify queue that recovery has completed.
+             */
+            void recoveryComplete();
         };
     }
 }

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.cpp Tue Mar 10 23:10:57 2009
@@ -20,12 +20,13 @@
  */
 #include "QueueEvents.h"
 #include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
 
 QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : 
-    eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller) 
+    eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true) 
 {
     eventQueue.start();
 }
@@ -37,12 +38,12 @@
 
 void QueueEvents::enqueued(const QueuedMessage& m)
 {
-    eventQueue.push(Event(ENQUEUE, m));
+    if (enabled) eventQueue.push(Event(ENQUEUE, m));
 }
 
 void QueueEvents::dequeued(const QueuedMessage& m)
 {
-    eventQueue.push(Event(DEQUEUE, m));
+    if (enabled) eventQueue.push(Event(DEQUEUE, m));
 }
 
 void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
@@ -81,6 +82,18 @@
     if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
 }
 
+void QueueEvents::enable()
+{
+    enabled = true;
+    QPID_LOG(debug, "Queue events enabled");
+}
+
+void QueueEvents::disable()
+{
+    enabled = false;
+    QPID_LOG(debug, "Queue events disabled");
+}
+
 QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
 
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueEvents.h Tue Mar 10 23:10:57 2009
@@ -61,6 +61,8 @@
     QPID_BROKER_EXTERN void registerListener(const std::string& id,
                                              const EventListener&);
     QPID_BROKER_EXTERN void unregisterListener(const std::string& id);
+    void enable();
+    void disable();
     //process all outstanding events
     QPID_BROKER_EXTERN void shutdown();
   private:
@@ -69,6 +71,7 @@
 
     EventQueue eventQueue;
     Listeners listeners;
+    volatile bool enabled;
     qpid::sys::Mutex lock;//protect listeners from concurrent access
     
     void handle(EventQueue::Queue& e);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Tue Mar 10 23:10:57 2009
@@ -126,7 +126,7 @@
     FieldTable::ValuePtr v = settings.get(typeKey);
     if (v && v->convertsTo<std::string>()) {
         std::string t = v->get<std::string>();
-        transform(t.begin(), t.end(), t.begin(), tolower);        
+        std::transform(t.begin(), t.end(), t.begin(), tolower);        
         if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
     }
     return FLOW_TO_DISK;
@@ -197,11 +197,12 @@
 void RingQueuePolicy::dequeued(const QueuedMessage& m)
 {
     qpid::sys::Mutex::ScopedLock l(lock);
-    QueuePolicy::dequeued(m);
     //find and remove m from queue
-    for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
-        if (i->position == m.position) {
+    for (Messages::iterator i = queue.begin(); i != queue.end(); i++) {
+        if (i->payload == m.payload) {
             queue.erase(i);
+            //now update count and size
+            QueuePolicy::dequeued(m);
             break;
         }
     }
@@ -210,9 +211,11 @@
 bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
 {
     qpid::sys::Mutex::ScopedLock l(lock);
-    //for non-strict ring policy, a message can be dequeued before acked; need to detect this
-    for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
-        if (i->position == m.position) {
+    //for non-strict ring policy, a message can be replaced (and
+    //therefore dequeued) before it is accepted or released by
+    //subscriber; need to detect this
+    for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) {
+        if (i->payload == m.payload) {
             return true;
         }
     }
@@ -236,13 +239,10 @@
         oldest = queue.front();
     }
     if (oldest.queue->acquire(oldest) || !strict) {
-        qpid::sys::Mutex::ScopedLock l(lock);
-        if (oldest.position == queue.front().position) {
-            queue.pop_front();
-            QPID_LOG(debug, "Ring policy triggered in queue " 
-                     << (m.queue ? m.queue->getName() : std::string("unknown queue"))
-                     << ": removed message " << oldest.position << " to make way for " << m.position);
-        }
+        oldest.queue->dequeue(0, oldest);
+        QPID_LOG(debug, "Ring policy triggered in queue " 
+                 << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+                 << ": removed message " << oldest.position << " to make way for " << m.position);
         return true;
     } else {
         QPID_LOG(debug, "Ring policy could not be triggered in queue " 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Tue Mar 10 23:10:57 2009
@@ -19,6 +19,7 @@
  *
  */
 #include "QueueRegistry.h"
+#include "QueueEvents.h"
 #include "qpid/log/Statement.h"
 #include <sstream>
 #include <assert.h>
@@ -27,7 +28,7 @@
 using namespace qpid::sys;
 
 QueueRegistry::QueueRegistry() :
-    counter(1), store(0), parent(0), lastNode(false) {}
+    counter(1), store(0), events(0), parent(0), lastNode(false) {}
 
 QueueRegistry::~QueueRegistry(){}
 
@@ -43,7 +44,8 @@
     if (i == queues.end()) {
         Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent));
         queues[name] = queue;
-		if (lastNode) queue->setLastNodeFailure();
+        if (lastNode) queue->setLastNodeFailure();
+        if (events) queue->setQueueEventManager(*events);
 
         return std::pair<Queue::shared_ptr, bool>(queue, true);
     } else {
@@ -105,3 +107,7 @@
     lastNode = _lastNode;
 }
 
+void QueueRegistry::setQueueEvents(QueueEvents* e)
+{
+    events = e;
+}

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/QueueRegistry.h Tue Mar 10 23:10:57 2009
@@ -32,6 +32,8 @@
 namespace qpid {
 namespace broker {
 
+class QueueEvents;
+
 /**
  * A registry of queues indexed by queue name.
  *
@@ -90,6 +92,8 @@
      */
     string generateName();
 
+    void setQueueEvents(QueueEvents*);
+
     /**
      * Set the store to use.  May only be called once.
      */
@@ -124,6 +128,7 @@
     mutable qpid::sys::RWlock lock;
     int counter;
     MessageStore* store;
+    QueueEvents* events;
     management::Manageable* parent;
     bool lastNode; //used to set mode on queue declare
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue Mar 10 23:10:57 2009
@@ -149,7 +149,8 @@
 
 void RecoveryManagerImpl::recoveryComplete()
 {
-    //TODO (finalise binding setup etc)
+    //notify all queues
+    queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1));
 }
 
 bool RecoverableMessageImpl::loadContent(uint64_t available)

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Tue Mar 10 23:10:57 2009
@@ -141,21 +141,31 @@
 void NullAuthenticator::getMechanisms(Array& mechanisms)
 {
     mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS")));
+    mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));//useful for testing
 }
 
 void NullAuthenticator::start(const string& mechanism, const string& response)
 {
     if (mechanism == "PLAIN") { // Old behavior
-        if (response.size() > 0 && response[0] == (char) 0) {
-            string temp = response.substr(1);
-            string::size_type i = temp.find((char)0);
-            string uid = temp.substr(0, i);
-            string pwd = temp.substr(i + 1);
-            i = uid.find_last_of(realm);
-            if (i == string::npos || i != (uid.size() - 1)) {
-                uid = str(format("%1%@%2%") % uid % realm);
+        if (response.size() > 0) {
+            string uid;
+            string::size_type i = response.find((char)0);
+            if (i == 0 && response.size() > 1) {
+                //no authorization id; use authentication id
+                i = response.find((char)0, 1);
+                if (i != string::npos) uid = response.substr(1, i-1);
+            } else if (i != string::npos) {
+                //authorization id is first null delimited field
+                uid = response.substr(0, i);
+            }//else not a valid SASL PLAIN response, throw error?            
+            if (!uid.empty()) {
+                //append realm if it has not already been added
+                i = uid.find(realm);
+                if (i == string::npos || realm.size() + i < uid.size()) {
+                    uid = str(format("%1%@%2%") % uid % realm);
+                }
+                connection.setUserId(uid);
             }
-            connection.setUserId(uid);
         }
     } else {
         connection.setUserId("anonymous");

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Mar 10 23:10:57 2009
@@ -355,20 +355,12 @@
 }
 
 void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
+    msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+    
     std::string exchangeName = msg->getExchangeName();
-    //TODO: the following should be hidden behind message (using MessageAdapter or similar)
-
-    if (msg->isA<MessageTransferBody>()) {
-        // Do not replace the delivery-properties.exchange if it is is already set.
-        // This is used internally (by the cluster) to force the exchange name on a message.
-        // The client library ensures this is always empty for messages from normal clients.
-        if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty())
-            msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
-        msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
-    }
-    if (!cacheExchange || cacheExchange->getName() != exchangeName){
+    if (!cacheExchange || cacheExchange->getName() != exchangeName)
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
-    }
+    cacheExchange->setProperties(msg);
 
     /* verify the userid if specified: */
     std::string id =
@@ -516,14 +508,16 @@
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
     if (byteCredit != 0xFFFFFFFF) {
-        byteCredit += value;
+        if (value == 0xFFFFFFFF) byteCredit = value;
+        else byteCredit += value;
     }
 }
 
 void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
     if (msgCredit != 0xFFFFFFFF) {
-        msgCredit += value;
+        if (value == 0xFFFFFFFF) msgCredit = value;
+        else msgCredit += value;
     }
 }
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Mar 10 23:10:57 2009
@@ -362,10 +362,6 @@
             getBroker().getExchanges().getDefault()->bind(queue, name, 0);
             queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
 
-            //if event generation is turned on, pass in a pointer to
-            //the QueueEvents instance to use
-            if (queue->getEventMode()) queue->setQueueEventManager(getBroker().getQueueEvents());
-
             //handle automatic cleanup:
             if (exclusive) {
                 exclusiveQueues.push_back(queue);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Mar 10 23:10:57 2009
@@ -34,7 +34,8 @@
 SessionHandler::SessionHandler(Connection& c, ChannelId ch)
     : amqp_0_10::SessionHandler(&c.getOutput(), ch),
       connection(c), 
-      proxy(out)
+      proxy(out),
+      clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
 {}
 
 SessionHandler::~SessionHandler() {}
@@ -84,11 +85,23 @@
     if (session.get()) session->readyToSend();
 }
 
-// TODO aconway 2008-05-12: hacky - handle attached for bridge clients.
-// We need to integrate the client code so we can run a real client
-// in the bridge.
-// 
-void SessionHandler::attached(const std::string& name) {
+/**
+ * Used by inter-broker bridges to set up session id and attach
+ */
+void SessionHandler::attachAs(const std::string& name)
+{
+    SessionId id(connection.getUserId(), name);
+    SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
+    session.reset(new SessionState(connection.getBroker(), *this, id, config));
+    sendAttach(false);
+}
+
+/**
+ * TODO: this is a little ugly, fix it; its currently still relied on
+ * for 'push' bridges
+ */
+void SessionHandler::attached(const std::string& name)
+{
     if (session.get()) {
         amqp_0_10::SessionHandler::attached(name);
     } else {

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionHandler.h Tue Mar 10 23:10:57 2009
@@ -54,10 +54,20 @@
     framing::AMQP_ClientProxy& getProxy() { return proxy; }
     const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
 
+    /**
+     * If commands are sent based on the local time (e.g. in timers), they don't have
+     * a well-defined ordering across cluster nodes.
+     * This proxy is for sending such commands. In a clustered broker it will take steps
+     * to synchronize command order across the cluster. In a stand-alone broker
+     * it is just a synonym for getProxy()
+     */  
+    framing::AMQP_ClientProxy& getClusterOrderProxy() {
+        return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
+    }
+
     virtual void handleDetach();
-    
-    // Overrides
-    void attached(const std::string& name);
+    void attached(const std::string& name);//used by 'pushing' inter-broker bridges
+    void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
 
   protected:
     virtual void setState(const std::string& sessionName, bool force);
@@ -69,9 +79,16 @@
     virtual void readyToSend();
 
   private:
+    struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel.
+        framing::ChannelHandler setChannel;
+        SetChannelProxy(uint16_t ch, framing::FrameHandler* out)
+            : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {}
+    };
+
     Connection& connection;
     framing::AMQP_ClientProxy proxy;
     std::auto_ptr<SessionState> session;
+    std::auto_ptr<SetChannelProxy> clusterOrderProxy;
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Mar 10 23:10:57 2009
@@ -66,7 +66,7 @@
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
         if (handler->getConnection().getClientThrottling()) {
-            rateFlowcontrol = new RateFlowcontrol(maxRate);
+            rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
         } else {
             QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
         }
@@ -210,14 +210,17 @@
     {}
 
     void fire() {
-        QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway 2009-02-23: REMOVE
         // This is the best we can currently do to avoid a destruction/fire race
         if (!isCancelled()) {
-            if ( !sessionState.processSendCredit(0) ) {
-                QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
-                reset();
-                timer.add(this);
-            }
+            sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
+        }
+    }
+    
+    void sendCredit() {
+        if ( !sessionState.processSendCredit(0) ) {
+            QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
+            reset();
+            timer.add(this);
         }
     }
 };
@@ -275,7 +278,8 @@
     if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
         QPID_LOG(warning, getId() << ": producer throttling violation");
         // TODO: Probably do message.stop("") first time then disconnect
-        getProxy().getMessage().stop("");
+        // See comment on getClusterOrderProxy() in .h file
+        getClusterOrderProxy().getMessage().stop("");
         return true;
     }
     AbsTime now = AbsTime::now();
@@ -283,7 +287,7 @@
     if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
     if ( sendCredit>0 ) {
         QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
-        getProxy().getMessage().flow("", 0, sendCredit);
+        getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
         rateFlowcontrol->sentCredit(now, sendCredit);
         if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
         return true;
@@ -364,8 +368,9 @@
         // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
         uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
         QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
-        getProxy().getMessage().setFlowMode("", 0);
-        getProxy().getMessage().flow("", 0, credit);
+        // See comment on getClusterOrderProxy() in .h file
+        getClusterOrderProxy().getMessage().setFlowMode("", 0);
+        getClusterOrderProxy().getMessage().flow("", 0, credit);
         rateFlowcontrol->sentCredit(AbsTime::now(), credit);
         if (mgmtObject) mgmtObject->inc_clientCredit(credit);
     }
@@ -373,4 +378,8 @@
 
 Broker& SessionState::getBroker() { return broker; }
 
+framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
+    return handler->getClusterOrderProxy();
+}
+
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/SessionState.h Tue Mar 10 23:10:57 2009
@@ -125,6 +125,15 @@
 
     void sendAcceptAndCompletion();
 
+    /**
+     * If commands are sent based on the local time (e.g. in timers), they don't have
+     * a well-defined ordering across cluster nodes.
+     * This proxy is for sending such commands. In a clustered broker it will take steps
+     * to synchronize command order across the cluster. In a stand-alone broker
+     * it is just a synonym for getProxy()
+     */  
+    framing::AMQP_ClientProxy& getClusterOrderProxy();
+    
     Broker& broker;
     SessionHandler* handler;
     sys::AbsTime expiry;        // Used by SessionManager.
@@ -138,7 +147,7 @@
 
     // State used for producer flow control (rate limited)
     qpid::sys::Mutex rateLock;
-    RateFlowcontrol* rateFlowcontrol;
+    boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
     boost::intrusive_ptr<TimerTask> flowControlTimer;
 
     friend class SessionManager;

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/broker/TxAccept.cpp Tue Mar 10 23:10:57 2009
@@ -50,12 +50,12 @@
 
 void TxAccept::RangeOps::prepare(TransactionContext* ctxt)
 {
-    for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
+    std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
 }
 
 void TxAccept::RangeOps::commit()
 {
-    for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
+    std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
     //now remove if isRedundant():
     if (!ranges.empty()) {
         ack_iterator i = ranges.front().range.start;

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Mar 10 23:10:57 2009
@@ -180,6 +180,9 @@
 
 
 template <class F> void ConnectionImpl::closeInternal(const F& f) {
+    if (heartbeatTask) {
+        heartbeatTask->cancel();
+    }
     {
         Mutex::ScopedUnlock u(lock);
         connector->close();

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Connector.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Connector.cpp Tue Mar 10 23:10:57 2009
@@ -92,8 +92,6 @@
     
     framing::ProtocolVersion version;
     bool initiated;
-
-    sys::Mutex closedLock;    
     bool closed;
     bool joined;
 
@@ -185,7 +183,7 @@
 }
 
 void TCPConnector::connect(const std::string& host, int port){
-    Mutex::ScopedLock l(closedLock);
+    Mutex::ScopedLock l(lock);
     assert(closed);
     try {
         socket.connect(host, port);
@@ -207,7 +205,7 @@
 }
 
 void TCPConnector::init(){
-    Mutex::ScopedLock l(closedLock);
+    Mutex::ScopedLock l(lock);
     assert(joined);
     ProtocolInitiation init(version);
     writeDataBlock(init);
@@ -216,17 +214,21 @@
 }
 
 bool TCPConnector::closeInternal() {
-    Mutex::ScopedLock l(closedLock);
-    bool ret = !closed;
+    bool ret;
+    {
+    Mutex::ScopedLock l(lock);
+    ret = !closed;
     if (!closed) {
         closed = true;
+        aio->queueForDeletion();
         poller->shutdown();
     }
-    if (!joined && receiver.id() != Thread::current().id()) {
-        joined = true;
-        Mutex::ScopedUnlock u(closedLock);
-        receiver.join();
+    if (joined || receiver.id() == Thread::current().id()) {
+        return ret;
     }
+    joined = true;
+    }
+    receiver.join();
     return ret;
 }
         
@@ -259,21 +261,19 @@
 }
 
 void TCPConnector::send(AMQFrame& frame) {
+    Mutex::ScopedLock l(lock);
+    frames.push_back(frame);
+    //only ask to write if this is the end of a frameset or if we
+    //already have a buffers worth of data
+    currentSize += frame.encodedSize();
     bool notifyWrite = false;
-    {
-        Mutex::ScopedLock l(lock);
-        frames.push_back(frame);
-        //only ask to write if this is the end of a frameset or if we
-        //already have a buffers worth of data
-        currentSize += frame.encodedSize();
-        if (frame.getEof()) {
-            lastEof = frames.size();
-            notifyWrite = true;
-        } else {
-            notifyWrite = (currentSize >= maxFrameSize);
-        }
+    if (frame.getEof()) {
+        lastEof = frames.size();
+        notifyWrite = true;
+    } else {
+        notifyWrite = (currentSize >= maxFrameSize);
     }
-    if (notifyWrite) aio->notifyPendingWrite();
+    if (notifyWrite && !closed) aio->notifyPendingWrite();
 }
 
 void TCPConnector::handleClosed() {
@@ -384,14 +384,13 @@
     assert(protect);
     try {
         Dispatcher d(poller);
-	
+
         for (int i = 0; i < 32; i++) {
             aio->queueReadBuffer(new Buff(maxFrameSize));
         }
-	
+
         aio->start(poller);
         d.run();
-        aio->queueForDeletion();
         socket.close();
     } catch (const std::exception& e) {
         QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/Dispatcher.cpp Tue Mar 10 23:10:57 2009
@@ -136,8 +136,7 @@
 
 void Dispatcher::cancel(const std::string& destination) {
     ScopedLock<Mutex> l(lock);
-    listeners.erase(destination);
-    if (autoStop && listeners.empty())
+    if (listeners.erase(destination) && running && autoStop && listeners.empty())
         queue->close();
 }
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/FailoverManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/FailoverManager.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/FailoverManager.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/FailoverManager.cpp Tue Mar 10 23:10:57 2009
@@ -21,12 +21,15 @@
 #include "FailoverManager.h"
 #include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
 
 
 namespace qpid {
 namespace client {
 
 using qpid::sys::Monitor;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
 
 FailoverManager::FailoverManager(const ConnectionSettings& s, 
                                  ReconnectionStrategy* rs) : settings(s), strategy(rs), state(IDLE) {}
@@ -35,15 +38,21 @@
 {
     bool retry = false;
     bool completed = false;
+    AbsTime failed;
     while (!completed) {
         try {
             AsyncSession session = connect().newSession();
+            if (retry) {
+                Duration failoverTime(failed, AbsTime::now());
+                QPID_LOG(info, "Failed over for " << &c << " in " << (failoverTime/qpid::sys::TIME_MSEC) << " milliseconds");
+            }
             c.execute(session, retry);
             session.sync();//TODO: shouldn't be required
             session.close();
             completed = true;
         } catch(const TransportFailure&) {
-            retry = true;            
+            retry = true;
+            failed = AbsTime::now();
         }            
     }
 }

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SessionImpl.cpp Tue Mar 10 23:10:57 2009
@@ -512,6 +512,7 @@
     if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
     setState(DETACHED);
     QPID_LOG(info, "Session detached by peer: " << id);
+    proxy.detached(_name, DETACH_CODE_NORMAL);
 }
 
 void SessionImpl::detached(const std::string& _name, uint8_t _code) {
@@ -744,7 +745,8 @@
 
 void SessionImpl::handleClosed()
 {
-    demux.close(exceptionHolder.empty() ? new ClosedException() : exceptionHolder);
+    demux.close(exceptionHolder.empty() ?
+                sys::ExceptionHolder(new ClosedException()) : exceptionHolder);
     results.close();
 }
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/client/SslConnector.cpp Tue Mar 10 23:10:57 2009
@@ -221,6 +221,7 @@
     bool ret = !closed;
     if (!closed) {
         closed = true;
+        aio->queueForDeletion();
         poller->shutdown();
     }
     if (!joined && receiver.id() != Thread::current().id()) {
@@ -386,7 +387,6 @@
 	
         aio->start(poller);
         d.run();
-        aio->queueForDeletion();
         socket.close();
     } catch (const std::exception& e) {
         QPID_LOG(error, e.what());

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Mar 10 23:10:57 2009
@@ -21,7 +21,9 @@
 #include "Connection.h"
 #include "UpdateClient.h"
 #include "FailoverExchange.h"
+#include "UpdateExchange.h"
 
+#include "qpid/assert.h"
 #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
 #include "qmf/org/apache/qpid/cluster/Package.h"
 #include "qpid/broker/Broker.h"
@@ -91,9 +93,10 @@
     cpg(*this),
     name(settings.name),
     myUrl(settings.url.empty() ? Url() : Url(settings.url)),
-    myId(cpg.self()),
+    self(cpg.self()),
     readMax(settings.readMax),
     writeEstimate(settings.writeEstimate),
+    expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
     mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
     dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
     deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -104,15 +107,12 @@
                       boost::bind(&Cluster::leave, this),
                       "Error delivering frames",
                       poller),
-    connections(*this),
-    decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections),
-    expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
-    frameId(0),
     initialized(false),
+    decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
+    discarding(true),
     state(INIT),
     lastSize(0),
-    lastBroker(false),
-    sequence(0)
+    lastBroker(false)
 {
     mAgent = ManagementAgent::Singleton::getInstance();
     if (mAgent != 0){
@@ -122,7 +122,13 @@
         mgmtObject->set_status("JOINING");
     }
 
+    // Failover exchange provides membership updates to clients.
     failoverExchange.reset(new FailoverExchange(this));
+    broker.getExchanges().registerExchange(failoverExchange);
+
+    // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
+    broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
     if (settings.quorum) quorum.init();
     cpg.join(name);
     // pump the CPG dispatch manually till we get initialized. 
@@ -149,21 +155,21 @@
 
 // Called in connection thread to insert a client connection.
 void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
-    Lock l(lock);
-    connections.insert(c);
+    localConnections.insert(c);
 }
 
 // Called in connection thread to insert an updated shadow connection.
 void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
-    Lock l(lock);
-    assert(state <= UPDATEE);   // Only during update.
-    connections.insert(c);
+    // Safe to use connections here because we're pre-catchup, either
+    // discarding or stalled, so deliveredFrame is not processing any
+    // connection events.
+    assert(discarding);         
+    connections.insert(ConnectionMap::value_type(c->getId(), c));
 }
 
+// Called by Connection::deliverClose() in deliverFrameQueue thread.
 void Cluster::erase(const ConnectionId& id) {
-    // Called only by Connection::deliverClose in deliver thread, no need to lock.
     connections.erase(id);
-    decoder.erase(id);
 }
 
 std::vector<string> Cluster::getIds() const {
@@ -193,7 +199,6 @@
     if (state != LEFT) {
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
-        connections.clear();
         try { broker.shutdown(); }
         catch (const std::exception& e) {
             QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -213,52 +218,88 @@
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
     Event e(Event::decodeCopy(from, buf));
-    e.setSequence(sequence++);
-    if (from == myId)  // Record self-deliveries for flow control.
+    if (from == self)  // Record self-deliveries for flow control.
         mcast.selfDeliver(e);
-    deliver(e);
+    deliverEvent(e);
 }
 
-void Cluster::deliver(const Event& e) {
-    if (state == LEFT) return;
-    QPID_LATENCY_INIT(e);
+void Cluster::deliverEvent(const Event& e) {
     deliverEventQueue.push(e);
 }
 
-// Handler for deliverEventQueue
+void Cluster::deliverFrame(const EventFrame& e) {
+    deliverFrameQueue.push(e);
+}
+
+// Handler for deliverEventQueue.
+// This thread decodes frames from events.
 void Cluster::deliveredEvent(const Event& e) {
-    QPID_LATENCY_RECORD("delivered event queue", e);
-    Buffer buf(const_cast<char*>(e.getData()), e.getSize());
-    if (e.getType() == CONTROL) {
-        AMQFrame frame;
-        while (frame.decode(buf)) 
-            deliverFrameQueue.push(EventFrame(e, frame));
-    }
-    else if (e.getType() == DATA)
-        decoder.decode(e, e.getData());
+        QPID_LOG(trace, *this << " DLVR: " << e);
+    if (e.isCluster()) {
+        EventFrame ef(e, e.getFrame());
+        // Stop the deliverEventQueue on update offers.
+        // This preserves the connection decoder fragments for an update.
+        ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody());
+        if (offer)
+            deliverEventQueue.stop();
+        deliverFrame(ef);
+    }
+    else if(!discarding) {    
+        if (e.isControl())
+            deliverFrame(EventFrame(e, e.getFrame()));
+        else
+            decoder.decode(e, e.getData());
+}
+    else // Discard connection events if discarding is set.
+        QPID_LOG(trace, *this << " DROP: " << e);
 }
 
-// Handler for deliverFrameQueue
+// Handler for deliverFrameQueue.
+// This thread executes the main logic.
 void Cluster::deliveredFrame(const EventFrame& e) {
     Mutex::ScopedLock l(lock);
-    const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
-    QPID_LOG(trace, *this << " DLVR: " << e);
-    QPID_LATENCY_RECORD("delivered frame queue", e.frame);
-    if (e.isCluster()) {        // Cluster control frame
+    if (e.isCluster()) {
+        QPID_LOG(trace, *this << " DLVR: " << e);
         ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
         if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
             throw Exception(QPID_MSG("Invalid cluster control"));
     }
-    else {                      // Connection frame.
-        if (state <= UPDATEE) {
-            QPID_LOG(trace, *this << " DROP: " << e);
-            return;
-        }
-        boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
-        if (connection)         // Ignore frames to closed local connections.
+    else if (state >= CATCHUP) {
+        QPID_LOG(trace, *this << " DLVR:  " << e);
+        ConnectionPtr connection = getConnection(e.connectionId, l);
+        if (connection)
             connection->deliveredFrame(e);
     }
-    QPID_LATENCY_RECORD("processed", e.frame);
+    else // Drop connection frames while state < CATCHUP
+        QPID_LOG(trace, *this << " DROP: " << e);        
+}
+
+// Called in deliverFrameQueue thread
+ConnectionPtr Cluster::getConnection(const ConnectionId& id, Lock&) {
+    ConnectionPtr cp;
+    ConnectionMap::iterator i = connections.find(id);
+    if (i != connections.end())
+        cp = i->second;
+    else {
+        if(id.getMember() == self) 
+            cp = localConnections.getErase(id);
+        else {
+            // New remote connection, create a shadow.
+            std::ostringstream mgmtId;
+            mgmtId << id;
+            cp = new Connection(*this, shadowOut, mgmtId.str(), id);
+        }
+        if (cp)
+            connections.insert(ConnectionMap::value_type(id, cp));
+    }
+    return cp;
+}
+
+Cluster::ConnectionVector Cluster::getConnections(Lock&) {
+    ConnectionVector result(connections.size());
+    std::transform(connections.begin(), connections.end(), result.begin(),
+                   boost::bind(&ConnectionMap::value_type::second, _1));
+    return result;
 }
   
 struct AddrList {
@@ -306,42 +347,45 @@
     std::string addresses;
     for (cpg_address* p = current; p < current+nCurrent; ++p) 
         addresses.append(MemberId(*p).str());
-    deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId));
+    deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
 }
 
 void Cluster::setReady(Lock&) {
     state = READY;
     if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
     mcast.release();
+    broker.getQueueEvents().enable();
 }
 
 void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
     bool memberChange = map.configChange(addresses);
     if (state == LEFT) return;
     
-    if (!map.isAlive(myId)) {  // Final config change.
+    if (!map.isAlive(self)) {  // Final config change.
         leave(l);
         return;
     }
 
     if (state == INIT) {        // First configChange
         if (map.aliveCount() == 1) {
-            setClusterId(true);
+            setClusterId(true, l);
+            discarding = false;
             setReady(l);
-            map = ClusterMap(myId, myUrl, true);
+            map = ClusterMap(self, myUrl, true);
             memberUpdate(l);
             QPID_LOG(notice, *this << " first in cluster");
         }
         else {                  // Joining established group.
             state = JOINER;
             QPID_LOG(info, *this << " joining cluster: " << map);
-            mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
+            mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
             elders = map.getAlive();
-            elders.erase(myId);
+            elders.erase(self);
             broker.getLinks().setPassive(true);
+            broker.getQueueEvents().disable();
         }
-    }
-    else if (state >= READY && memberChange) {
+    } 
+    else if (state >= CATCHUP && memberChange) {
         memberUpdate(l);
         elders = ClusterMap::intersection(elders, map.getAlive());
         if (elders.empty()) {
@@ -351,13 +395,11 @@
     }
 }
 
-bool Cluster::isLeader() const { return elders.empty(); }
-
-void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
+void Cluster::makeOffer(const MemberId& id, Lock& ) {
     if (state == READY && map.isJoiner(id)) {
         state = OFFER;
         QPID_LOG(info, *this << " send update-offer to " << id);
-        mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId);
+        mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self);
     }
 }
 
@@ -367,88 +409,89 @@
 // callbacks will be invoked.
 // 
 void Cluster::brokerShutdown()  {
-    if (state != LEFT) {
-        try { cpg.shutdown(); }
-        catch (const std::exception& e) {
-            QPID_LOG(error, *this << " shutting down CPG: " << e.what());
-        }
+    try { cpg.shutdown(); }
+    catch (const std::exception& e) {
+        QPID_LOG(error, *this << " shutting down CPG: " << e.what());
     }
     delete this;
 }
 
 void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) {
     map.updateRequest(id, url);
-    tryMakeOffer(id, l);
+    makeOffer(id, l);
 }
 
 void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
     if (map.ready(id, Url(url))) 
         memberUpdate(l);
-    if (state == CATCHUP && id == myId) {
+    if (state == CATCHUP && id == self) {
         setReady(l);
         QPID_LOG(notice, *this << " caught up, active cluster member");
     }
 }
 
 void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
+    // NOTE: deliverEventQueue has been stopped at the update offer by
+    // deliveredEvent in case an update is required.
     if (state == LEFT) return;
     MemberId updatee(updateeInt);
     boost::optional<Url> url = map.updateOffer(updater, updatee);
-    if (updater == myId) {
+    if (updater == self) {
         assert(state == OFFER);
-        if (url) {              // My offer was first.
+        if (url)               // My offer was first.
             updateStart(updatee, *url, l);
-        }
         else {                  // Another offer was first.
+            deliverEventQueue.start(); // Don't need to update
             setReady(l);
             QPID_LOG(info, *this << " cancelled update offer to " << updatee);
-            tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
+            makeOffer(map.firstJoiner(), l); // Maybe make another offer.
         }
     }
-    else if (updatee == myId && url) {
+    else if (updatee == self && url) {
         assert(state == JOINER);
-        setClusterId(uuid);
+        setClusterId(uuid, l);
         state = UPDATEE;
         QPID_LOG(info, *this << " receiving update from " << updater);
-        deliverFrameQueue.stop();
         checkUpdateIn(l);
     }
+    else
+        deliverEventQueue.start(); // Don't need to update
 }
 
-void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
+void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
+    // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
     if (state == LEFT) return;
     assert(state == OFFER);
     state = UPDATER;
-    QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
-    deliverFrameQueue.stop();
-    if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+    QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
+    if (updateThread.id())
+        updateThread.join(); // Join the previous updateThread to avoid leaks.
     client::ConnectionSettings cs;
     cs.username = settings.username;
     cs.password = settings.password;
     cs.mechanism = settings.mechanism;
     updateThread = Thread(
-        new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
+        new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder,
                          boost::bind(&Cluster::updateOutDone, this),
                          boost::bind(&Cluster::updateOutError, this, _1),
                          cs));
 }
 
 // Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
+void Cluster::updateInDone(const ClusterMap& m) {
     Lock l(lock);
     updatedMap = m;
-    frameId = fid;
     checkUpdateIn(l);
 }
 
-void Cluster::checkUpdateIn(Lock& ) {
-    if (state == LEFT) return;
+void Cluster::checkUpdateIn(Lock&) {
     if (state == UPDATEE && updatedMap) {
         map = *updatedMap;
-        mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
+        mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
+        discarding = false;     // ok to set, we're stalled for update.
         QPID_LOG(info, *this << " received update, starting catch-up");
-        deliverFrameQueue.start();
+        deliverEventQueue.start();
     }
 }
 
@@ -462,8 +505,8 @@
     assert(state == UPDATER);
     state = READY;
     mcast.release();
-    deliverFrameQueue.start();
-    tryMakeOffer(map.firstJoiner(), l); // Try another offer
+    deliverEventQueue.start();       // Start processing events again.
+    makeOffer(map.firstJoiner(), l); // Try another offer
 }
 
 void Cluster::updateOutError(const std::exception& e)  {
@@ -487,7 +530,7 @@
         {
             _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
             stringstream stream;
-            stream << myId;
+            stream << self;
             if (iargs.i_brokerId == stream.str())
                 stopClusterNode(l);
         }
@@ -508,7 +551,7 @@
 
 void Cluster::stopFullCluster(Lock& ) {
     QPID_LOG(notice, *this << " shutting down cluster " << name);
-    mcast.mcastControl(ClusterShutdownBody(), myId);
+    mcast.mcastControl(ClusterShutdownBody(), self);
 }
 
 void Cluster::memberUpdate(Lock& l) {
@@ -518,13 +561,13 @@
     size_t size = urls.size();
     failoverExchange->setUrls(urls);
 
-    if (size == 1 && lastSize > 1 && state >= READY) { 
-        QPID_LOG(info, *this << " last broker standing, update queue policies");
+    if (size == 1 && lastSize > 1 && state >= CATCHUP) { 
+        QPID_LOG(notice, *this << " last broker standing, update queue policies");
         lastBroker = true;
         broker.getQueues().updateQueueClusterState(true);
     }
     else if (size > 1 && lastBroker) {
-        QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+        QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
         lastBroker = false;
         broker.getQueues().updateQueueClusterState(false);
     }
@@ -546,17 +589,23 @@
         mgmtObject->set_memberIDs(idstr);
     }
 
-    // Close connections belonging to members that have now been excluded
-    connections.update(myId, map);
+    // Erase connections belonging to members that have left the cluster.
+    ConnectionMap::iterator i = connections.begin();
+    while (i != connections.end()) {
+        ConnectionMap::iterator j = i++;
+        MemberId m = j->second->getId().getMember();
+        if (m != self && !map.isMember(m))
+            connections.erase(j);
+    }
 }
 
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
     static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
-    return o << cluster.myId << "(" << STATE[cluster.state] << ")";
+    return o << cluster.self << "(" << STATE[cluster.state] << ")";
 }
 
 MemberId Cluster::getId() const {
-    return myId;            // Immutable, no need to lock.
+    return self;            // Immutable, no need to lock.
 }
 
 broker::Broker& Cluster::getBroker() const {
@@ -571,11 +620,11 @@
     }
 }
 
-void Cluster::setClusterId(const Uuid& uuid) {
+void Cluster::setClusterId(const Uuid& uuid, Lock&) {
     clusterId = uuid;
     if (mgmtObject) {
         stringstream stream;
-        stream << myId;
+        stream << self;
         mgmtObject->set_clusterID(clusterId.str());
         mgmtObject->set_memberID(stream.str());
     }

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cluster.h Tue Mar 10 23:10:57 2009
@@ -19,34 +19,34 @@
  *
  */
 
-#include "ClusterSettings.h"
 #include "ClusterMap.h"
-#include "ConnectionMap.h"
+#include "ClusterSettings.h"
 #include "Cpg.h"
+#include "Decoder.h"
 #include "Event.h"
+#include "EventFrame.h"
+#include "ExpiryPolicy.h"
 #include "FailoverExchange.h"
+#include "LockedConnectionMap.h"
 #include "Multicaster.h"
-#include "EventFrame.h"
 #include "NoOpConnectionOutputHandler.h"
+#include "PollableQueue.h"
 #include "PollerDispatch.h"
 #include "Quorum.h"
-#include "Decoder.h"
-#include "PollableQueue.h"
-#include "ExpiryPolicy.h"
 
+#include "qmf/org/apache/qpid/cluster/Cluster.h"
+#include "qpid/Url.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/sys/Monitor.h"
 #include "qpid/management/Manageable.h"
-#include "qpid/Url.h"
-#include "qmf/org/apache/qpid/cluster/Cluster.h"
+#include "qpid/sys/Monitor.h"
 
-#include <boost/intrusive_ptr.hpp>
 #include <boost/bind.hpp>
+#include <boost/intrusive_ptr.hpp>
 #include <boost/optional.hpp>
 
 #include <algorithm>
-#include <vector>
 #include <map>
+#include <vector>
 
 namespace qpid {
 
@@ -58,6 +58,7 @@
 namespace cluster {
 
 class Connection;
+class EventFrame;
 
 /**
  * Connection to the cluster
@@ -65,82 +66,91 @@
 class Cluster : private Cpg::Handler, public management::Manageable {
   public:
     typedef boost::intrusive_ptr<Connection> ConnectionPtr;
-    typedef std::vector<ConnectionPtr> Connections;
+    typedef std::vector<ConnectionPtr> ConnectionVector;
 
-    /** Construct the cluster in plugin earlyInitialize */ 
+    // Public functions are thread safe unless otherwise mentioned in a comment.
+
+    // Construct the cluster in plugin earlyInitialize.
     Cluster(const ClusterSettings&, broker::Broker&);
     virtual ~Cluster();
 
-    /** Join the cluster in plugin initialize. Requires transport
-     * plugins to be available.. */
+    // Called by plugin initialize: cluster start-up requires transport plugins.
+    // Thread safety: only called by plugin initialize.
     void initialize();
 
-    // Connection map - called in connection threads.
+    // Connection map.
     void addLocalConnection(const ConnectionPtr&); 
     void addShadowConnection(const ConnectionPtr&); 
     void erase(const ConnectionId&);       
     
-    // URLs of current cluster members - called in connection threads.
+    // URLs of current cluster members.
     std::vector<std::string> getIds() const;
     std::vector<Url> getUrls() const;
     boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
 
-    // Leave the cluster - called in any thread.
+    // Leave the cluster - called when fatal errors occur.
     void leave();
 
     // Update completed - called in update thread
-    void updateInDone(const ClusterMap&, uint64_t frameId);
+    void updateInDone(const ClusterMap&);
 
     MemberId getId() const;
     broker::Broker& getBroker() const;
     Multicaster& getMulticast() { return mcast; }
 
-    boost::function<bool ()> isQuorate;
-    void checkQuorum();         // called in connection threads.
+    void checkQuorum();
 
     size_t getReadMax() { return readMax; }
     size_t getWriteEstimate() { return writeEstimate; }
 
-    bool isLeader() const;       // Called in deliver thread.
+    void deliverFrame(const EventFrame&);
+
+    // Called only during update by Connection::shadowReady
+    Decoder& getDecoder() { return decoder; }
+
+    ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
     
   private:
     typedef sys::Monitor::ScopedLock Lock;
 
     typedef PollableQueue<Event> PollableEventQueue;
     typedef PollableQueue<EventFrame> PollableFrameQueue;
+    typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap;
 
-    // NB: The final Lock& parameter on functions below is used to mark functions
-    // that should only be called by a function that already holds the lock.
-    // The parameter makes it hard to forget since you have to have an instance of
-    // a Lock to call the unlocked functions.
-
+    // NB: A dummy Lock& parameter marks functions that must only be
+    // called with Cluster::lock held.
+ 
     void leave(Lock&);
     std::vector<std::string> getIds(Lock&) const;
     std::vector<Url> getUrls(Lock&) const;
 
-    // Make an offer if we can - called in deliver thread.
-    void tryMakeOffer(const MemberId&, Lock&);
-
-    // Called in main thread in ~Broker.
+    // == Called in main thread from Broker destructor.
     void brokerShutdown();
 
+    // == Called in deliverEventQueue thread
+    void deliveredEvent(const Event&); 
+
+    // == Called in deliverFrameQueue thread
+    void deliveredFrame(const EventFrame&); 
+
     // Cluster controls implement XML methods from cluster.xml.
-    // Called in deliver thread.
-    // 
     void updateRequest(const MemberId&, const std::string&, Lock&);
     void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
     void shutdown(const MemberId&, Lock&);
-    void deliveredEvent(const Event&); 
-    void deliveredFrame(const EventFrame&); 
 
-    // Helper, called in deliver thread.
+    // Helper functions
+    ConnectionPtr getConnection(const ConnectionId&, Lock&);
+    ConnectionVector getConnections(Lock&);
     void updateStart(const MemberId& updatee, const Url& url, Lock&);
-
+    void makeOffer(const MemberId&, Lock&);
     void setReady(Lock&);
+    void memberUpdate(Lock&);
+    void setClusterId(const framing::Uuid&, Lock&);
 
+    // == Called in CPG dispatch thread
     void deliver( // CPG deliver callback. 
         cpg_handle_t /*handle*/,
         struct cpg_name *group,
@@ -149,7 +159,7 @@
         void* /*msg*/,
         int /*msg_len*/);
 
-    void deliver(const Event&);
+    void deliverEvent(const Event&);
     
     void configChange( // CPG config change callback.
         cpg_handle_t /*handle*/,
@@ -159,23 +169,21 @@
         struct cpg_address */*joined*/, int /*nJoined*/
     );
 
+    // == Called in management threads.
     virtual qpid::management::ManagementObject* GetManagementObject() const;
     virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
 
     void stopClusterNode(Lock&);
     void stopFullCluster(Lock&);
-    void memberUpdate(Lock&);
 
-    // Called in connection IO threads .
+    // == Called in connection IO threads.
     void checkUpdateIn(Lock&);
 
-    // Called in UpdateClient thread.
+    // == Called in UpdateClient thread.
     void updateOutDone();
     void updateOutError(const std::exception&);
     void updateOutDone(Lock&);
 
-    void setClusterId(const framing::Uuid&);
-
     // Immutable members set on construction, never changed.
     ClusterSettings settings;
     broker::Broker& broker;
@@ -184,34 +192,38 @@
     Cpg cpg;
     const std::string name;
     Url myUrl;
-    const MemberId myId;
+    const MemberId self;
     const size_t readMax;
     const size_t writeEstimate;
     framing::Uuid clusterId;
     NoOpConnectionOutputHandler shadowOut;
     qpid::management::ManagementAgent* mAgent;
+    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
 
     // Thread safe members
     Multicaster mcast;
     PollerDispatch dispatcher;
     PollableEventQueue deliverEventQueue;
     PollableFrameQueue deliverFrameQueue;
-    ConnectionMap connections;
     boost::shared_ptr<FailoverExchange> failoverExchange;
     Quorum quorum;
-
-    // Used only in delivery thread
-    Decoder decoder;
-    ClusterMap::Set elders;
-    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-    uint64_t frameId;
+    LockedConnectionMap localConnections;
 
     // Used only during initialization
     bool initialized;
 
-    // Remaining members are protected by lock
+    // Used only in deliverEventQueue thread or when stalled for update.
+    Decoder decoder;
+    bool discarding;
+    
+    // Remaining members are protected by lock.
+    // FIXME aconway 2009-03-06: Most of these members are also only used in
+    // deliverFrameQueue thread or during stall. Review and separate members
+    // that require a lock, drop lock when not needed.
+    // 
     mutable sys::Monitor lock;
 
+
     //    Local cluster state, cluster map
     enum {
         INIT,    ///< Initial state, no CPG messages received.
@@ -223,15 +235,16 @@
         UPDATER, ///< Offer accepted, sending a state update.
         LEFT     ///< Final state, left the cluster.
     } state;
+
+    ConnectionMap connections;
     ClusterMap map;
+    ClusterMap::Set elders;
     size_t lastSize;
     bool lastBroker;
-    uint64_t sequence;
-
-    //     Update related
     sys::Thread updateThread;
     boost::optional<ClusterMap> updatedMap;
 
+
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend class ClusterDispatcher;
 };

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue Mar 10 23:10:57 2009
@@ -138,7 +138,6 @@
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
-        broker->getExchanges().registerExchange(cluster->getFailoverExchange());
         ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
         if (mgmt) {
             std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator());

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ClusterSettings.h Tue Mar 10 23:10:57 2009
@@ -35,7 +35,7 @@
     size_t readMax, writeEstimate;
     std::string username, password, mechanism;
 
-    ClusterSettings() : quorum(false), readMax(10), writeEstimate(64), username("guest"), password("guest") {}
+    ClusterSettings() : quorum(false), readMax(10), writeEstimate(64) {}
   
     Url getUrl(uint16_t port) const {
         if (url.empty()) return Url::getIpAddressesUrl(port);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Mar 10 23:10:57 2009
@@ -40,6 +40,7 @@
 #include "qpid/framing/ConnectionCloseOkBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/LatencyMetric.h"
+#include "qpid/sys/AtomicValue.h"
 
 #include <boost/current_function.hpp>
 
@@ -58,27 +59,36 @@
 
 NoOpConnectionOutputHandler Connection::discardHandler;
 
-// Shadow connections
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
-                       const std::string& wrappedId, ConnectionId myId)
-    : cluster(c), self(myId), catchUp(false), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false)
+namespace {
+sys::AtomicValue<uint64_t> idCounter;
+}
+
+// Shadow connection
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id)
+    : cluster(c), self(id), catchUp(false), output(*this, out),
+      connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false),
+      mcastFrameHandler(cluster.getMulticast(), self)
 { init(); }
 
-// Local connections
+// Local connection
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
-                       const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
-    : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
-      expectProtocolHeader(isLink)
+                       const std::string& logId, MemberId member, bool isCatchUp, bool isLink)
+    : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
+      connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0),
+      expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
 { init(); }
 
 void Connection::init() {
     QPID_LOG(debug, cluster << " new connection: " << *this);
-    if (isLocalClient()) {
+    if (isLocalClient()) {  
+        connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node
         cluster.addLocalConnection(this);
         giveReadCredit(cluster.getReadMax());
     }
+    else {                                                  // Shadow or catch-up connection
+        connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
+        connection.setClientThrottling(false);              // Disable client throttling, done by active node.
+    }
 }
 
 void Connection::giveReadCredit(int credit) {
@@ -140,10 +150,16 @@
 void Connection::deliveredFrame(const EventFrame& f) {
     assert(!catchUp);
     currentChannel = f.frame.getChannel(); 
-    if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
+    if (f.frame.getBody()       // frame can be empty with just readCredit
+        && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection control.
         && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
     {
-        connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection.
+        if (f.type == DATA) // incoming data frames to broker::Connection
+            connection.received(const_cast<AMQFrame&>(f.frame)); 
+        else {                    // frame control, send frame via SessionState
+            broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+            if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
+        }
     }
     giveReadCredit(f.readCredit);
 }
@@ -186,12 +202,12 @@
     connection.closed();
 }
 
-// Decode data from local clients.
+// ConnectionCodec::decode receives read buffers from directly-connected clients.
 size_t Connection::decode(const char* buffer, size_t size) {
     if (catchUp) {  // Handle catch-up locally.
         Buffer buf(const_cast<char*>(buffer), size);
         while (localDecoder.decode(buf))
-            received(localDecoder.frame);
+            received(localDecoder.getFrame());
     }
     else {                      // Multicast local connections.
         assert(isLocal());
@@ -242,6 +258,7 @@
     const SequenceSet& unknownCompleted,
     const SequenceSet& receivedIncomplete)
 {
+    
     sessionState().setState(
         replayStart,
         sendCommandPoint,
@@ -253,21 +270,23 @@
     QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
 }
     
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) {
-    ConnectionId shadow = ConnectionId(memberId, connectionId);
-    QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow);
-    self = shadow;
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) {
+    ConnectionId shadowId = ConnectionId(memberId, connectionId);
+    QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
+    self = shadowId;
     connection.setUserId(username);
+    // OK to use decoder here because we are stalled for update.
+    cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
 }
 
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
-    cluster.updateInDone(ClusterMap(joiners, members), frameId);
+    cluster.updateInDone(ClusterMap(joiners, members));
     self.second = 0;        // Mark this as completed update connection.
 }
 
 bool Connection::isLocal() const {
-    return self.first == cluster.getId() && self.second == this;
+    return self.first == cluster.getId() && self.second;
 }
 
 bool Connection::isShadow() const {
@@ -333,6 +352,10 @@
     q->setPosition(position);
 }
 
+void Connection::expiryId(uint64_t id) {
+    cluster.getExpiryPolicy().setId(id);
+}
+
 std::ostream& operator<<(std::ostream& o, const Connection& c) {
     const char* type="unknown";
     if (c.isLocal()) type = "local";



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org