You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/10/12 01:22:20 UTC

svn commit: r824198 [3/9] - in /qpid/branches/java-network-refactor: ./ qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf/python/ qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qmf/tests/ qpid/cpp/boost-1.32-support/ qpid/cpp/etc/ qpid/cpp/examples...

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Sun Oct 11 23:22:08 2009
@@ -36,7 +36,6 @@
 PersistableMessage::PersistableMessage() :
     asyncEnqueueCounter(0), 
     asyncDequeueCounter(0),
-    contentReleased(false),
     store(0)
 {}
 
@@ -59,9 +58,15 @@
     } 
 }
 
-void PersistableMessage::setContentReleased() {contentReleased = true; }
+void PersistableMessage::setContentReleased()
+{
+    contentReleaseState.released = true;
+}
 
-bool PersistableMessage::isContentReleased()const { return contentReleased; }
+bool PersistableMessage::isContentReleased() const
+{ 
+    return contentReleaseState.released;
+}
        
 bool PersistableMessage::isEnqueueComplete() {
     sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
@@ -153,6 +158,26 @@
     asyncDequeueCounter++; 
 }
 
+PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
+
+void PersistableMessage::setStore(MessageStore* s)
+{
+    store = s;
+}
+
+void PersistableMessage::requestContentRelease()
+{
+    contentReleaseState.requested = true;
+}
+void PersistableMessage::blockContentRelease()
+{ 
+    contentReleaseState.blocked = true;
+}
+bool PersistableMessage::checkContentReleasable()
+{ 
+    return contentReleaseState.requested && !contentReleaseState.blocked;
+}
+
 }}
 
 

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/PersistableMessage.h Sun Oct 11 23:22:08 2009
@@ -68,8 +68,16 @@
     void enqueueAsync();
     void dequeueAsync();
 
-    bool contentReleased;
     syncList synclist;
+    struct ContentReleaseState
+    {
+        bool blocked;
+        bool requested;
+        bool released;
+        
+        ContentReleaseState();
+    };
+    ContentReleaseState contentReleaseState;
 
   protected:
     /** Called when all enqueues are complete for this message. */
@@ -96,8 +104,15 @@
 
     void flush();
     
-    bool isContentReleased() const;
-       
+    bool QPID_BROKER_EXTERN isContentReleased() const;
+
+    void QPID_BROKER_EXTERN setStore(MessageStore*);
+    void requestContentRelease();
+    void blockContentRelease();
+    bool checkContentReleasable();
+
+    virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
+
     QPID_BROKER_EXTERN bool isEnqueueComplete();
 
     QPID_BROKER_EXTERN void enqueueComplete();

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.cpp Sun Oct 11 23:22:08 2009
@@ -181,6 +181,8 @@
 
 
 void Queue::recover(boost::intrusive_ptr<Message>& msg){
+    if (policy.get()) policy->recoverEnqueued(msg);
+
     push(msg, true);
     if (store){ 
         // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
@@ -206,11 +208,10 @@
 }
 
 void Queue::requeue(const QueuedMessage& msg){
-    if (!isEnqueued(msg)) return;
-
     QueueListeners::NotificationSet copy;
     {    
         Mutex::ScopedLock locker(messageLock);
+        if (!isEnqueued(msg)) return;
         msg.payload->enqueueComplete(); // mark the message as enqueued
         messages.push_front(msg);
         listeners.populate(copy);
@@ -563,16 +564,10 @@
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
-    Messages dequeues;
     QueueListeners::NotificationSet copy;
     {
         Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
-        if (policy.get()) {
-            policy->tryEnqueue(qm);
-            //depending on policy, may have some dequeues
-            if (!isRecovery) pendingDequeues.swap(dequeues);
-        }
         if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
          
         LVQ::iterator i;
@@ -606,12 +601,11 @@
             if (eventMgr) eventMgr->enqueued(qm);
             else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
         }
+        if (policy.get()) {
+            policy->enqueued(qm);
+        }
     }
     copy.notify();
-    if (!dequeues.empty()) {
-        //depending on policy, may have some dequeues
-        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
-    }
 }
 
 QueuedMessage Queue::getFront()
@@ -697,8 +691,19 @@
 }
 
 // return true if store exists, 
-bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
+bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
 {
+    if (policy.get() && !suppressPolicyCheck) {
+        Messages dequeues;
+        {
+            Mutex::ScopedLock locker(messageLock);
+            policy->tryEnqueue(msg);
+            policy->getPendingDequeues(dequeues);
+        }
+        //depending on policy, may have some dequeues that need to be performed without holding the lock
+        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));        
+    }
+
     if (inLastNodeFailure && persistLastNode){
         msg->forcePersistent();
     }
@@ -707,15 +712,27 @@
         msg->addTraceId(traceId);
     }
 
-    if (msg->isPersistent() && store) {
+    if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
         msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
         store->enqueue(ctxt, pmsg, *this);
         return true;
     }
+    if (!store) {
+        //Messages enqueued on a transient queue should be prevented
+        //from having their content released as it may not be
+        //recoverable by these queues for delivery
+        msg->blockContentRelease();
+    }
     return false;
 }
 
+void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
+{
+    Mutex::ScopedLock locker(messageLock);
+    if (policy.get()) policy->enqueueAborted(msg);       
+}
+
 // return true if store exists, 
 bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
 {
@@ -726,7 +743,7 @@
             dequeued(msg);
         }
     }
-    if (msg.payload->isPersistent() && store) {
+    if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
         msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
         store->dequeue(ctxt, pmsg, *this);
@@ -781,22 +798,37 @@
 
 void Queue::configure(const FieldTable& _settings, bool recovering)
 {
-    setPolicy(QueuePolicy::createQueuePolicy(_settings));
+
+    eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+
+    if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && 
+        (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+        if ( NullMessageStore::isNullStore(store)) {
+            QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
+        } else if (eventMgr && !eventMgr->isSync() ) {
+            QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
+        }
+        FieldTable copy(_settings);
+        copy.erase(QueuePolicy::typeKey);
+        setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
+    } else {
+        setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
+    }
     //set this regardless of owner to allow use of no-local with exclusive consumers also
     noLocal = _settings.get(qpidNoLocal);
-    QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+    QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
 
     lastValueQueue= _settings.get(qpidLastValueQueue);
-    if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+    if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
 
     lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
     if (lastValueQueueNoBrowse){
-        QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
+        QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
         lastValueQueue = lastValueQueueNoBrowse;
     }
     
     persistLastNode= _settings.get(qpidPersistLastNode);
-    if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+    if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
 
     traceId = _settings.getAsString(qpidTraceIdentity);
     std::string excludeList = _settings.getAsString(qpidTraceExclude);
@@ -806,8 +838,6 @@
     QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId 
              << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
 
-    eventMode = _settings.getAsInt(qpidQueueEventGeneration);
-
     FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
     if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
 
@@ -975,19 +1005,6 @@
     }
 }
 
