You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/06/21 21:58:07 UTC

svn commit: r1138153 [2/6] - in /qpid/branches/qpid-3079/qpid: ./ cpp/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/dotnet/src/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp...

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jun 21 19:58:01 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,7 +65,7 @@ using std::mem_fun;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
 
-namespace 
+namespace
 {
 const std::string qpidMaxSize("qpid.max_size");
 const std::string qpidMaxCount("qpid.max_count");
@@ -87,16 +87,16 @@ const int ENQUEUE_ONLY=1;
 const int ENQUEUE_AND_DEQUEUE=2;
 }
 
-Queue::Queue(const string& _name, bool _autodelete, 
+Queue::Queue(const string& _name, bool _autodelete,
              MessageStore* const _store,
              const OwnershipToken* const _owner,
              Manageable* parent,
              Broker* b) :
 
-    name(_name), 
+    name(_name),
     autodelete(_autodelete),
     store(_store),
-    owner(_owner), 
+    owner(_owner),
     consumerCount(0),
     exclusive(0),
     noLocal(false),
@@ -179,9 +179,9 @@ void Queue::recover(boost::intrusive_ptr
     if (policy.get()) policy->recoverEnqueued(msg);
 
     push(msg, true);
-    if (store){ 
+    if (store){
         // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
-        msg->addToSyncList(shared_from_this(), store); 
+        msg->addToSyncList(shared_from_this(), store);
     }
 
     if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
@@ -206,13 +206,13 @@ void Queue::process(boost::intrusive_ptr
 void Queue::requeue(const QueuedMessage& msg){
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
-    {    
+    {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return;
         messages->reinsert(msg);
         listeners.populate(copy);
 
-        // for persistLastNode - don't force a message twice to disk, but force it if no force before 
+        // for persistLastNode - don't force a message twice to disk, but force it if no force before
         if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
             msg.payload->forcePersistent();
             if (msg.payload->isForcedPersistent() ){
@@ -224,7 +224,7 @@ void Queue::requeue(const QueuedMessage&
     copy.notify();
 }
 
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) 
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
 {
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
@@ -268,7 +268,7 @@ bool Queue::getNextMessage(QueuedMessage
           case NO_MESSAGES:
           default:
             return false;
-        }        
+        }
     } else {
         return browseNextMessage(m, c);
     }
@@ -278,7 +278,7 @@ Queue::ConsumeCode Queue::consumeNextMes
 {
     while (true) {
         Mutex::ScopedLock locker(messageLock);
-        if (messages->empty()) { 
+        if (messages->empty()) {
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
             listeners.addListener(c);
             return NO_MESSAGES;
@@ -291,7 +291,7 @@ Queue::ConsumeCode Queue::consumeNextMes
             }
 
             if (c->filter(msg.payload)) {
-                if (c->accept(msg.payload)) {            
+                if (c->accept(msg.payload)) {
                     m = msg;
                     pop();
                     return CONSUMED;
@@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMes
                 //consumer will never want this message
                 QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
                 return CANT_CONSUME;
-            } 
+            }
         }
     }
 }
@@ -358,7 +358,7 @@ bool Queue::dispatch(Consumer::shared_pt
     }
 }
 
-// Find the next message 
+// Find the next message
 bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
     Mutex::ScopedLock locker(messageLock);
     if (messages->next(c->position, msg)) {
@@ -426,19 +426,25 @@ bool collect_if_expired(std::deque<Queue
     }
 }
 
-void Queue::purgeExpired()
+/**
+ *@param lapse: time since the last purgeExpired
+ */
+void Queue::purgeExpired(qpid::sys::Duration lapse)
 {
     //As expired messages are discarded during dequeue also, only
     //bother explicitly expiring if the rate of dequeues since last
-    //attempt is less than one per second.  
-
-    if (dequeueTracker.sampleRatePerSecond() < 1) {
+    //attempt is less than one per second.
+    int count = dequeueSincePurge.get();
+    dequeueSincePurge -= count;
+    int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
+    if (seconds == 0 || count / seconds < 1) {
         std::deque<QueuedMessage> expired;
         {
             Mutex::ScopedLock locker(messageLock);
-            messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
+            messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
         }
-        for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+        for_each(expired.begin(), expired.end(),
+                 boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     }
 }
 
@@ -457,7 +463,7 @@ void Queue::purgeExpired()
 uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
 {
     Mutex::ScopedLock locker(messageLock);
-    uint32_t purge_count = purge_request; // only comes into play if  >0 
+    uint32_t purge_count = purge_request; // only comes into play if  >0
     std::deque<DeliverableMessage> rerouteQueue;
 
     uint32_t count = 0;
@@ -490,7 +496,7 @@ uint32_t Queue::purge(const uint32_t pur
 
 uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
     Mutex::ScopedLock locker(messageLock);
-    uint32_t move_count = qty; // only comes into play if  qty >0 
+    uint32_t move_count = qty; // only comes into play if  qty >0
     uint32_t count = 0; // count how many were moved for returning
 
     while((!qty || move_count--) && !messages->empty()) {
@@ -508,7 +514,7 @@ void Queue::pop()
 {
     assertClusterSafe();
     messages->pop();
-    ++dequeueTracker;
+    ++dequeueSincePurge;
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -517,10 +523,10 @@ void Queue::push(boost::intrusive_ptr<Me
     QueuedMessage removed;
     bool dequeueRequired = false;
     {
-        Mutex::ScopedLock locker(messageLock);   
+        Mutex::ScopedLock locker(messageLock);
         QueuedMessage qm(this, msg, ++sequence);
         if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
-         
+
         dequeueRequired = messages->push(qm, removed);
         listeners.populate(copy);
         enqueued(qm);
@@ -599,7 +605,7 @@ void Queue::setLastNodeFailure()
 }
 
 
-// return true if store exists, 
+// return true if store exists,
 bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
 {
     ScopedUse u(barrier);
@@ -619,7 +625,7 @@ bool Queue::enqueue(TransactionContext* 
     if (inLastNodeFailure && persistLastNode){
         msg->forcePersistent();
     }
-       
+
     if (traceId.size()) {
         //copy on write: take deep copy of message before modifying it
         //as the frames may already be available for delivery on other
@@ -649,9 +655,10 @@ bool Queue::enqueue(TransactionContext* 
 void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
 {
     Mutex::ScopedLock locker(messageLock);
-    if (policy.get()) policy->enqueueAborted(msg);       
+    if (policy.get()) policy->enqueueAborted(msg);
 }
 
+
 /**
  * Returns a null pointer if the dequeue completed, otherwise the dequeue will complete
  * asynchronously, and a pointer to a DequeueCompletion object is returned.
@@ -666,7 +673,7 @@ Queue::dequeue(TransactionContext* ctxt,
     {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return empty;
-        if (!ctxt) { 
+        if (!ctxt) {
             dequeued(msg);
         }
     }
@@ -693,7 +700,7 @@ Queue::dequeue(TransactionContext* ctxt,
 void Queue::dequeueCommitted(const QueuedMessage& msg)
 {
     Mutex::ScopedLock locker(messageLock);
-    dequeued(msg);    
+    dequeued(msg);
     if (mgmtObject != 0) {
         mgmtObject->inc_msgTxnDequeues();
         mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -748,8 +755,8 @@ int getIntegerSetting(const qpid::framin
         return v->get<int>();
     } else if (v->convertsTo<std::string>()){
         std::string s = v->get<std::string>();
-        try { 
-            return boost::lexical_cast<int>(s); 
+        try {
+            return boost::lexical_cast<int>(s);
         } catch(const boost::bad_lexical_cast&) {
             QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
             return 0;
@@ -773,7 +780,7 @@ void Queue::configureImpl(const FieldTab
         broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
     }
 
-    if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && 
+    if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
         (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
         if ( NullMessageStore::isNullStore(store)) {
             QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
@@ -811,7 +818,7 @@ void Queue::configureImpl(const FieldTab
             QPID_LOG(debug, "Configured queue " <<  getName() << " as priority queue.");
         }
     }
-    
+
     persistLastNode= _settings.get(qpidPersistLastNode);
     if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
 
@@ -820,15 +827,15 @@ void Queue::configureImpl(const FieldTab
     if (excludeList.size()) {
         split(traceExclude, excludeList, ", ");
     }
-    QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId 
+    QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
              << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
 
     FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
     if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
 
     autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
-    if (autoDeleteTimeout) 
-        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); 
+    if (autoDeleteTimeout)
+        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
 
     if (mgmtObject != 0) {
         mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
@@ -892,9 +899,9 @@ const QueuePolicy* Queue::getPolicy()
     return policy.get();
 }
 
-uint64_t Queue::getPersistenceId() const 
-{ 
-    return persistenceId; 
+uint64_t Queue::getPersistenceId() const
+{
+    return persistenceId;
 }
 
 void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -908,11 +915,11 @@ void Queue::setPersistenceId(uint64_t _p
     persistenceId = _persistenceId;
 }
 
-void Queue::encode(Buffer& buffer) const 
+void Queue::encode(Buffer& buffer) const
 {
     buffer.putShortString(name);
     buffer.put(settings);
-    if (policy.get()) { 
+    if (policy.get()) {
         buffer.put(*policy);
     }
     buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
@@ -965,7 +972,7 @@ boost::shared_ptr<Exchange> Queue::getAl
 
 void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
 {
-    if (broker.getQueues().destroyIf(queue->getName(), 
+    if (broker.getQueues().destroyIf(queue->getName(),
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
         QPID_LOG(debug, "Auto-deleting " << queue->getName());
         queue->destroyed();
@@ -977,7 +984,7 @@ struct AutoDeleteTask : qpid::sys::Timer
     Broker& broker;
     Queue::shared_ptr queue;
 
-    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) 
+    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
         : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
 
     void fire()
@@ -995,27 +1002,27 @@ void Queue::tryAutoDelete(Broker& broker
     if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
         AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
         queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
-        broker.getClusterTimer().add(queue->autoDeleteTask);        
+        broker.getClusterTimer().add(queue->autoDeleteTask);
         QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
     } else {
         tryAutoDeleteImpl(broker, queue);
     }
 }
 
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const 
-{ 
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
     Mutex::ScopedLock locker(ownershipLock);
-    return o == owner; 
+    return o == owner;
 }
 
-void Queue::releaseExclusiveOwnership() 
-{ 
+void Queue::releaseExclusiveOwnership()
+{
     Mutex::ScopedLock locker(ownershipLock);
-    owner = 0; 
+    owner = 0;
 }
 
-bool Queue::setExclusiveOwner(const OwnershipToken* const o) 
-{ 
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
     //reset auto deletion timer if necessary
     if (autoDeleteTimeout && autoDeleteTask) {
         autoDeleteTask->cancel();
@@ -1024,20 +1031,20 @@ bool Queue::setExclusiveOwner(const Owne
     if (owner) {
         return false;
     } else {
-        owner = o; 
+        owner = o;
         return true;
     }
 }
 
-bool Queue::hasExclusiveOwner() const 
-{ 
+bool Queue::hasExclusiveOwner() const
+{
     Mutex::ScopedLock locker(ownershipLock);
-    return owner != 0; 
+    return owner != 0;
 }
 
-bool Queue::hasExclusiveConsumer() const 
-{ 
-    return exclusive; 
+bool Queue::hasExclusiveConsumer() const
+{
+    return exclusive;
 }
 
 void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
@@ -1206,6 +1213,10 @@ const Broker* Queue::getBroker()
     return broker;
 }
 
+void Queue::setDequeueSincePurge(uint32_t value) {
+    dequeueSincePurge = value;
+}
+
 
 /** invoked from the store thread when the asynchronous dequeueing of the
  * message has completed. */

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h Tue Jun 21 19:58:01 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,9 @@
 #include "qpid/broker/QueueBindings.h"
 #include "qpid/broker/QueueListeners.h"
 #include "qpid/broker/QueueObserver.h"
-#include "qpid/broker/RateTracker.h"
 
 #include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
@@ -75,13 +75,13 @@ class Queue : public boost::enable_share
     {
         Queue& parent;
         uint count;
-                
+
         UsageBarrier(Queue&);
         bool acquire();
         void release();
         void destroy();
     };
-            
+
     struct ScopedUse
     {
         UsageBarrier& barrier;
@@ -89,7 +89,7 @@ class Queue : public boost::enable_share
         ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
         ~ScopedUse() { if (acquired) barrier.release(); }
     };
-            
+
     typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
     enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
 
@@ -120,7 +120,7 @@ class Queue : public boost::enable_share
     boost::shared_ptr<Exchange> alternateExchange;
     framing::SequenceNumber sequence;
     qmf::org::apache::qpid::broker::Queue* mgmtObject;
-    RateTracker dequeueTracker;
+    sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
     int eventMode;
     Observers observers;
     bool insertSeqNo;
@@ -147,7 +147,7 @@ class Queue : public boost::enable_share
     void dequeued(const QueuedMessage& msg);
     void pop();
     void popAndDequeue();
-    QueuedMessage getFront();
+
     void forcePersistent(QueuedMessage& msg);
     int getEventMode();
     void configureImpl(const qpid::framing::FieldTable& settings);
@@ -185,8 +185,8 @@ class Queue : public boost::enable_share
     typedef std::vector<shared_ptr> vector;
 
     QPID_BROKER_EXTERN Queue(const std::string& name,
-                             bool autodelete = false, 
-                             MessageStore* const store = 0, 
+                             bool autodelete = false,
+                             MessageStore* const store = 0,
                              const OwnershipToken* const owner = 0,
                              management::Manageable* parent = 0,
                              Broker* broker = 0);
@@ -246,11 +246,11 @@ class Queue : public boost::enable_share
                                     bool exclusive = false);
     QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
 
-    uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages 
-    QPID_BROKER_EXTERN void purgeExpired();
+    uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+    QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
 
     //move qty # of messages to destination Queue destq
-    uint32_t move(const Queue::shared_ptr destq, uint32_t qty); 
+    uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
 
     QPID_BROKER_EXTERN uint32_t getMessageCount() const;
     QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -313,8 +313,8 @@ class Queue : public boost::enable_share
      * Inform queue of messages that were enqueued, have since
      * been acquired but not yet accepted or released (and
      * thus are still logically on the queue) - used in
-     * clustered broker.  
-     */ 
+     * clustered broker.
+     */
     void updateEnqueued(const QueuedMessage& msg);
 
     /**
@@ -325,9 +325,9 @@ class Queue : public boost::enable_share
      * accepted it).
      */
     bool isEnqueued(const QueuedMessage& msg);
-            
+
     /**
-     * Gets the next available message 
+     * Gets the next available message
      */
     QPID_BROKER_EXTERN QueuedMessage get();
 
@@ -408,6 +408,9 @@ class Queue : public boost::enable_share
 
     const Broker* getBroker();
 
+    uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
+    void setDequeueSincePurge(uint32_t value);
+
  private:
     std::map<PersistableMessage *, boost::intrusive_ptr<DequeueCompletion> > pendingDequeueCompletions;
 };

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Tue Jun 21 19:58:01 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,7 +27,7 @@
 namespace qpid {
 namespace broker {
 
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
 
 QueueCleaner::~QueueCleaner()
 {
@@ -36,10 +36,16 @@ QueueCleaner::~QueueCleaner()
 
 void QueueCleaner::start(qpid::sys::Duration p)
 {
+    period = p;
     task = new Task(*this, p);
-    timer.add(task);
+    timer->add(task);
 }
 
+void QueueCleaner::setTimer(qpid::sys::Timer* timer) {
+    this->timer = timer;
+}
+
+
 QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {}
 
 void QueueCleaner::Task::fire()
@@ -65,9 +71,9 @@ void QueueCleaner::fired()
     std::vector<Queue::shared_ptr> copy;
     CollectQueues collect(&copy);
     queues.eachQueue(collect);
-    std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1));
+    std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period));
     task->setupNextFire();
-    timer.add(task);
+    timer->add(task);
 }
 
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.h Tue Jun 21 19:58:01 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,14 +35,15 @@ class QueueRegistry;
 class QueueCleaner
 {
   public:
-    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
+    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
     QPID_BROKER_EXTERN ~QueueCleaner();
-    QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
+    QPID_BROKER_EXTERN void start(sys::Duration period);
+    QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
   private:
     class Task : public sys::TimerTask
     {
       public:
-        Task(QueueCleaner& parent, qpid::sys::Duration duration);
+        Task(QueueCleaner& parent, sys::Duration duration);
         void fire();
       private:
         QueueCleaner& parent;
@@ -50,7 +51,8 @@ class QueueCleaner
 
     boost::intrusive_ptr<sys::TimerTask> task;
     QueueRegistry& queues;
-    sys::Timer& timer;
+    sys::Timer* timer;
+    sys::Duration period;
 
     void fired();
 };

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Tue Jun 21 19:58:01 2011
@@ -167,7 +167,8 @@ void QueueFlowLimit::enqueued(const Queu
         msg.payload->getIngressCompletion().startCompleter();    // don't complete until flow resumes
         bool unique;
         unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
-        assert(unique);
+        // Like this to avoid tripping up unused variable warning when NDEBUG set
+        if (!unique) assert(unique);
     }
 }
 
@@ -379,7 +380,8 @@ void QueueFlowLimit::setState(const qpid
                 QueuedMessage msg(queue->find(seq));   // fyi: msg.payload may be null if msg is delivered & unacked
                 bool unique;
                 unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
-                assert(unique);
+                // Like this to avoid tripping up unused variable warning when NDEBUG set
+                if (!unique) assert(unique);
             }
         }
     }

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Tue Jun 21 19:58:01 2011
@@ -117,19 +117,20 @@ void QueuePolicy::update(FieldTable& set
     settings.setString(typeKey, type);
 }
 
-uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue)
+template <typename T>
+T getCapacity(const FieldTable& settings, const std::string& key, T defaultValue)
 {
     FieldTable::ValuePtr v = settings.get(key);
 
-    int32_t result = 0;
+    T result = 0;
 
     if (!v) return defaultValue;
     if (v->getType() == 0x23) {
         QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
     } else if (v->getType() == 0x33) {
         QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
-    } else if (v->convertsTo<int>()) {
-        result = v->get<int>();
+    } else if (v->convertsTo<T>()) {
+        result = v->get<T>();
         QPID_LOG(debug, "Got integer value for " << key << ": " << result);
         if (result >= 0) return result;
     } else if (v->convertsTo<string>()) {
@@ -319,8 +320,8 @@ std::auto_ptr<QueuePolicy> QueuePolicy::
 
 std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
 {
-    uint32_t maxCount = getCapacity(settings, maxCountKey, 0);
-    uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
+    uint32_t maxCount = getCapacity<int32_t>(settings, maxCountKey, 0);
+    uint64_t maxSize = getCapacity<int64_t>(settings, maxSizeKey, defaultMaxSize);
     if (maxCount || maxSize) {
         return createQueuePolicy(name, maxCount, maxSize, getType(settings));
     } else {

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.h Tue Jun 21 19:58:01 2011
@@ -43,8 +43,7 @@ class QueuePolicy
     uint32_t count;
     uint64_t size;
     bool policyExceeded;
-            
-    static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue);
+
 
   protected:
     uint64_t getCurrentQueueSize() const { return size; } 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Jun 21 19:58:01 2011
@@ -70,7 +70,7 @@ SemanticState::SemanticState(DeliveryAda
       deliveryAdapter(da),
       tagGenerator("sgen"),
       dtxSelected(false),
-      authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
+      authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
       userID(getSession().getConnection().getUserId()),
       userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
       isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())),
@@ -469,7 +469,6 @@ void SemanticState::route(intrusive_ptr<
     /* verify the userid if specified: */
     std::string id =
     	msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
-
     if (authMsg &&  !id.empty() && !(id == userID || (isDefaultRealm && id == userName)))
     {
         QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Tue Jun 21 19:58:01 2011
@@ -81,12 +81,11 @@ class SslProtocolFactory : public qpid::
     SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
     ~SslProtocolFactory();
     void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
-    void connect(sys::Poller::shared_ptr, const std::string& host, int16_t port,
+    void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port,
                  sys::ConnectionCodec::Factory*,
                  ConnectFailedCallback failed);
 
     uint16_t getPort() const;
-    std::string getHost() const;
     bool supports(const std::string& capability);
 
   private:
@@ -130,7 +129,7 @@ SslProtocolFactory::SslProtocolFactory(c
                                        int backlog,
                                        bool nodelay)
     : tcpNoDelay(nodelay),
-      listeningPort(listener.listen(options.port, backlog)),
+    listeningPort(listener.listen("", boost::lexical_cast<std::string>(options.port), backlog)),
       clientAuthSelected(options.clientAuth) {
 
     SecInvalidateHandle(&credHandle);
@@ -237,10 +236,6 @@ uint16_t SslProtocolFactory::getPort() c
     return listeningPort; // Immutable no need for lock.
 }
 
-std::string SslProtocolFactory::getHost() const {
-    return listener.getSockname();
-}
-
 void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
                                 sys::ConnectionCodec::Factory* fact) {
     acceptor.reset(
@@ -251,7 +246,7 @@ void SslProtocolFactory::accept(sys::Pol
 
 void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
                                  const std::string& host,
-                                 int16_t port,
+                                 const std::string& port,
                                  sys::ConnectionCodec::Factory* fact,
                                  ConnectFailedCallback failed)
 {

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Jun 21 19:58:01 2011
@@ -36,6 +36,7 @@
 
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
 #include <boost/shared_ptr.hpp>
 
 #include <limits>
@@ -258,16 +259,16 @@ void ConnectionImpl::open()
     connector->setInputHandler(&handler);
     connector->setShutdownHandler(this);
     try {
-        connector->connect(host, port);
-    
+        std::string p = boost::lexical_cast<std::string>(port);
+        connector->connect(host, p);
+
     } catch (const std::exception& e) {
         QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what());
         connector.reset();
         throw;
     }
     connector->init();
-    QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
- 
+
     // Enable heartbeat if requested
     uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
     if (heartbeat) {
@@ -281,6 +282,7 @@ void ConnectionImpl::open()
     // - in that case in connector.reset() above;
     // - or when we are deleted
     handler.waitForOpen();
+    QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
 
     // If the SASL layer has provided an "operational" userId for the connection,
     // put it in the negotiated settings.

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/Connector.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/Connector.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/Connector.h Tue Jun 21 19:58:01 2011
@@ -61,7 +61,7 @@ class Connector : public framing::Output
     static void registerFactory(const std::string& proto, Factory* connectorFactory);
 
     virtual ~Connector() {};
-    virtual void connect(const std::string& host, int port) = 0;
+    virtual void connect(const std::string& host, const std::string& port) = 0;
     virtual void init() {};
     virtual void close() = 0;
     virtual void send(framing::AMQFrame& frame) = 0;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/RdmaConnector.cpp Tue Jun 21 19:58:01 2011
@@ -95,7 +95,7 @@ class RdmaConnector : public Connector, 
 
     std::string identifier;
 
-    void connect(const std::string& host, int port);
+    void connect(const std::string& host, const std::string& port);
     void close();
     void send(framing::AMQFrame& frame);
     void abort() {} // TODO: need to fix this for heartbeat timeouts to work
@@ -173,7 +173,7 @@ RdmaConnector::~RdmaConnector() {
     }
 }
 
-void RdmaConnector::connect(const std::string& host, int port){
+void RdmaConnector::connect(const std::string& host, const std::string& port){
     Mutex::ScopedLock l(dataConnectedLock);
     assert(!dataConnected);
 
@@ -184,7 +184,7 @@ void RdmaConnector::connect(const std::s
         boost::bind(&RdmaConnector::disconnected, this),
         boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
 
-    SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+    SocketAddress sa(host, port);
     acon->start(poller, sa);
 }
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/SslConnector.cpp Tue Jun 21 19:58:01 2011
@@ -114,7 +114,7 @@ class SslConnector : public Connector
 
     std::string identifier;
 
-    void connect(const std::string& host, int port);
+    void connect(const std::string& host, const std::string& port);
     void init();
     void close();
     void send(framing::AMQFrame& frame);
@@ -190,7 +190,7 @@ SslConnector::~SslConnector() {
     close();
 }
 
-void SslConnector::connect(const std::string& host, int port){
+void SslConnector::connect(const std::string& host, const std::string& port){
     Mutex::ScopedLock l(closedLock);
     assert(closed);
     try {

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.cpp Tue Jun 21 19:58:01 2011
@@ -88,7 +88,7 @@ TCPConnector::~TCPConnector() {
     close();
 }
 
-void TCPConnector::connect(const std::string& host, int port) {
+void TCPConnector::connect(const std::string& host, const std::string& port) {
     Mutex::ScopedLock l(lock);
     assert(closed);
     connector = AsynchConnector::create(
@@ -121,7 +121,7 @@ void TCPConnector::start(sys::AsynchIO* 
         aio->queueReadBuffer(new Buff(maxFrameSize));
     }
 
-    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+    identifier = str(format("[%1%]") % socket.getFullAddress());
 }
 
 void TCPConnector::initAmqp() {

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.h Tue Jun 21 19:58:01 2011
@@ -98,7 +98,7 @@ class TCPConnector : public Connector, p
 
 protected:
     virtual ~TCPConnector();
-    void connect(const std::string& host, int port);
+    void connect(const std::string& host, const std::string& port);
     void start(sys::AsynchIO* aio_);
     void initAmqp();
     virtual void connectFailed(const std::string& msg);

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp Tue Jun 21 19:58:01 2011
@@ -30,12 +30,23 @@ void AcceptTracker::State::accept()
     unaccepted.clear();
 }
 
-void AcceptTracker::State::accept(qpid::framing::SequenceNumber id)
+SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
 {
-    if (unaccepted.contains(id)) {
-        unaccepted.remove(id);
-        unconfirmed.add(id);
+    SequenceSet accepting;
+    if (cumulative) {
+        for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
+            accepting.add(*i);
+        }
+        unconfirmed.add(accepting);
+        unaccepted.remove(accepting);
+    } else {
+        if (unaccepted.contains(id)) {
+            unaccepted.remove(id);
+            unconfirmed.add(id);
+            accepting.add(id);
+        }
     }
+    return accepting;
 }
 
 void AcceptTracker::State::release()
@@ -71,16 +82,15 @@ void AcceptTracker::accept(qpid::client:
     aggregateState.accept();
 }
 
-void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session)
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
 {
     for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
-        i->second.accept(id);
+        i->second.accept(id, cumulative);
     }
     Record record;
-    record.accepted.add(id);
+    record.accepted = aggregateState.accept(id, cumulative);
     record.status = session.messageAccept(record.accepted);
     pending.push_back(record);
-    aggregateState.accept(id);
 }
 
 void AcceptTracker::release(qpid::client::AsyncSession& session)

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h Tue Jun 21 19:58:01 2011
@@ -42,7 +42,7 @@ class AcceptTracker
   public:
     void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
     void accept(qpid::client::AsyncSession&);
-    void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&);
+    void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative);
     void release(qpid::client::AsyncSession&);
     uint32_t acceptsPending();
     uint32_t acceptsPending(const std::string& destination);
@@ -62,7 +62,7 @@ class AcceptTracker
         qpid::framing::SequenceSet unconfirmed;
 
         void accept();
-        void accept(qpid::framing::SequenceNumber);
+        qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative);
         void release();
         uint32_t acceptsPending();
         void completed(qpid::framing::SequenceSet&);

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Tue Jun 21 19:58:01 2011
@@ -233,6 +233,8 @@ class Subscription : public Exchange, pu
     const bool reliable;
     const bool durable;
     const std::string actualType;
+    const bool exclusiveQueue;
+    const bool exclusiveSubscription;
     FieldTable queueOptions;
     FieldTable subscriptionOptions;
     Bindings bindings;
@@ -307,6 +309,7 @@ struct Opt
     Opt& operator/(const std::string& name);
     operator bool() const;
     std::string str() const;
+    bool asBool(bool defaultValue) const;
     const Variant::List& asList() const;
     void collect(qpid::framing::FieldTable& args) const;
 
@@ -338,6 +341,12 @@ Opt::operator bool() const
     return value && !value->isVoid() && value->asBool();
 }
 
+bool Opt::asBool(bool defaultValue) const
+{
+    if (value) return value->asBool();
+    else return defaultValue;
+}
+
 std::string Opt::str() const
 {
     if (value) return value->asString();
@@ -490,7 +499,9 @@ Subscription::Subscription(const Address
       queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
       reliable(AddressResolution::is_reliable(address)),
       durable(Opt(address)/LINK/DURABLE),
-      actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
+      actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
+      exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
+      exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
 {
     (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
     (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -550,7 +561,7 @@ void Subscription::subscribe(qpid::clien
     checkAssert(session, FOR_RECEIVER);
 
     //create subscription queue:
-    session.queueDeclare(arg::queue=queue, arg::exclusive=true, 
+    session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
                          arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
     //'default' binding:
     bindings.bind(session);
@@ -559,15 +570,15 @@ void Subscription::subscribe(qpid::clien
     linkBindings.bind(session);
     //subscribe to subscription queue:
     AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
-    session.messageSubscribe(arg::queue=queue, arg::destination=destination, 
-                             arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+    session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+                             arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
 }
 
 void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
 {
     linkBindings.unbind(session);
     session.messageCancel(destination);
-    session.queueDelete(arg::queue=queue);
+    if (reliable) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
     checkDelete(session, FOR_RECEIVER);
 }
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Tue Jun 21 19:58:01 2011
@@ -40,11 +40,15 @@ using qpid::types::VAR_LIST;
 using qpid::framing::Uuid;
 
 namespace {
-void convert(const Variant::List& from, std::vector<std::string>& to)
+void merge(const std::string& value, std::vector<std::string>& list) {
+    if (std::find(list.begin(), list.end(), value) == list.end())
+        list.push_back(value);
+}
+
+void merge(const Variant::List& from, std::vector<std::string>& to)
 {
-    for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
-        to.push_back(i->asString());
-    }
+    for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i)
+        merge(i->asString(), to);
 }
 
 std::string asString(const std::vector<std::string>& v) {
@@ -93,9 +97,9 @@ void ConnectionImpl::setOption(const std
         maxReconnectInterval = value;
     } else if (name == "reconnect-urls" || name == "reconnect_urls") {
         if (value.getType() == VAR_LIST) {
-            convert(value.asList(), urls);
+            merge(value.asList(), urls);
         } else {
-            urls.push_back(value.asString());
+            merge(value.asString(), urls);
         }
     } else if (name == "username") {
         settings.username = value.asString();
@@ -198,7 +202,7 @@ qpid::messaging::Session ConnectionImpl:
             sessions[name] = impl;
             break;
         } catch (const qpid::TransportFailure&) {
-            open();
+            reopen();
         } catch (const qpid::SessionException& e) {
             throw qpid::messaging::SessionError(e.what());
         } catch (const std::exception& e) {
@@ -219,6 +223,15 @@ void ConnectionImpl::open()
     catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
 }
 
+void ConnectionImpl::reopen()
+{
+    if (!reconnect) {
+        throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+    }
+    open();
+}
+
+
 bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
 {
     if (timeout == 0) return true;
@@ -246,14 +259,9 @@ void ConnectionImpl::connect(const qpid:
 }
 
 void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
-    if (more.size()) {
-        for (size_t i = 0; i < more.size(); ++i) {
-            if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
-                urls.push_back(more[i].str());
-            }
-        }
-        QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
-    }
+    for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i)
+        merge(i->str(), urls);
+    QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
 }
 
 bool ConnectionImpl::tryConnect()

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Tue Jun 21 19:58:01 2011
@@ -43,6 +43,7 @@ class ConnectionImpl : public qpid::mess
   public:
     ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options);
     void open();
+    void reopen();
     bool isOpen() const;
     void close();
     qpid::messaging::Session newSession(bool transactional, const std::string& name);

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Tue Jun 21 19:58:01 2011
@@ -144,10 +144,10 @@ void IncomingMessages::accept()
     acceptTracker.accept(session);
 }
 
-void IncomingMessages::accept(qpid::framing::SequenceNumber id)
+void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative)
 {
     sys::Mutex::ScopedLock l(lock);
-    acceptTracker.accept(id, session);
+    acceptTracker.accept(id, session, cumulative);
 }
 
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Tue Jun 21 19:58:01 2011
@@ -72,7 +72,7 @@ class IncomingMessages
     bool get(Handler& handler, qpid::sys::Duration timeout);
     bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
     void accept();
-    void accept(qpid::framing::SequenceNumber id);
+    void accept(qpid::framing::SequenceNumber id, bool cumulative);
     void releaseAll();
     void releasePending(const std::string& destination);
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Tue Jun 21 19:58:01 2011
@@ -112,13 +112,14 @@ void SessionImpl::release(qpid::messagin
     execute1<Release>(m);
 }
 
-void SessionImpl::acknowledge(qpid::messaging::Message& m)
+void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative)
 {
     //Should probably throw an exception on failure here, or indicate
     //it through a return type at least. Failure means that the
     //message may be redelivered; i.e. the application cannot delete
     //any state necessary for preventing reprocessing of the message
-    execute1<Acknowledge1>(m);
+    Acknowledge2 ack(*this, m, cumulative);
+    execute(ack);
 }
 
 void SessionImpl::close()
@@ -467,10 +468,10 @@ void SessionImpl::acknowledgeImpl()
     if (!transactional) incoming.accept();
 }
 
-void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m)
+void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative)
 {
     ScopedLock l(lock);
-    if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId());
+    if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative);
 }
 
 void SessionImpl::rejectImpl(qpid::messaging::Message& m)
@@ -509,7 +510,7 @@ void SessionImpl::senderCancelled(const 
 
 void SessionImpl::reconnect()
 {
-    connection->open();
+    connection->reopen();
 }
 
 bool SessionImpl::backoff()

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Tue Jun 21 19:58:01 2011
@@ -63,7 +63,7 @@ class SessionImpl : public qpid::messagi
     void acknowledge(bool sync);
     void reject(qpid::messaging::Message&);
     void release(qpid::messaging::Message&);
-    void acknowledge(qpid::messaging::Message& msg);
+    void acknowledge(qpid::messaging::Message& msg, bool cumulative);
     void close();
     void sync(bool block);
     qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
@@ -139,7 +139,7 @@ class SessionImpl : public qpid::messagi
     void commitImpl();
     void rollbackImpl();
     void acknowledgeImpl();
-    void acknowledgeImpl(qpid::messaging::Message&);
+    void acknowledgeImpl(qpid::messaging::Message&, bool cumulative);
     void rejectImpl(qpid::messaging::Message&);
     void releaseImpl(qpid::messaging::Message&);
     void closeImpl();
@@ -204,12 +204,13 @@ class SessionImpl : public qpid::messagi
         void operator()() { impl.releaseImpl(message); }
     };
 
-    struct Acknowledge1 : Command
+    struct Acknowledge2 : Command
     {
         qpid::messaging::Message& message;
+        bool cumulative;
 
-        Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
-        void operator()() { impl.acknowledgeImpl(message); }
+        Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {}
+        void operator()() { impl.acknowledgeImpl(message, cumulative); }
     };
 
     struct CreateSender;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/windows/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/windows/SslConnector.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/windows/SslConnector.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/windows/SslConnector.cpp Tue Jun 21 19:58:01 2011
@@ -77,7 +77,7 @@ public:
                  framing::ProtocolVersion pVersion,
                  const ConnectionSettings&, 
                  ConnectionImpl*);
-    virtual void connect(const std::string& host, int port);
+    virtual void connect(const std::string& host, const std::string& port);
     virtual void connected(const Socket&);
     unsigned int getSSF();
 };
@@ -153,7 +153,7 @@ SslConnector::~SslConnector()
 
   // Will this get reach via virtual method via boost::bind????
 
-void SslConnector::connect(const std::string& host, int port) {
+void SslConnector::connect(const std::string& host, const std::string& port) {
     brokerHost = host;
     TCPConnector::connect(host, port);
 }

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jun 21 19:58:01 2011
@@ -146,6 +146,7 @@
 #include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/framing/ClusterClockBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionAbortBody.h"
 #include "qpid/framing/ClusterRetractOfferBody.h"
@@ -198,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpi
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 1097431;
+const uint32_t Cluster::CLUSTER_VERSION = 1128070;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -230,7 +231,6 @@ struct ClusterDispatcher : public framin
         cluster.updateOffer(member, updatee, l);
     }
     void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
-    void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
     void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
         cluster.errorCheck(member, type, frameSeq, l);
     }
@@ -240,6 +240,7 @@ struct ClusterDispatcher : public framin
     void deliverToQueue(const std::string& queue, const std::string& message) {
         cluster.deliverToQueue(queue, message, l);
     }
+    void clock(uint64_t time) { cluster.clock(time, l); }
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
 };
 
@@ -253,7 +254,7 @@ Cluster::Cluster(const ClusterSettings& 
     self(cpg.self()),
     clusterId(true),
     mAgent(0),
-    expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
+    expiryPolicy(new ExpiryPolicy(*this)),
     mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
     dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
     deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -365,7 +366,8 @@ void Cluster::addShadowConnection(const 
     assert(discarding);
     pair<ConnectionMap::iterator, bool> ib
         = connections.insert(ConnectionMap::value_type(c->getId(), c));
-    assert(ib.second);
+    // Like this to avoid tripping up unused variable warning when NDEBUG set
+    if (!ib.second) assert(ib.second);
 }
 
 void Cluster::erase(const ConnectionId& id) {
@@ -667,6 +669,8 @@ void Cluster::initMapCompleted(Lock& l) 
         else {                  // I can go ready.
             discarding = false;
             setReady(l);
+            // Must be called *before* memberUpdate so first update will be generated.
+            failoverExchange->setReady();
             memberUpdate(l);
             updateMgmtMembership(l);
             mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -719,6 +723,20 @@ void Cluster::configChange(const MemberI
     updateMgmtMembership(l);     // Update on every config change for consistency
 }
 
+struct ClusterClockTask : public sys::TimerTask {
+    Cluster& cluster;
+    sys::Timer& timer;
+
+    ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval)
+      : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {}
+
+    void fire() {
+      cluster.sendClockUpdate();
+      setupNextFire();
+      timer.add(this);
+    }
+};
+
 void Cluster::becomeElder(Lock&) {
     if (elder) return;          // We were already the elder.
     // We are the oldest, reactive links if necessary
@@ -726,6 +744,8 @@ void Cluster::becomeElder(Lock&) {
     elder = true;
     broker.getLinks().setPassive(false);
     timer->becomeElder();
+
+    clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval));
 }
 
 void Cluster::makeOffer(const MemberId& id, Lock& ) {
@@ -846,7 +866,7 @@ void Cluster::updateOffer(const MemberId
     if (updatee != self && url) {
         QPID_LOG(debug, debugSnapshot());
         if (mAgent) mAgent->clusterUpdate();
-        // Updatee will call clusterUpdate when update completes
+        // Updatee will call clusterUpdate() via checkUpdateIn() when update completes
     }
 }
 
@@ -927,10 +947,11 @@ void Cluster::checkUpdateIn(Lock& l) {
     if (!updateClosed) return;  // Wait till update connection closes.
     if (updatedMap) { // We're up to date
         map = *updatedMap;
-        failoverExchange->setUrls(getUrls(l));
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
         memberUpdate(l);
+        // Must be called *after* memberUpdate() to avoid sending an extra update.
+        failoverExchange->setReady();
         // NB: don't updateMgmtMembership() here as we are not in the deliver
         // thread. It will be updated on delivery of the "ready" we just mcast.
         broker.setClusterUpdatee(false);
@@ -1120,10 +1141,6 @@ void Cluster::setClusterId(const Uuid& u
     QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
 }
 
-void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
-    expiryPolicy->deliverExpire(id);
-}
-
 void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
     // If we see an errorCheck here (rather than in the ErrorCheck
     // class) then we have processed succesfully past the point of the
@@ -1161,6 +1178,35 @@ void Cluster::deliverToQueue(const std::
     q->deliver(msg);
 }
 
+sys::AbsTime Cluster::getClusterTime() {
+    Mutex::ScopedLock l(lock);
+    return clusterTime;
+}
+
+// This method is called during update on the updatee to set the initial cluster time.
+void Cluster::clock(const uint64_t time) {
+    Mutex::ScopedLock l(lock);
+    clock(time, l);
+}
+
+// called when broadcast message received
+void Cluster::clock(const uint64_t time, Lock&) {
+    clusterTime = AbsTime(EPOCH, time);
+    AbsTime now = AbsTime::now();
+
+    if (!elder) {
+      clusterTimeOffset = Duration(now, clusterTime);
+    }
+}
+
+// called by elder timer to send clock broadcast
+void Cluster::sendClockUpdate() {
+    Mutex::ScopedLock l(lock);
+    int64_t nanosecondsSinceEpoch = Duration(EPOCH, now());
+    nanosecondsSinceEpoch += clusterTimeOffset;
+    mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self);
+}
+
 bool Cluster::deferDeliveryImpl(const std::string& queue,
                                 const boost::intrusive_ptr<broker::Message>& msg)
 {

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jun 21 19:58:01 2011
@@ -63,6 +63,12 @@ class AMQBody;
 struct Uuid;
 }
 
+namespace sys {
+class Timer;
+class AbsTime;
+class Duration;
+}
+
 namespace cluster {
 
 class Connection;
@@ -135,6 +141,9 @@ class Cluster : private Cpg::Handler, pu
     bool deferDeliveryImpl(const std::string& queue,
                            const boost::intrusive_ptr<broker::Message>& msg);
 
+    sys::AbsTime getClusterTime();
+    void sendClockUpdate();
+    void clock(const uint64_t time);
   private:
     typedef sys::Monitor::ScopedLock Lock;
 
@@ -180,12 +189,12 @@ class Cluster : private Cpg::Handler, pu
                       const std::string& left,
                       const std::string& joined,
                       Lock& l);
-    void messageExpired(const MemberId&, uint64_t, Lock& l);
     void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
     void timerWakeup(const MemberId&, const std::string& name, Lock&);
     void timerDrop(const MemberId&, const std::string& name, Lock&);
     void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
     void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
+    void clock(const uint64_t time, Lock&);
 
     // Helper functions
     ConnectionPtr getConnection(const EventFrame&, Lock&);
@@ -296,6 +305,9 @@ class Cluster : private Cpg::Handler, pu
     ErrorCheck error;
     UpdateReceiver updateReceiver;
     ClusterTimer* timer;
+    sys::Timer clockTimer;
+    sys::AbsTime clusterTime;
+    sys::Duration clusterTimeOffset;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend struct ClusterDispatcher;

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue Jun 21 19:58:01 2011
@@ -72,6 +72,7 @@ struct ClusterOptions : public Options {
             ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
 #endif
             ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.")
+            ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.")
             ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit  reads per connection. 0=no limit.")
             ;
     }

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterSettings.h Tue Jun 21 19:58:01 2011
@@ -35,8 +35,9 @@ struct ClusterSettings {
     size_t readMax;
     std::string username, password, mechanism;
     size_t size;
+    uint16_t clockInterval;
 
-    ClusterSettings() : quorum(false), readMax(10), size(1)
+    ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10)
     {}
   
     Url getUrl(uint16_t port) const {

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp Tue Jun 21 19:58:01 2011
@@ -70,6 +70,7 @@ void ClusterTimer::add(intrusive_ptr<Tim
     if (i != map.end())
         throw Exception(QPID_MSG("Task already exists with name " << task->getName()));
     map[task->getName()] = task;
+
     // Only the elder actually activates the task with the Timer base class.
     if (cluster.isElder()) {
         QPID_LOG(trace, "Elder activating cluster timer task " << task->getName());
@@ -112,6 +113,9 @@ void ClusterTimer::deliverWakeup(const s
     else {
         intrusive_ptr<TimerTask> t = i->second;
         map.erase(i);
+        // Move the nextFireTime so readyToFire() is true. This is to ensure we
+        // don't get an error if the fired task calls setupNextFire()
+        t->setFired();
         Timer::fire(t);
     }
 }

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jun 21 19:58:01 2011
@@ -322,10 +322,10 @@ size_t Connection::decode(const char* da
         while (localDecoder.decode(buf))
             received(localDecoder.getFrame());
         if (!wasOpen && connection->isOpen()) {
-            // Connections marked as federation links are allowed to proxy
+            // Connections marked with setUserProxyAuth are allowed to proxy
             // messages with user-ID that doesn't match the connection's
             // authenticated ID. This is important for updates.
-            connection->setFederationLink(isCatchUp());
+            connection->setUserProxyAuth(isCatchUp());
         }
     }
     else {                      // Multicast local connections.
@@ -601,10 +601,6 @@ void Connection::queueObserverState(cons
     QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
 }
 
-void Connection::expiryId(uint64_t id) {
-    cluster.getExpiryPolicy().setId(id);
-}
-
 std::ostream& operator<<(std::ostream& o, const Connection& c) {
     const char* type="unknown";
     if (c.isLocal()) type = "local";
@@ -724,5 +720,16 @@ void Connection::doCatchupIoCallbacks() 
 
     if (catchUp) getBrokerConnection()->doIoCallbacks();
 }
+
+void Connection::clock(uint64_t time) {
+    QPID_LOG(debug, "Cluster connection received time update");
+    cluster.clock(time);
+}
+
+void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) {
+    boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+    queue->setDequeueSincePurge(dequeueSincePurge);
+}
+
 }} // Namespace qpid::cluster
 

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.h Tue Jun 21 19:58:01 2011
@@ -155,7 +155,6 @@ class Connection :
     void queuePosition(const std::string&, const framing::SequenceNumber&);
     void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
     void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
-    void expiryId(uint64_t);
 
     void txStart();
     void txAccept(const framing::SequenceSet&);
@@ -192,6 +191,10 @@ class Connection :
 
     void doCatchupIoCallbacks();
 
+    void clock(uint64_t time);
+
+    void queueDequeueSincePurgeState(const std::string&, uint32_t);
+
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Tue Jun 21 19:58:01 2011
@@ -21,106 +21,21 @@
 
 #include "qpid/broker/Message.h"
 #include "qpid/cluster/ExpiryPolicy.h"
-#include "qpid/cluster/Multicaster.h"
-#include "qpid/framing/ClusterMessageExpiredBody.h"
+#include "qpid/cluster/Cluster.h"
 #include "qpid/sys/Time.h"
-#include "qpid/sys/Timer.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace cluster {
 
-ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
-    : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
+ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {}
 
-struct ExpiryTask : public sys::TimerTask {
-    ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
-        : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {}
-    void fire() { expiryPolicy->sendExpire(expiryId); }
-    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-    const uint64_t expiryId;
-};
-
-// Called while receiving an update
-void ExpiryPolicy::setId(uint64_t id) {
-    sys::Mutex::ScopedLock l(lock);
-    expiryId = id;
-}
-
-// Called while giving an update
-uint64_t ExpiryPolicy::getId() const {
-    sys::Mutex::ScopedLock l(lock);
-    return expiryId;
-}
-
-// Called in enqueuing connection thread
-void ExpiryPolicy::willExpire(broker::Message& m) {
-    uint64_t id;
-    {
-        // When messages are fanned out to multiple queues, update sends
-        // them as independenty messages so we can have multiple messages
-        // with the same expiry ID.
-        //
-        sys::Mutex::ScopedLock l(lock);
-        id = expiryId++;
-        if (!id) {              // This is an update of an already-expired message.
-            m.setExpiryPolicy(expiredPolicy);
-        }
-        else {
-            assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
-            // If this is an update, the id may already exist
-            unexpiredById.insert(IdMessageMap::value_type(id, &m));
-            unexpiredByMessage[&m] = id;
-        }
-    }
-    timer.add(new ExpiryTask(this, id, m.getExpiration()));
-}
-
-// Called in dequeueing connection thread
-void ExpiryPolicy::forget(broker::Message& m) {
-    sys::Mutex::ScopedLock l(lock);
-    MessageIdMap::iterator i = unexpiredByMessage.find(&m);
-    assert(i != unexpiredByMessage.end());
-    unexpiredById.erase(i->second);
-    unexpiredByMessage.erase(i);
-}
-
-// Called in dequeueing connection or cleanup thread.
 bool ExpiryPolicy::hasExpired(broker::Message& m) {
-    sys::Mutex::ScopedLock l(lock);
-    return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
-}
-
-// Called in timer thread
-void ExpiryPolicy::sendExpire(uint64_t id) {
-    {
-        sys::Mutex::ScopedLock l(lock);
-        // Don't multicast an expiry notice if message is already forgotten.
-        if (unexpiredById.find(id) == unexpiredById.end()) return;
-    }
-    mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+    return m.getExpiration() < cluster.getClusterTime();
 }
 
-// Called in CPG deliver thread.
-void ExpiryPolicy::deliverExpire(uint64_t id) {
-    sys::Mutex::ScopedLock l(lock);
-    std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
-    IdMessageMap::iterator i = expired.first;
-    while (i != expired.second) {
-        i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; 
-        unexpiredByMessage.erase(i->second);
-        unexpiredById.erase(i++);
-    }
+sys::AbsTime ExpiryPolicy::getCurrentTime() {
+    return cluster.getClusterTime();
 }
 
-// Called in update thread on the updater.
-boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
-    sys::Mutex::ScopedLock l(lock);
-    MessageIdMap::iterator i = unexpiredByMessage.find(&m);
-    return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
-}
-
-bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
-void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
-
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Tue Jun 21 19:58:01 2011
@@ -36,12 +36,8 @@ namespace broker {
 class Message;
 }
 
-namespace sys {
-class Timer;
-}
-
 namespace cluster {
-class Multicaster;
+class Cluster;
 
 /**
  * Cluster expiry policy
@@ -49,43 +45,13 @@ class Multicaster;
 class ExpiryPolicy : public broker::ExpiryPolicy
 {
   public:
-    ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
+    ExpiryPolicy(Cluster& cluster);
 
-    void willExpire(broker::Message&);
     bool hasExpired(broker::Message&);
-    void forget(broker::Message&);
-
-    // Send expiration notice to cluster.
-    void sendExpire(uint64_t);
-
-    // Cluster delivers expiry notice.
-    void deliverExpire(uint64_t);
+    qpid::sys::AbsTime getCurrentTime();
 
-    void setId(uint64_t id);
-    uint64_t getId() const;
-    
-    boost::optional<uint64_t> getId(broker::Message&);
-    
   private:
-    typedef std::map<broker::Message*,  uint64_t> MessageIdMap;
-    // When messages are fanned out to multiple queues, update sends
-    // them as independenty messages so we can have multiple messages
-    // with the same expiry ID.
-    typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
-
-    struct Expired : public broker::ExpiryPolicy {
-        bool hasExpired(broker::Message&);
-        void willExpire(broker::Message&);
-    };
-
-    mutable sys::Mutex lock;
-    MessageIdMap unexpiredByMessage;
-    IdMessageMap unexpiredById;
-    uint64_t expiryId;
-    boost::intrusive_ptr<Expired> expiredPolicy;
-    Multicaster& mcast;
-    MemberId memberId;
-    sys::Timer& timer;
+    Cluster& cluster;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp Tue Jun 21 19:58:01 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,8 +39,10 @@ using namespace broker;
 using namespace framing;
 
 const string FailoverExchange::typeName("amq.failover");
-    
-FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)  : Exchange(typeName, parent, b ) {
+
+FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)
+    : Exchange(typeName, parent, b ), ready(false)
+{
     if (mgmtExchange != 0)
         mgmtExchange->set_type(typeName);
 }
@@ -53,16 +55,17 @@ void FailoverExchange::setUrls(const vec
 void FailoverExchange::updateUrls(const vector<Url>& u) {
     Lock l(lock);
     urls=u;
-    if (urls.empty()) return;
-    std::for_each(queues.begin(), queues.end(),
-                  boost::bind(&FailoverExchange::sendUpdate, this, _1));
+    if (ready && !urls.empty()) {
+        std::for_each(queues.begin(), queues.end(),
+                      boost::bind(&FailoverExchange::sendUpdate, this, _1));
+    }
 }
 
 string FailoverExchange::getType() const { return typeName; }
 
 bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
     Lock l(lock);
-    sendUpdate(queue);
+    if (ready) sendUpdate(queue);
     return queues.insert(queue).second;
 }
 
@@ -84,7 +87,7 @@ void FailoverExchange::sendUpdate(const 
     // Called with lock held.
     if (urls.empty()) return;
     framing::Array array(0x95);
-    for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) 
+    for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
         array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
     const ProtocolVersion v;
     boost::intrusive_ptr<Message> msg(new Message);
@@ -96,9 +99,12 @@ void FailoverExchange::sendUpdate(const 
     header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array);
     AMQFrame headerFrame(header);
     headerFrame.setFirstSegment(false);
-    msg->getFrames().append(headerFrame);    
+    msg->getFrames().append(headerFrame);
     DeliverableMessage(msg).deliverTo(queue);
 }
 
+void FailoverExchange::setReady() {
+    ready = true;
+}
 
 }} // namespace cluster

Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.h Tue Jun 21 19:58:01 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,6 +46,8 @@ class FailoverExchange : public broker::
     void setUrls(const std::vector<Url>&);
     /** Set the URLs and send an update.*/
     void updateUrls(const std::vector<Url>&);
+    /** Flag the failover exchange as ready to generate updates (caught up) */
+    void setReady();
 
     // Exchange overrides
     std::string getType() const;
@@ -56,7 +58,7 @@ class FailoverExchange : public broker::
 
   private:
     void sendUpdate(const boost::shared_ptr<broker::Queue>&);
-    
+
     typedef sys::Mutex::ScopedLock Lock;
     typedef std::vector<Url> Urls;
     typedef std::set<boost::shared_ptr<broker::Queue> > Queues;
@@ -64,7 +66,7 @@ class FailoverExchange : public broker::
     sys::Mutex lock;
     Urls urls;
     Queues queues;
-    
+    bool ready;
 };
 }} // namespace qpid::cluster
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org