-bool Queue::releaseMessageContent(const QueuedMessage& m)
-{
-    if (store && !NullMessageStore::isNullStore(store)) {
-        QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
-        m.payload->releaseContent(store);
-        return true;
-    } else {
-        QPID_LOG(warning, "Message " << m.position << " on " << name
-                 << " cannot be released from memory as the queue is not durable");
-        return false;
-    }    
-}
-
 ManagementObject* Queue::GetManagementObject (void) const
 {
     return (ManagementObject*) mgmtObject;
@@ -1044,11 +1061,12 @@
 void Queue::enqueued(const QueuedMessage& m)
 {
     if (m.payload) {
-        if (policy.get()) policy->tryEnqueue(m);
-        mgntEnqStats(m.payload);
-        if (m.payload->isPersistent()) {
-            enqueue ( 0, m.payload );
+        if (policy.get()) {
+            policy->recoverEnqueued(m.payload);
+            policy->enqueued(m);
         }
+        mgntEnqStats(m.payload);
+        enqueue ( 0, m.payload, true );
     } else {
         QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
     }
@@ -1059,10 +1077,4 @@
     return !policy.get() || policy->isEnqueued(msg);
 }
 
-void Queue::addPendingDequeue(const QueuedMessage& msg)
-{
-    //assumes lock is held - true at present but rather nasty as this is a public method
-    pendingDequeues.push_back(msg);    
-}
-
 QueueListeners& Queue::getListeners() { return listeners; }

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/Queue.h Sun Oct 11 23:22:08 2009
@@ -239,7 +239,8 @@
             QPID_BROKER_EXTERN void setLastNodeFailure();
             QPID_BROKER_EXTERN void clearLastNodeFailure();
 
-            bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+            bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck = false);
+            void enqueueAborted(boost::intrusive_ptr<Message> msg);
             /**
              * dequeue from store (only done once messages is acknowledged)
              */
@@ -315,8 +316,6 @@
                 bindings.eachBinding(f);
             }
 
-            bool releaseMessageContent(const QueuedMessage&);
-
             void popMsg(QueuedMessage& qmsg);
 
             /** Set the position sequence number  for the next message on the queue.
@@ -335,18 +334,6 @@
              */
             void recoveryComplete();
 
-            /**
-             * This is a hack to avoid deadlocks in durable ring
-             * queues. It is used for dequeueing messages in response
-             * to an enqueue while avoid holding lock over call to
-             * store.
-             * 
-             * Assumes messageLock is held - true for curent use case
-             * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public
-             * method
-             **/
-            void addPendingDequeue(const QueuedMessage &msg);
-
             // For cluster update
             QueueListeners& getListeners();
         };

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.cpp Sun Oct 11 23:22:08 2009
@@ -25,25 +25,41 @@
 namespace qpid {
 namespace broker {
 
-QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : 
-    eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true) 
+QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) : 
+    eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync) 
 {
-    eventQueue.start();
+    if (!sync) eventQueue.start();
 }
 
 QueueEvents::~QueueEvents() 
 {
-    eventQueue.stop();
+    if (!sync) eventQueue.stop();
 }
 
 void QueueEvents::enqueued(const QueuedMessage& m)
 {
-    if (enabled) eventQueue.push(Event(ENQUEUE, m));
+    if (enabled) {
+        Event enq(ENQUEUE, m);
+        if (sync) {
+            for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) 
+                j->second(enq);
+        } else {
+            eventQueue.push(enq);
+        }
+    }
 }
 
 void QueueEvents::dequeued(const QueuedMessage& m)
 {
-    if (enabled) eventQueue.push(Event(DEQUEUE, m));
+    if (enabled) {
+        Event deq(DEQUEUE, m);
+        if (sync) {
+            for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) 
+                j->second(deq);
+        } else {
+            eventQueue.push(Event(DEQUEUE, m));
+        }
+    }
 }
 
 void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
@@ -70,15 +86,16 @@
 QueueEvents::handle(const EventQueue::Batch& events) {
     qpid::sys::Mutex::ScopedLock l(lock);
     for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) {
-        for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) 
-            j->second(*i);
+        for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) {
+             j->second(*i);
+        }
     }
     return events.end();
 }
 
 void QueueEvents::shutdown()
 {
-    if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
+    if (!sync && !eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
 }
 
 void QueueEvents::enable()
@@ -93,6 +110,12 @@
     QPID_LOG(debug, "Queue events disabled");
 }
 
+bool QueueEvents::isSync()
+{
+    return sync;
+}
+
+
 QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
 
 

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueueEvents.h Sun Oct 11 23:22:08 2009
@@ -54,7 +54,7 @@
 
     typedef boost::function<void (Event)> EventListener;
 
-    QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller);
+    QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync = false);
     QPID_BROKER_EXTERN ~QueueEvents();
     QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
     QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
@@ -65,6 +65,7 @@
     void disable();
     //process all outstanding events
     QPID_BROKER_EXTERN void shutdown();
+    QPID_BROKER_EXTERN bool isSync();
   private:
     typedef qpid::sys::PollableQueue<Event> EventQueue;
     typedef std::map<std::string, EventListener> Listeners;
@@ -73,6 +74,7 @@
     Listeners listeners;
     volatile bool enabled;
     qpid::sys::Mutex lock;//protect listeners from concurrent access
+    bool sync;
     
     EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e);
 

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Sun Oct 11 23:22:08 2009
@@ -28,8 +28,8 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : 
-    maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
+QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : 
+    maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {}
 
 void QueuePolicy::enqueued(uint64_t _size)
 {
@@ -39,18 +39,15 @@
 
 void QueuePolicy::dequeued(uint64_t _size)
 {
-    //Note: underflow detection is not reliable in the face of
-    //concurrent updates (at present locking in Queue.cpp prevents
-    //these anyway); updates are atomic and are safe regardless.
     if (maxCount) {
-        if (count.get() > 0) {
+        if (count > 0) {
             --count;
         } else {
             throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
         }
     }
     if (maxSize) {
-        if (_size > size.get()) {
+        if (_size > size) {
             throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
         } else {
             size -= _size;
@@ -58,47 +55,47 @@
     }
 }
 
-bool QueuePolicy::checkLimit(const QueuedMessage& m)
+bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
 {
-    bool sizeExceeded = maxSize && (size.get() + m.payload->contentSize()) > maxSize;
-    bool countExceeded = maxCount && (count.get() + 1) > maxCount;
+    bool sizeExceeded = maxSize && (size + m->contentSize()) > maxSize;
+    bool countExceeded = maxCount && (count + 1) > maxCount;
     bool exceeded = sizeExceeded || countExceeded;
     if (exceeded) {
         if (!policyExceeded) {
-            policyExceeded = true;
-            if (m.queue) {
-                if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << m.queue->getName());
-                if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << m.queue->getName());
-            }
+            policyExceeded = true;            
+            if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << name);
+            if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << name);
         }
     } else {
         if (policyExceeded) {
             policyExceeded = false;
-            if (m.queue) {
-                QPID_LOG(info, "Queue cumulative message size and message count within policy for " << m.queue->getName());
-            }
+            QPID_LOG(info, "Queue cumulative message size and message count within policy for " << name);
         }
     }
     return !exceeded;
 }
 
-void QueuePolicy::tryEnqueue(const QueuedMessage& m)
+void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
 {
     if (checkLimit(m)) {
-        enqueued(m);
+        enqueued(m->contentSize());
     } else {
-        std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
-        throw ResourceLimitExceededException(
-            QPID_MSG("Policy exceeded on " << queue << " by message " << m.position 
-                     << " of size " << m.payload->contentSize() << " , policy: " << *this));
+        throw ResourceLimitExceededException(QPID_MSG("Policy exceeded on " << name << ", policy: " << *this));
     }
 }
 
-void QueuePolicy::enqueued(const QueuedMessage& m)
+void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
 {
-    enqueued(m.payload->contentSize());
+    enqueued(m->contentSize());
 }
 
+void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
+{
+    dequeued(m->contentSize());
+}
+
+void QueuePolicy::enqueued(const QueuedMessage&) {}
+
 void QueuePolicy::dequeued(const QueuedMessage& m)
 {
     dequeued(m.payload->contentSize());
@@ -132,7 +129,7 @@
         std::transform(t.begin(), t.end(), t.begin(), tolower);        
         if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
     }
-    return FLOW_TO_DISK;
+    return REJECT;
 }
 
 void QueuePolicy::setDefaultMaxSize(uint64_t s)
@@ -140,6 +137,7 @@
     defaultMaxSize = s;
 }
 
+void QueuePolicy::getPendingDequeues(Messages&) {}
 
 
 
@@ -148,8 +146,8 @@
 {
   buffer.putLong(maxCount);
   buffer.putLongLong(maxSize);
-  buffer.putLong(count.get());
-  buffer.putLongLong(size.get());
+  buffer.putLong(count);
+  buffer.putLongLong(size);
 }
 
 void QueuePolicy::decode ( Buffer& buffer ) 
@@ -179,16 +177,18 @@
 const std::string QueuePolicy::RING_STRICT("ring_strict");
 uint64_t QueuePolicy::defaultMaxSize(0);
 
-FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : 
-    QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
+FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) : 
+    QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {}
 
-bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
+bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
 {
-    return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+    if (!QueuePolicy::checkLimit(m)) m->requestContentRelease(); 
+    return true;
 }
 
-RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : 
-    QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+RingQueuePolicy::RingQueuePolicy(const std::string& _name, 
+                                 uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : 
+    QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
 
 bool before(const QueuedMessage& a, const QueuedMessage& b)
 {
@@ -197,15 +197,12 @@
 
 void RingQueuePolicy::enqueued(const QueuedMessage& m)
 {
-    QueuePolicy::enqueued(m);
-    qpid::sys::Mutex::ScopedLock l(lock);
     //need to insert in correct location based on position
     queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
 }
 
 void RingQueuePolicy::dequeued(const QueuedMessage& m)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
     //find and remove m from queue
     if (find(m, pendingDequeues, true) || find(m, queue, true)) {
         //now update count and size
@@ -215,49 +212,32 @@
 
 bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
     //for non-strict ring policy, a message can be replaced (and
     //therefore dequeued) before it is accepted or released by
     //subscriber; need to detect this
     return find(m, pendingDequeues, false) || find(m, queue, false); 
 }
 
-bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
+bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
 {
     if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
     
     QueuedMessage oldest;
-    {
-        qpid::sys::Mutex::ScopedLock l(lock);
-        if (queue.empty()) {
-            QPID_LOG(debug, "Message too large for ring queue " 
-                     << (m.queue ? m.queue->getName() : std::string("unknown queue")) 
-                     << " [" << *this  << "] "
-                     << ": message size = " << m.payload->contentSize() << " bytes");
-            return false;
-        }
-        oldest = queue.front();
+    if (queue.empty()) {
+        QPID_LOG(debug, "Message too large for ring queue " << name 
+                 << " [" << *this  << "] "
+                 << ": message size = " << m->contentSize() << " bytes");
+        return false;
     }
+    oldest = queue.front();
     if (oldest.queue->acquire(oldest) || !strict) {
-        {
-            //TODO: fix this!  In the current code, this method is
-            //only ever called with the Queue lock already taken. This
-            //should not be relied upon going forward however and
-            //clearly the locking in this class is insufficient as
-            //there is no guarantee that the message previously atthe
-            //front is still there.
-            qpid::sys::Mutex::ScopedLock l(lock);
-            queue.pop_front();
-            pendingDequeues.push_back(oldest);
-        }
-        oldest.queue->addPendingDequeue(oldest);
-        QPID_LOG(debug, "Ring policy triggered in queue " 
-                 << (m.queue ? m.queue->getName() : std::string("unknown queue"))
-                 << ": removed message " << oldest.position << " to make way for " << m.position);
+        queue.pop_front();
+        pendingDequeues.push_back(oldest);
+        QPID_LOG(debug, "Ring policy triggered in " << name 
+                 << ": removed message " << oldest.position << " to make way for new message");
         return true;
     } else {
-        QPID_LOG(debug, "Ring policy could not be triggered in queue " 
-                 << (m.queue ? m.queue->getName() : std::string("unknown queue")) 
+        QPID_LOG(debug, "Ring policy could not be triggered in " << name 
                  << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
         //in strict mode, if oldest message has been delivered (hence
         //cannot be acquired) but not yet acked, it should not be
@@ -266,6 +246,11 @@
     }
 }
 
+void RingQueuePolicy::getPendingDequeues(Messages& result)
+{
+    result = pendingDequeues;
+}
+
 bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
 {
     for (Messages::iterator i = q.begin(); i != q.end(); i++) {
@@ -277,25 +262,36 @@
     return false;
 }
 
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+{
+    return createQueuePolicy("<unspecified>", maxCount, maxSize, type);
+}
+
 std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
 {
+    return createQueuePolicy("<unspecified>", settings);
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
+{
     uint32_t maxCount = getInt(settings, maxCountKey, 0);
     uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize);
     if (maxCount || maxSize) {
-        return createQueuePolicy(maxCount, maxSize, getType(settings));
+        return createQueuePolicy(name, maxCount, maxSize, getType(settings));
     } else {
         return std::auto_ptr<QueuePolicy>();
     }
 }
 
-std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, 
+                                                          uint32_t maxCount, uint64_t maxSize, const std::string& type)
 {
     if (type == RING || type == RING_STRICT) {
-        return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type));
+        return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type));
     } else if (type == FLOW_TO_DISK) {
-        return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize));
+        return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize));
     } else {
-        return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type));
+        return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type));
     }
 
 }
@@ -305,10 +301,10 @@
 
 std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
 {
-    if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get();
+    if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
     else out << "size: unlimited";
     out << "; ";
-    if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
+    if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
     else out << "count: unlimited";    
     out << "; type=" << p.type;
     return out;

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/QueuePolicy.h Sun Oct 11 23:22:08 2009
@@ -40,14 +40,14 @@
     uint32_t maxCount;
     uint64_t maxSize;
     const std::string type;
-    qpid::sys::AtomicValue<uint32_t> count;
-    qpid::sys::AtomicValue<uint64_t> size;
+    uint32_t count;
+    uint64_t size;
     bool policyExceeded;
             
     static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
-    static std::string getType(const qpid::framing::FieldTable& settings);
 
   public:
+    typedef std::deque<QueuedMessage> Messages;
     static QPID_BROKER_EXTERN const std::string maxCountKey;
     static QPID_BROKER_EXTERN const std::string maxSizeKey;
     static QPID_BROKER_EXTERN const std::string typeKey;
@@ -57,27 +57,34 @@
     static QPID_BROKER_EXTERN const std::string RING_STRICT;            
 
     virtual ~QueuePolicy() {}
-    QPID_BROKER_EXTERN void tryEnqueue(const QueuedMessage&);
+    QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg);
+    QPID_BROKER_EXTERN void recoverEnqueued(boost::intrusive_ptr<Message> msg);
+    QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
+    virtual void enqueued(const QueuedMessage&);
     virtual void dequeued(const QueuedMessage&);
     virtual bool isEnqueued(const QueuedMessage&);
-    virtual bool checkLimit(const QueuedMessage&);
     QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings);
     uint32_t getMaxCount() const { return maxCount; }
     uint64_t getMaxSize() const { return maxSize; }           
     void encode(framing::Buffer& buffer) const;
     void decode ( framing::Buffer& buffer );
     uint32_t encodedSize() const;
+    virtual void getPendingDequeues(Messages& result);
 
-
+    static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
+    static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
     static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
     static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+    static std::string getType(const qpid::framing::FieldTable& settings);
     static void setDefaultMaxSize(uint64_t);
     friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&,
                                                        const QueuePolicy&);
   protected:
-    QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+    const std::string name;
 
-    virtual void enqueued(const QueuedMessage&);
+    QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+
+    virtual bool checkLimit(boost::intrusive_ptr<Message> msg);
     void enqueued(uint64_t size);
     void dequeued(uint64_t size);
 };
@@ -86,21 +93,20 @@
 class FlowToDiskPolicy : public QueuePolicy
 {
   public:
-    FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize);
-    bool checkLimit(const QueuedMessage&);
+    FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize);
+    bool checkLimit(boost::intrusive_ptr<Message> msg);
 };
 
 class RingQueuePolicy : public QueuePolicy
 {
   public:
-    RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
+    RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
     void enqueued(const QueuedMessage&);
     void dequeued(const QueuedMessage&);
     bool isEnqueued(const QueuedMessage&);
-    bool checkLimit(const QueuedMessage&);
+    bool checkLimit(boost::intrusive_ptr<Message> msg);
+    void getPendingDequeues(Messages& result);
   private:
-    typedef std::deque<QueuedMessage> Messages;
-    qpid::sys::Mutex lock;
     Messages pendingDequeues;
     Messages queue;
     const bool strict;

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.cpp Sun Oct 11 23:22:08 2009
@@ -65,7 +65,7 @@
       tagGenerator("sgen"),
       dtxSelected(false),
       authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
-      userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
+      userID(getSession().getConnection().getUserId())
 {
     acl = getSession().getBroker().getAcl();
 }
@@ -302,6 +302,18 @@
     return !blocked;
 }
 
+namespace {
+struct ConsumerName {
+    const SemanticState::ConsumerImpl& consumer;
+    ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
+};
+
+ostream& operator<<(ostream& o, const ConsumerName& pc) {
+    return o << pc.consumer.getName() << " on "
+             << pc.consumer.getParent().getSession().getSessionId();
+}
+}
+
 void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
 {
     uint32_t originalMsgCredit = msgCredit;
@@ -312,7 +324,7 @@
     if (byteCredit != 0xFFFFFFFF) {
         byteCredit -= msg->getRequiredCredit();
     }
-    QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
+    QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
              << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
              << " now bytes: " << byteCredit << " msgs: " << msgCredit);
     
@@ -320,15 +332,13 @@
 
 bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
 {
-    if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
-        QPID_LOG(debug, "Not enough credit for '" << name  << "' on " << parent 
-                 << ", bytes: " << byteCredit << " msgs: " << msgCredit);
-        return false;
-    } else {
-        QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
-                 << " bytes: " << byteCredit << " msgs: " << msgCredit);
-        return true;
-    }
+    bool enoughCredit = msgCredit > 0 &&
+        (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
+    QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ")
+             << ConsumerName(*this)
+             << ", have bytes: " << byteCredit << " msgs: " << msgCredit
+             << ", need " << msg->getRequiredCredit() << " bytes");
+    return enoughCredit;
 }
 
 SemanticState::ConsumerImpl::~ConsumerImpl() {}
@@ -356,6 +366,9 @@
     } else {
         DeliverableMessage deliverable(msg);
         route(msg, deliverable);
+        if (msg->checkContentReleasable()) {
+            msg->releaseContent();
+        }
     }
 }
 

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SemanticState.h Sun Oct 11 23:22:08 2009
@@ -129,6 +129,7 @@
         const framing::FieldTable& getArguments() const { return arguments; }
 
         SemanticState& getParent() { return *parent; }
+        const SemanticState& getParent() const { return *parent; }
     };
 
   private:
@@ -163,6 +164,7 @@
     ~SemanticState();
 
     SessionContext& getSession() { return session; }
+    const SessionContext& getSession() const { return session; }
 
     ConsumerImpl& find(const std::string& destination);
     

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Sun Oct 11 23:22:08 2009
@@ -337,6 +337,10 @@
         params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
         params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
         params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+        params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+        params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+        params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
         if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
             throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
     }
@@ -472,8 +476,7 @@
 
     AclModule* acl = getBroker().getAcl();
     if (acl)
-    {
-        // add flags as needed
+    {        
          if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) )
              throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId()));
     }

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionContext.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionContext.h Sun Oct 11 23:22:08 2009
@@ -28,7 +28,7 @@
 #include "qpid/sys/OutputControl.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/OwnershipToken.h"
-
+#include "qpid/SessionId.h"
 
 #include <boost/noncopyable.hpp>
 
@@ -45,6 +45,7 @@
     virtual framing::AMQP_ClientProxy& getProxy() = 0;
     virtual Broker& getBroker() = 0;
     virtual uint16_t getChannel() const = 0;
+    virtual const SessionId& getSessionId() const = 0;
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionState.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SessionState.h Sun Oct 11 23:22:08 2009
@@ -118,6 +118,8 @@
 
     bool processSendCredit(uint32_t msgs);
 
+    const SessionId& getSessionId() const { return getId(); }
+
   private:
 
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.cpp Sun Oct 11 23:22:08 2009
@@ -38,6 +38,8 @@
     signal(SIGCHLD,SIG_IGN); 
 }
 
+void SignalHandler::shutdown() { shutdownHandler(0); }
+
 void SignalHandler::shutdownHandler(int) {
     if (broker.get()) {
         broker->shutdown();

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/SignalHandler.h Sun Oct 11 23:22:08 2009
@@ -38,6 +38,9 @@
     /** Set the broker to be shutdown on signals */
     static void setBroker(const boost::intrusive_ptr<Broker>& broker);
 
+    /** Initiate shut-down of broker */
+    static void shutdown();
+
   private:
     static void shutdownHandler(int);
     static boost::intrusive_ptr<Broker> broker;

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TopicExchange.cpp Sun Oct 11 23:22:08 2009
@@ -293,44 +293,23 @@
     return q != qv.end();
 }
 
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
     Binding::vector mb;
+    BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
     PreRoute pr(msg, this);
-    uint32_t count(0);
-
     {
-    RWlock::ScopedRlock l(lock);
-    for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
-        if (match(i->first, routingKey)) {
-            Binding::vector& qv(i->second.bindingVector);
-            for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
-                mb.push_back(*j);
+        RWlock::ScopedRlock l(lock);
+        for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+            if (match(i->first, routingKey)) {
+                Binding::vector& qv(i->second.bindingVector);
+                for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){
+                    b->push_back(*j);
+                }
             }
         }
     }
-    }
-    
-    for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) {
-        msg.deliverTo((*j)->queue);
-        if ((*j)->mgmtBinding != 0)
-            (*j)->mgmtBinding->inc_msgMatched ();
-    }
-
-    if (mgmtExchange != 0)
-    {
-        mgmtExchange->inc_msgReceives  ();
-        mgmtExchange->inc_byteReceives (msg.contentSize ());
-        if (count == 0)
-        {
-            mgmtExchange->inc_msgDrops  ();
-            mgmtExchange->inc_byteDrops (msg.contentSize ());
-        }
-        else
-        {
-            mgmtExchange->inc_msgRoutes  (count);
-            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
-        }
-    }
+    doRoute(msg, b);
 }
 
 bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxAccept.cpp Sun Oct 11 23:22:08 2009
@@ -88,7 +88,13 @@
 
 void TxAccept::commit() throw() 
 {
-    ops.commit();
+    try {
+        ops.commit();
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Failed to commit: " << e.what());
+    } catch(...) {
+        QPID_LOG(error, "Failed to commit (unknown error)");
+    }
 }
 
 void TxAccept::rollback() throw() {}

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.cpp Sun Oct 11 23:22:08 2009
@@ -26,9 +26,14 @@
 
 TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {}
 
-bool TxPublish::prepare(TransactionContext* ctxt) throw(){
+bool TxPublish::prepare(TransactionContext* ctxt) throw()
+{
     try{
-        for_each(queues.begin(), queues.end(), Prepare(ctxt, msg));
+        while (!queues.empty()) {
+            prepare(ctxt, queues.front());
+            prepared.push_back(queues.front());
+            queues.pop_front();
+        }
         return true;
     }catch(const std::exception& e){
         QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -38,11 +43,30 @@
     return false;
 }
 
-void TxPublish::commit() throw(){
-    for_each(queues.begin(), queues.end(), Commit(msg));
+void TxPublish::commit() throw()
+{
+    try {
+        for_each(prepared.begin(), prepared.end(), Commit(msg));
+        if (msg->checkContentReleasable()) {
+            msg->releaseContent();
+        }
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Failed to commit: " << e.what());
+    } catch(...) {
+        QPID_LOG(error, "Failed to commit (unknown error)");
+    }
 }
 
-void TxPublish::rollback() throw(){
+void TxPublish::rollback() throw()
+{
+    try {
+        for_each(prepared.begin(), prepared.end(), Rollback(msg));
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Failed to complete rollback: " << e.what());
+    } catch(...) {
+        QPID_LOG(error, "Failed to complete rollback (unknown error)");
+    }
+
 }
 
 void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
@@ -54,16 +78,14 @@
     }
 }
 
-TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg) 
-    : ctxt(_ctxt), msg(_msg){}
-
-void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){
+void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
+{
     if (!queue->enqueue(ctxt, msg)){
         /**
-	* if not store then mark message for ack and deleivery once
-	* commit happens, as async IO will never set it when no store
-	* exists
-	*/
+         * If there is no store, mark the message for ack and delivery
+         * once commit happens, as async IO will never set it when no
+         * store exists.
+         */
 	msg->enqueueComplete();
     }
 }
@@ -74,6 +96,12 @@
     queue->process(msg);
 }
 
+TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){}
+
+void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){
+    queue->enqueueAborted(msg);
+}
+
 uint64_t TxPublish::contentSize ()
 {
     return msg->contentSize ();

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker/TxPublish.h Sun Oct 11 23:22:08 2009
@@ -47,23 +47,25 @@
          * dispatch or to be added to the in-memory queue.
          */
         class TxPublish : public TxOp, public Deliverable{
-            class Prepare{
-                TransactionContext* ctxt;
+
+            class Commit{
                 boost::intrusive_ptr<Message>& msg;
             public:
-                Prepare(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg);
+                Commit(boost::intrusive_ptr<Message>& msg);
                 void operator()(const boost::shared_ptr<Queue>& queue);            
             };
-
-            class Commit{
+            class Rollback{
                 boost::intrusive_ptr<Message>& msg;
             public:
-                Commit(boost::intrusive_ptr<Message>& msg);
+                Rollback(boost::intrusive_ptr<Message>& msg);
                 void operator()(const boost::shared_ptr<Queue>& queue);            
             };
 
             boost::intrusive_ptr<Message> msg;
             std::list<Queue::shared_ptr> queues;
+            std::list<Queue::shared_ptr> prepared;
+
+            void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
 
         public:
             QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Sun Oct 11 23:22:08 2009
@@ -257,6 +257,7 @@
         knownBrokersUrls.push_back(Url((*i)->get<std::string>()));
     if (sasl.get()) {
         securityLayer = sasl->getSecurityLayer(maxFrameSize);
+        operUserId = sasl->getUserId();
     }
     setState(OPEN);
     QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls));

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionHandler.h Sun Oct 11 23:22:08 2009
@@ -71,6 +71,7 @@
     std::auto_ptr<Sasl> sasl;
     std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
     boost::intrusive_ptr<qpid::sys::TimerTask> rcvTimeoutTask;
+    std::string operUserId;
 
     void checkState(STATES s, const std::string& msg);
 
@@ -120,6 +121,7 @@
     std::vector<Url> knownBrokersUrls;
 
     static framing::connection::CloseCode convert(uint16_t replyCode);
+    const std::string& getUserId() const { return operUserId; }
 };
 
 }}

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Sun Oct 11 23:22:08 2009
@@ -151,6 +151,12 @@
  
     handler.waitForOpen();
 
+    // If the SASL layer has provided an "operational" userId for the connection,
+    // put it in the negotiated settings.
+    const std::string& userId(handler.getUserId());
+    if (!userId.empty())
+        handler.username = userId;
+
     //enable security layer if one has been negotiated:
     std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer();
     if (securityLayer.get()) {

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Connector.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Connector.cpp Sun Oct 11 23:22:08 2009
@@ -51,10 +51,10 @@
 // Stuff for the registry of protocol connectors (maybe should be moved to its own file)
 namespace {
     typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
-    
+
     ProtocolRegistry& theProtocolRegistry() {
         static ProtocolRegistry protocolRegistry;
-        
+
         return protocolRegistry;
     } 
 }
@@ -93,7 +93,7 @@
     size_t lastEof; // Position after last EOF in frames
     uint64_t currentSize;
     Bounds* bounds;
-    
+
     framing::ProtocolVersion version;
     bool initiated;
     bool closed;
@@ -118,16 +118,17 @@
     void run();
     void handleClosed();
     bool closeInternal();
-    
+
+    void connected(const Socket&);
+    void connectFailed(const std::string& msg);
     bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
     void writebuff(qpid::sys::AsynchIO&);
     void writeDataBlock(const framing::AMQDataBlock& data);
     void eof(qpid::sys::AsynchIO&);
 
     boost::weak_ptr<ConnectionImpl> impl;
-    
+
     void connect(const std::string& host, int port);
-    void init();
     void close();
     void send(framing::AMQFrame& frame);
     void abort();
@@ -142,7 +143,6 @@
     size_t decode(const char* buffer, size_t size);
     size_t encode(const char* buffer, size_t size);
     bool canEncode();
-    
 
 public:
     TCPConnector(framing::ProtocolVersion pVersion,
@@ -163,6 +163,11 @@
     } init;
 }
 
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+    Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+    ~Buff() { delete [] bytes;}
+};
+
 TCPConnector::TCPConnector(ProtocolVersion ver,
                      const ConnectionSettings& settings,
                      ConnectionImpl* cimpl)
@@ -189,15 +194,19 @@
 void TCPConnector::connect(const std::string& host, int port){
     Mutex::ScopedLock l(lock);
     assert(closed);
-    try {
-        socket.connect(host, port);
-    } catch (const std::exception& /*e*/) {
-        socket.close();
-        throw;
-    }
-
-    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+    assert(joined);
     poller = Poller::shared_ptr(new Poller);
+    AsynchConnector::create(socket,
+                            poller,
+                            host, port,
+                            boost::bind(&TCPConnector::connected, this, _1),
+                            boost::bind(&TCPConnector::connectFailed, this, _3));
+    closed = false;
+    joined = false;
+    receiver = Thread(this);
+}
+
+void TCPConnector::connected(const Socket&) {
     aio = AsynchIO::create(socket,
                        boost::bind(&TCPConnector::readbuff, this, _1, _2),
                        boost::bind(&TCPConnector::eof, this, _1),
@@ -205,16 +214,23 @@
                        0, // closed
                        0, // nobuffs
                        boost::bind(&TCPConnector::writebuff, this, _1));
-    closed = false;
-}
+    for (int i = 0; i < 32; i++) {
+        aio->queueReadBuffer(new Buff(maxFrameSize));
+    }
+    aio->start(poller);
 
-void TCPConnector::init(){
-    Mutex::ScopedLock l(lock);
-    assert(joined);
+    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
     ProtocolInitiation init(version);
     writeDataBlock(init);
-    joined = false;
-    receiver = Thread(this);
+}
+
+void TCPConnector::connectFailed(const std::string& msg) {
+    QPID_LOG(warning, "Connecting failed: " << msg);
+    closed = true;
+    poller->shutdown();
+    closeInternal();
+    if (shutdownHandler)
+        shutdownHandler->shutdown();
 }
 
 bool TCPConnector::closeInternal() {
@@ -235,7 +251,7 @@
     receiver.join();
     return ret;
 }
-        
+
 void TCPConnector::close() {
     closeInternal();
 }
@@ -243,7 +259,13 @@
 void TCPConnector::abort() {
     // Can't abort a closed connection
     if (!closed) {
-        aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+        if (aio) {
+            // Established connection
+            aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+        } else {
+            // We're still connecting
+            connectFailed("Connection timedout");
+        }
     }
 }
 
@@ -288,18 +310,13 @@
         shutdownHandler->shutdown();
 }
 
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
-    Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}    
-    ~Buff() { delete [] bytes;}
-};
-
 void TCPConnector::writebuff(AsynchIO& /*aio*/) 
 {
     Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
     if (codec->canEncode()) {
         std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
         if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
-        
+
         size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
 
         buffer->dataStart = 0;
@@ -395,11 +412,6 @@
     try {
         Dispatcher d(poller);
 
-        for (int i = 0; i < 32; i++) {
-            aio->queueReadBuffer(new Buff(maxFrameSize));
-        }
-
-        aio->start(poller);
         d.run();
     } catch (const std::exception& e) {
         QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/RdmaConnector.cpp Sun Oct 11 23:22:08 2009
@@ -167,20 +167,9 @@
     assert(joined);
     poller = Poller::shared_ptr(new Poller);
 
-    // This stuff needs to abstracted out of here to a platform specific file
-    ::addrinfo *res;
-    ::addrinfo hints;
-    hints.ai_flags = 0;
-    hints.ai_family = AF_INET;
-    hints.ai_socktype = SOCK_STREAM;
-    hints.ai_protocol = 0;
-    int n = ::getaddrinfo(host.c_str(), boost::lexical_cast<std::string>(port).c_str(), &hints, &res);
-    if (n<0) {
-        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
-    }
-
+    SocketAddress sa(host, boost::lexical_cast<std::string>(port));
     Rdma::Connector* c = new Rdma::Connector(
-        *res->ai_addr,
+        sa,
         Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
         boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
         boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Sasl.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Sasl.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/Sasl.h Sun Oct 11 23:22:08 2009
@@ -45,6 +45,7 @@
     virtual std::string start(const std::string& mechanisms) = 0;
     virtual std::string step(const std::string& challenge) = 0;
     virtual std::string getMechanism() = 0;
+    virtual std::string getUserId() = 0;
     virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0;    
     virtual ~Sasl() {}
 };

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SaslFactory.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SaslFactory.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SaslFactory.cpp Sun Oct 11 23:22:08 2009
@@ -82,6 +82,7 @@
     std::string start(const std::string& mechanisms);
     std::string step(const std::string& challenge);
     std::string getMechanism();
+    std::string getUserId();
     std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
   private:
     sasl_conn_t* conn;    
@@ -266,6 +267,18 @@
     return mechanism;
 }
 
+std::string CyrusSasl::getUserId()
+{
+    int propResult;
+    const void* operName;
+
+    propResult = sasl_getprop(conn, SASL_USERNAME, &operName);
+    if (propResult == SASL_OK)
+        return std::string((const char*) operName);
+
+    return std::string();
+}
+
 void CyrusSasl::interact(sasl_interact_t* client_interact)
 {
 

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.cpp Sun Oct 11 23:22:08 2009
@@ -64,7 +64,8 @@
       proxy(ioHandler),
       nextIn(0),
       nextOut(0),
-      sendMsgCredit(0)
+      sendMsgCredit(0),
+      doClearDeliveryPropertiesExchange(true)
 {
     channel.next = connectionShared.get();
 }
@@ -396,11 +397,16 @@
 {
     AMQFrame header(content.getHeader());
 
-    // Client is not allowed to set the delivery-properties.exchange.
-    AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
-    if (headerp && headerp->get<DeliveryProperties>())
-        headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
-
+    // doClearDeliveryPropertiesExchange is set by cluster update client so
+    // it can send messages with delivery-properties.exchange set.
+    //
+    if (doClearDeliveryPropertiesExchange) {
+        // Normal client is not allowed to set the delivery-properties.exchange
+        // so clear it here.
+        AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
+        if (headerp && headerp->get<DeliveryProperties>())
+            headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
+    }
     header.setFirstSegment(false);
     uint64_t data_length = content.getData().length();
     if(data_length > 0){

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/SessionImpl.h Sun Oct 11 23:22:08 2009
@@ -130,6 +130,8 @@
      */
     boost::shared_ptr<ConnectionImpl> getConnection();
 
+    void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; }
+
 private:
     enum State {
         INACTIVE,
@@ -243,6 +245,8 @@
     // Only keep track of message credit 
     sys::Semaphore* sendMsgCredit;
 
+    bool doClearDeliveryPropertiesExchange;
+
   friend class client::SessionHandler;
 };
 

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Sun Oct 11 23:22:08 2009
@@ -362,21 +362,12 @@
 
 void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}
 
-template <class T> void encode(qpid::messaging::Message& from)
-{
-    T codec;
-    from.encode(codec);
-    from.setContentType(T::contentType);
-}
-
 void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp
 
 void convert(qpid::messaging::Message& from, qpid::client::Message& to)
 {
     //TODO: need to avoid copying as much as possible
-    if (from.getContent().isList()) encode<ListCodec>(from);
-    if (from.getContent().isMap())  encode<MapCodec>(from);
-    to.setData(from.getBytes());
+    to.setData(from.getContent());
     to.getDeliveryProperties().setRoutingKey(from.getSubject());
     //TODO: set other delivery properties
     to.getMessageProperties().setContentType(from.getContentType());

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Sun Oct 11 23:22:08 2009
@@ -269,18 +269,9 @@
     //e.g. for rejecting.
     MessageImplAccess::get(message).setInternalId(command.getId());
         
-    command.getContent(message.getBytes());
+    command.getContent(message.getContent());
 
     populateHeaders(message, command.getHeaders());
-        
-    //decode content if necessary
-    if (message.getContentType() == ListCodec::contentType) {
-        ListCodec codec;
-        message.decode(codec);
-    } else if (message.getContentType() == MapCodec::contentType) {
-        MapCodec codec;
-        message.decode(codec);
-    }
 }
 
 

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Sun Oct 11 23:22:08 2009
@@ -33,24 +33,11 @@
 using qpid::messaging::Address;
 using qpid::messaging::MessageImplAccess;
 
-template <class T> void encode(const qpid::messaging::Message& from, qpid::client::Message& to)
-{
-    T codec;
-    MessageImplAccess::get(from).getEncodedContent(codec, to.getData());
-    to.getMessageProperties().setContentType(T::contentType);
-}
-
 void OutgoingMessage::convert(const qpid::messaging::Message& from)
 {
     //TODO: need to avoid copying as much as possible
-    if (from.getContent().isList()) {
-        encode<ListCodec>(from, message);
-    } else if (from.getContent().isMap()) {
-        encode<MapCodec>(from, message);
-    } else {
-        message.setData(from.getBytes());
-        message.getMessageProperties().setContentType(from.getContentType());
-    }
+    message.setData(from.getContent());
+    message.getMessageProperties().setContentType(from.getContentType());
     const Address& address = from.getReplyTo();
     if (!address.value.empty()) {
         message.getMessageProperties().setReplyTo(AddressResolution::convert(address));

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp Sun Oct 11 23:22:08 2009
@@ -43,6 +43,7 @@
     std::string start(const std::string& mechanisms);
     std::string step(const std::string& challenge);
     std::string getMechanism();
+    std::string getUserId();
     std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
   private:
     ConnectionSettings settings;
@@ -131,6 +132,11 @@
     return mechanism;
 }
 
+std::string WindowsSasl::getUserId()
+{
+    return std::string(); // TODO - when GSSAPI is supported, return userId for connection.
+}
+
 std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t maxFrameSize)
 {
     return std::auto_ptr<SecurityLayer>(0);

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Cluster.cpp Sun Oct 11 23:22:08 2009
@@ -99,6 +99,7 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/SessionState.h"
+#include "qpid/broker/SignalHandler.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/framing/AllInvoker.h"
@@ -120,7 +121,6 @@
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/memory.h"
 #include "qpid/sys/Thread.h"
-#include "qpid/sys/LatencyTracker.h"
 
 #include <boost/shared_ptr.hpp>
 #include <boost/bind.hpp>
@@ -144,12 +144,16 @@
 using qpid::management::Args;
 namespace _qmf = ::qmf::org::apache::qpid::cluster;
 
-/** NOTE: increment this number whenever any incompatible changes in
+/**
+ * NOTE: must increment this number whenever any incompatible changes in
  * cluster protocol/behavior are made. It allows early detection and
  * sensible reporting of an attempt to mix different versions in a
  * cluster.
+ *
+ * Currently use SVN revision to avoid clashes with versions from
+ * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 2;
+const uint32_t Cluster::CLUSTER_VERSION = 820783;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -308,7 +312,7 @@
         // Finalize connections now now to avoid problems later in destructor.
         LEAVE_TRY(localConnections.clear());
         LEAVE_TRY(connections.clear());
-        LEAVE_TRY(broker.shutdown());
+        LEAVE_TRY(broker::SignalHandler::shutdown());
     }
 }
 
@@ -324,20 +328,14 @@
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
     Event e(Event::decodeCopy(from, buf));
-    LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
     deliverEvent(e);
 }
 
-LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
-    LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
-
-    void Cluster::deliverEvent(const Event& e) {
-    LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());)
-        deliverEventQueue.push(e);
+void Cluster::deliverEvent(const Event& e) {
+    deliverEventQueue.push(e);
 }
 
 void Cluster::deliverFrame(const EventFrame& e) {
-    LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody()));
     deliverFrameQueue.push(e);
 }
 
@@ -350,7 +348,6 @@
 // Handler for deliverEventQueue.
 // This thread decodes frames from events.
 void Cluster::deliveredEvent(const Event& e) {
-    LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData()));
     if (e.isCluster()) {
         QPID_LOG(trace, *this << " DLVR: " << e);
         EventFrame ef(e, e.getFrame());
@@ -396,13 +393,9 @@
         error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
 }
 
-LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
-
 // Handler for deliverFrameQueue.
 // This thread executes the main logic.
-    void Cluster::deliveredFrame(const EventFrame& efConst) {
-    LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
-    LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
+void Cluster::deliveredFrame(const EventFrame& efConst) {
     Mutex::ScopedLock l(lock);
     if (state == LEFT) return;
     EventFrame e(efConst);
@@ -434,7 +427,6 @@
             throw Exception(QPID_MSG("Invalid cluster control"));
     }
     else if (state >= CATCHUP) {
-        LATENCY_TRACK(LatencyScope ls(processLatency));
         map.incrementFrameSeq();
         ConnectionPtr connection = getConnection(e, l);
         if (connection) {

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Sun Oct 11 23:22:08 2009
@@ -45,7 +45,8 @@
 }
 
 void ErrorCheck::error(
-    Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, const std::string& msg)
+    Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms,
+    const std::string& msg)
 {
     // Detected a local error, inform cluster and set error state.
     assert(t != ERROR_TYPE_NONE); // Must be an error.
@@ -54,10 +55,11 @@
     unresolved = ms;
     frameSeq = seq;
     connection = &c;
-    QPID_LOG(error, cluster
-             << (type == ERROR_TYPE_SESSION ? " channel" : " connection")
-             << " error " << frameSeq << " on " << c << ": " << msg
-             << " must be resolved with: " << unresolved);
+    message = msg;
+    QPID_LOG(debug, cluster<< (type == ERROR_TYPE_SESSION ? " channel" : " connection")
+             << " error " << frameSeq << " on " << c
+             << " must be resolved with: " << unresolved
+             << ": " << message);
     mcast.mcastControl(
         ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
     // If there are already frames queued up by a previous error, review
@@ -84,13 +86,15 @@
         if (errorCheck->getFrameSeq() == frameSeq) { // Addresses current error
             next = frames.erase(i);    // Drop matching error check controls
             if (errorCheck->getType() < type) { // my error is worse than his
-                QPID_LOG(critical, cluster << " error " << frameSeq
-                         << " did not occur on " << i->getMemberId());
-                throw Exception(QPID_MSG("Error " << frameSeq
-                                         << " did not occur on all members"));
+                QPID_LOG(critical, cluster
+                         << " local error " << frameSeq << " did not occur on member "
+                         << i->getMemberId()
+                         << ": " << message);
+                throw Exception(
+                    QPID_MSG("local error did not occur on all cluster members " << ": " << message));
             }
             else {              // his error is worse/same as mine.
-                QPID_LOG(info, cluster << " error " << frameSeq
+                QPID_LOG(debug, cluster << " error " << frameSeq
                          << " resolved with " << i->getMemberId());
                 unresolved.erase(i->getMemberId());
                 checkResolved();
@@ -128,10 +132,10 @@
 void ErrorCheck::checkResolved() {
     if (unresolved.empty()) {   // No more potentially conflicted members, we're clear.
         type = ERROR_TYPE_NONE;
-        QPID_LOG(info, cluster << " error " << frameSeq << " resolved.");
+        QPID_LOG(debug, cluster << " error " << frameSeq << " resolved.");
     }
     else 
-        QPID_LOG(info, cluster << " error " << frameSeq
+        QPID_LOG(debug, cluster << " error " << frameSeq
                  << " must be resolved with " << unresolved);
 }
 
@@ -146,7 +150,7 @@
     // Don't respond to non-errors or to my own errors.
     if (type == ERROR_TYPE_NONE || from == cluster.getId())
         return;
-    QPID_LOG(info, cluster << " error " << frameSeq << " did not occur locally.");
+    QPID_LOG(debug, cluster << " error " << frameSeq << " did not occur locally.");
     mcast.mcastControl(
         ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq),
         cluster.getId()

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/ErrorCheck.h Sun Oct 11 23:22:08 2009
@@ -84,6 +84,7 @@
     SequenceNumber frameSeq;
     ErrorType type;
     Connection* connection;
+    std::string message;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Event.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Event.cpp Sun Oct 11 23:22:08 2009
@@ -38,9 +38,6 @@
     sizeof(uint8_t) +  // type
     sizeof(uint64_t) + // connection pointer only, CPG provides member ID.
     sizeof(uint32_t)  // payload size
-#ifdef QPID_LATENCY_METRIC
-    + sizeof(int64_t)           // timestamp
-#endif
     ;
 
 EventHeader::EventHeader(EventType t, const ConnectionId& c,  size_t s)
@@ -61,9 +58,6 @@
         throw Exception("Invalid multicast event type");
     connectionId = ConnectionId(m, buf.getLongLong());
     size = buf.getLong();
-#ifdef QPID_LATENCY_METRIC
-    latency_metric_timestamp = buf.getLongLong();
-#endif
 }
 
 Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
@@ -97,9 +91,6 @@
     b.putOctet(type);
     b.putLongLong(connectionId.getNumber());
     b.putLong(size);
-#ifdef QPID_LATENCY_METRIC
-    b.putLongLong(latency_metric_timestamp);
-#endif
 }
 
 // Encode my header in my buffer.

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.cpp Sun Oct 11 23:22:08 2009
@@ -31,9 +31,6 @@
 Multicaster::Multicaster(Cpg& cpg_, 
                          const boost::shared_ptr<sys::Poller>& poller,
                          boost::function<void()> onError_) :
-#if defined (QPID_LATENCY_TRACKER)
-    cpgLatency("CPG"),
-#endif
     onError(onError_), cpg(cpg_), 
     queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
     holding(true)
@@ -61,7 +58,6 @@
 void Multicaster::mcast(const Event& e) {
     {
         sys::Mutex::ScopedLock l(lock);
-        LATENCY_TRACK(cpgLatency.start());
         if (e.isConnection() && holding) {
             holdingQueue.push_back(e); 
             return;

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/Multicaster.h Sun Oct 11 23:22:08 2009
@@ -26,7 +26,6 @@
 #include "qpid/cluster/Event.h"
 #include "qpid/sys/PollableQueue.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/sys/LatencyTracker.h"
 #include <boost/shared_ptr.hpp>
 #include <deque>
 
@@ -58,8 +57,6 @@
     /** End holding mode, held events are mcast */
     void release();
     
-    LATENCY_TRACK(sys::LatencyCounter cpgLatency;)
-    
   private:
     typedef sys::PollableQueue<Event> PollableEventQueue;
     typedef std::deque<Event> PlainEventQueue;

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Sun Oct 11 23:22:08 2009
@@ -24,7 +24,6 @@
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/log/Statement.h"
-#include "qpid/sys/LatencyTracker.h"
 #include <boost/current_function.hpp>
 
 
@@ -40,16 +39,9 @@
     : parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false)
 {}
 
-#if defined QPID_LATENCY_TRACKER
-extern sys::LatencyTracker<const AMQBody*> doOutputTracker;
-#endif
-
 void OutputInterceptor::send(framing::AMQFrame& f) {
-    LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
-    {
-        sys::Mutex::ScopedLock l(lock);
-        next->send(f);
-    }
+    sys::Mutex::ScopedLock l(lock);
+    next->send(f);
 }
 
 void OutputInterceptor::activateOutput() {

Modified: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Sun Oct 11 23:22:08 2009
@@ -209,9 +209,16 @@
             ClusterConnectionProxy(session).expiryId(*expiryId);
         }
 
+        // We can't send a broker::Message via the normal client API,
+        // and it would be expensive to copy it into a client::Message
+        // so we go a bit under the client API covers here.
+        //
         SessionBase_0_10Access sb(session);
+        // Disable client code that clears the delivery-properties.exchange
+        sb.get()->setDoClearDeliveryPropertiesExchange(false);
         framing::MessageTransferBody transfer(
-            framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+            framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE,
+            message::ACQUIRE_MODE_PRE_ACQUIRED);
         
         sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased());
         if (message.payload->isContentReleased()){

Propchange: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:443187-726139
-/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:805429-816233
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:443187-726139
-/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:805429-816233
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:805429-824132



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org