You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC

svn commit: r1186990 [8/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.h Thu Oct 20 18:42:46 2011
@@ -73,9 +73,19 @@ public:
     void timedout();
     void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; }
     boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; }
+    std::string getXid() const { return xid; }
+    bool isCompleted() const { return completed; }
+    bool isRolledback() const { return rolledback; }
+    bool isPrepared() const { return prepared; }
+    bool isExpired() const { return expired; }
+
+    // Used by cluster update;
+    size_t size() const { return work.size(); }
+    DtxBuffer::shared_ptr operator[](size_t i) const;
+    uint32_t getTimeout() const { return timeout? timeout->timeout : 0; }
+    size_t indexOf(const DtxBuffer::shared_ptr&);
 };
 
-}
-}
+}} // qpid::broker
 
 #endif

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.cpp Thu Oct 20 18:42:46 2011
@@ -19,16 +19,18 @@
  *
  */
 
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/FedOps.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/management/ManagementAgent.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/log/Statement.h"
 #include "qpid/framing/MessageProperties.h"
 #include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/ExceptionHolder.h"
+#include <stdexcept>
 
 using namespace qpid::broker;
 using namespace qpid::framing;
@@ -56,7 +58,7 @@ Exchange::PreRoute::PreRoute(Deliverable
 
         if (parent->sequence){
             parent->sequenceNo++;
-            msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+            msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo);
         }
         if (parent->ive) {
             parent->lastMsg =  &( msg.getMessage());
@@ -70,6 +72,36 @@ Exchange::PreRoute::~PreRoute(){
     }
 }
 
+namespace {
+/** Store information about an exception to be thrown later.
+ * If multiple exceptions are stored, save the first of the "most severe"
+ * exceptions, SESSION is les sever than CONNECTION etc.
+ */
+class  ExInfo {
+  public:
+    enum Type { NONE, SESSION, CONNECTION, OTHER };
+
+    ExInfo(string exchange) : type(NONE), exchange(exchange) {}
+    void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) {
+        QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to  queue "
+                 <<  queue->getName() << ": " << exception_.what());
+        if (type < type_) {     // Replace less severe exception
+            type = type_;
+            exception = exception_;
+        }
+    }
+
+    void raise() {
+        exception.raise();
+    }
+
+  private:
+    Type type;
+    string exchange;
+    qpid::sys::ExceptionHolder exception;
+};
+}
+
 void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
 {
     int count = 0;
@@ -80,11 +112,25 @@ void Exchange::doRoute(Deliverable& msg,
             msg.getMessage().blockContentRelease();
         }
 
+
+        ExInfo error(getName()); // Save exception to throw at the end.
         for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
-            msg.deliverTo((*i)->queue);
-            if ((*i)->mgmtBinding != 0)
-                (*i)->mgmtBinding->inc_msgMatched();
+            try {
+                msg.deliverTo((*i)->queue);
+                if ((*i)->mgmtBinding != 0)
+                    (*i)->mgmtBinding->inc_msgMatched();
+            }
+            catch (const SessionException& e) {
+                error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue);
+            }
+            catch (const ConnectionException& e) {
+                error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue);
+            }
+            catch (const std::exception& e) {
+                error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue);
+            }
         }
+        error.raise();
     }
 
     if (mgmtExchange != 0)
@@ -115,7 +161,7 @@ void Exchange::routeIVE(){
 
 Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
     name(_name), durable(false), persistenceId(0), sequence(false),
-    sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
+    sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
 {
     if (parent != 0 && broker != 0)
     {
@@ -133,7 +179,7 @@ Exchange::Exchange (const string& _name,
 Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
                    Manageable* parent, Broker* b)
     : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
-      args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
+      args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
 {
     if (parent != 0 && broker != 0)
     {
@@ -155,7 +201,11 @@ Exchange::Exchange(const string& _name, 
     }
 
     ive = _args.get(qpidIVE);
-    if (ive) QPID_LOG(debug, "Configured exchange " <<  _name  << " with Initial Value");
+    if (ive) {
+        if (broker && broker->isInCluster())
+            throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster");
+        QPID_LOG(debug, "Configured exchange " <<  _name  << " with Initial Value");
+    }
 }
 
 Exchange::~Exchange ()
@@ -340,5 +390,14 @@ bool Exchange::MatchQueue::operator()(Ex
 }
 
 void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
-    msg->getProperties<DeliveryProperties>()->setExchange(getName());
+    msg->setExchange(getName());
+}
+
+bool Exchange::routeWithAlternate(Deliverable& msg)
+{
+    route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
+    if (!msg.delivered && alternate) {
+        alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
+    }
+    return msg.delivered;
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,7 @@ namespace broker {
 class Broker;
 class ExchangeRegistry;
 
-class Exchange : public PersistableExchange, public management::Manageable {
+class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public management::Manageable {
 public:
     struct Binding : public management::Manageable {
         typedef boost::shared_ptr<Binding>       shared_ptr;
@@ -82,15 +82,15 @@ protected:
     private:
         Exchange* parent;
     };
-           
+
     typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList;
     typedef boost::shared_ptr<      std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
     void doRoute(Deliverable& msg, ConstBindingList b);
     void routeIVE();
-           
+
 
     struct MatchQueue {
-        const boost::shared_ptr<Queue> queue;        
+        const boost::shared_ptr<Queue> queue;
         MatchQueue(boost::shared_ptr<Queue> q);
         bool operator()(Exchange::Binding::shared_ptr b);
     };
@@ -133,15 +133,15 @@ protected:
 
         /** Returns true if propagation is needed. */
         bool delOrigin(const std::string& queueName, const std::string& origin){
-            fedBindings[queueName].erase(origin);
-            return true;
-        }
-
-        /** Returns true if propagation is needed. */
-        bool delOrigin() {
-            if (localBindings > 0)
-                localBindings--;
-            return localBindings == 0;
+            if (origin.empty()) {   // no remote == local binding
+                if (localBindings > 0)
+                    localBindings--;
+                return localBindings == 0;
+            }
+            size_t match = fedBindings[queueName].erase(origin);
+            if (fedBindings[queueName].empty())
+                fedBindings.erase(queueName);
+            return match != 0;
         }
 
         uint32_t count() {
@@ -149,7 +149,11 @@ protected:
         }
 
         uint32_t countFedBindings(const std::string& queueName) {
-            return  fedBindings[queueName].size();
+            // don't use '[]' - it may increase size of fedBindings!
+            std::map<std::string, originSet>::iterator i;
+            if ((i = fedBindings.find(queueName)) != fedBindings.end())
+                return  i->second.size();
+            return 0;
         }
     };
 
@@ -162,7 +166,7 @@ public:
                                          Broker* broker = 0);
     QPID_BROKER_EXTERN Exchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args,
                                 management::Manageable* parent = 0, Broker* broker = 0);
-    QPID_BROKER_EXTERN virtual ~Exchange();
+    QPID_BROKER_INLINE_EXTERN virtual ~Exchange();
 
     const std::string& getName() const { return name; }
     bool isDurable() { return durable; }
@@ -191,7 +195,7 @@ public:
     virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
     QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
     virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
-    
+
     //PersistableExchange:
     QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
     uint64_t getPersistenceId() const { return persistenceId; }
@@ -222,14 +226,20 @@ public:
      */
     void recoveryComplete(ExchangeRegistry& exchanges);
 
+    bool routeWithAlternate(Deliverable& message);
+
+    void destroy() { destroyed = true; }
+    bool isDestroyed() const { return destroyed; }
+
 protected:
     qpid::sys::Mutex bridgeLock;
     std::vector<DynamicBridge*> bridgeVector;
     Broker* broker;
+    bool destroyed;
 
     QPID_BROKER_EXTERN virtual void handleHelloRequest();
     void propagateFedOp(const std::string& routingKey, const std::string& tags,
-                        const std::string& op,         const std::string& origin, 
+                        const std::string& op,         const std::string& origin,
                         qpid::framing::FieldTable* extra_args=0);
 };
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,7 @@ pair<Exchange::shared_ptr, bool> Exchang
     return declare(name, type, false, FieldTable());
 }
 
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, 
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
                                                            bool durable, const FieldTable& args){
     RWlock::ScopedWlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
@@ -61,7 +61,7 @@ pair<Exchange::shared_ptr, bool> Exchang
         }else{
             FunctionMap::iterator i =  factory.find(type);
             if (i == factory.end()) {
-                throw UnknownExchangeTypeException();    
+                throw UnknownExchangeTypeException();
             } else {
                 exchange = i->second(name, durable, args, parent, broker);
             }
@@ -82,6 +82,7 @@ void ExchangeRegistry::destroy(const str
     RWlock::ScopedWlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
     if (i != exchanges.end()) {
+        i->second->destroy();
         exchanges.erase(i);
     }
 }
@@ -104,7 +105,7 @@ void ExchangeRegistry::registerType(cons
 }
 
 
-namespace 
+namespace
 {
 const std::string empty;
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,12 +27,12 @@ namespace broker {
 
 ExpiryPolicy::~ExpiryPolicy() {}
 
-void ExpiryPolicy::willExpire(Message&) {}
-
 bool ExpiryPolicy::hasExpired(Message& m) {
     return m.getExpiration() < sys::AbsTime::now();
 }
 
-void ExpiryPolicy::forget(Message&) {}
+sys::AbsTime ExpiryPolicy::getCurrentTime() {
+    return sys::AbsTime::now();
+}
 
 }} // namespace qpid::broker

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,11 @@
 #include "qpid/broker/BrokerImportExport.h"
 
 namespace qpid {
+
+namespace sys {
+class AbsTime;
+}
+
 namespace broker {
 
 class Message;
@@ -33,13 +38,12 @@ class Message;
 /**
  * Default expiry policy.
  */
-class ExpiryPolicy : public RefCounted
+class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted
 {
   public:
     QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
-    QPID_BROKER_EXTERN virtual void willExpire(Message&);
     QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
-    QPID_BROKER_EXTERN virtual void forget(Message&);
+    QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime();
 };
 }} // namespace qpid::broker
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.cpp Thu Oct 20 18:42:46 2011
@@ -24,6 +24,7 @@
 #include "qpid/log/Statement.h"
 #include <boost/format.hpp>
 #include <boost/lexical_cast.hpp>
+#include <boost/assign/list_of.hpp>
 
 namespace qpid {
 namespace broker {
@@ -104,51 +105,80 @@ bool Fairshare::setState(Messages& m, ui
     return fairshare && fairshare->setState(priority, count);
 }
 
-int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys)
 {
-    qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+    qpid::framing::FieldTable::ValuePtr v;
+    std::vector<std::string>::const_iterator i = keys.begin(); 
+    while (!v && i != keys.end()) {
+        v = settings.get(*i++);
+    }
+
     if (!v) {
         return 0;
     } else if (v->convertsTo<int>()) {
         return v->get<int>();
     } else if (v->convertsTo<std::string>()){
         std::string s = v->get<std::string>();
-        try { 
-            return boost::lexical_cast<int>(s); 
+        try {
+            return boost::lexical_cast<int>(s);
         } catch(const boost::bad_lexical_cast&) {
-            QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+            QPID_LOG(warning, "Ignoring invalid integer value for " << *i << ": " << s);
             return 0;
         }
     } else {
-        QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+        QPID_LOG(warning, "Ignoring invalid integer value for " << *i << ": " << *v);
         return 0;
     }
 }
 
-int getSetting(const qpid::framing::FieldTable& settings, const std::string& key, int minvalue, int maxvalue)
+int getIntegerSettingForKey(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+    return getIntegerSetting(settings, boost::assign::list_of<std::string>(key));
+}
+
+int getSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys, int minvalue, int maxvalue)
 {
-    return std::max(minvalue,std::min(getIntegerSetting(settings, key), maxvalue));
+    return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue));
+}
+
+std::auto_ptr<Fairshare> getFairshareForKey(const qpid::framing::FieldTable& settings, uint levels, const std::string& key)
+{
+    uint defaultLimit = getIntegerSettingForKey(settings, key);
+    std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit));
+    for (uint i = 0; i < levels; i++) {
+        std::string levelKey = (boost::format("%1%-%2%") % key % i).str();
+        if(settings.isSet(levelKey)) {
+            fairshare->setLimit(i, getIntegerSettingForKey(settings, levelKey));
+        }
+    }
+    if (!fairshare->isNull()) {
+        return fairshare;
+    } else {
+        return std::auto_ptr<Fairshare>();
+    }
+}
+
+std::auto_ptr<Fairshare> getFairshare(const qpid::framing::FieldTable& settings,
+                                      uint levels,
+                                      const std::vector<std::string>& keys)
+{
+    std::auto_ptr<Fairshare> fairshare;
+    for (std::vector<std::string>::const_iterator i = keys.begin(); i != keys.end() && !fairshare.get(); ++i) {
+        fairshare = getFairshareForKey(settings, levels, *i);
+    }
+    return fairshare;
 }
 
 std::auto_ptr<Messages> Fairshare::create(const qpid::framing::FieldTable& settings)
 {
+    using boost::assign::list_of;
     std::auto_ptr<Messages> result;
-    size_t levels = getSetting(settings, "x-qpid-priorities", 1, 100);
+    size_t levels = getSetting(settings, list_of<std::string>("qpid.priorities")("x-qpid-priorities"), 0, 100);
     if (levels) {
-        uint defaultLimit = getIntegerSetting(settings, "x-qpid-fairshare");
-        std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit));
-        for (uint i = 0; i < levels; i++) {
-            std::string key = (boost::format("x-qpid-fairshare-%1%") % i).str();
-            if(settings.isSet(key)) {
-                fairshare->setLimit(i, getIntegerSetting(settings, key));
-            }
-        }
-        
-        if (fairshare->isNull()) {
-            result = std::auto_ptr<Messages>(new PriorityQueue(levels));
-        } else {
-            result = fairshare;
-        }
+        std::auto_ptr<Fairshare> fairshare =
+            getFairshare(settings, levels, list_of<std::string>("qpid.fairshare")("x-qpid-fairshare"));
+        if (fairshare.get()) result = fairshare;
+        else result = std::auto_ptr<Messages>(new PriorityQueue(levels));
     }
     return result;
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.h Thu Oct 20 18:42:46 2011
@@ -41,18 +41,18 @@ class Fairshare : public PriorityQueue
     bool getState(uint& priority, uint& count) const;
     bool setState(uint priority, uint count);
     void setLimit(size_t level, uint limit);
+    bool isNull();
     static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
     static bool getState(const Messages&, uint& priority, uint& count);
     static bool setState(Messages&, uint priority, uint count);
   private:
     std::vector<uint> limits;
-    
+
     uint priority;
     uint count;
-    
+
     uint currentLevel();
     uint nextLevel();
-    bool isNull();
     bool limitReached();
     bool findFrontLevel(uint& p, PriorityLevels&);
 };

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/FanOutExchange.cpp Thu Oct 20 18:42:46 2011
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include "qpid/log/Statement.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/FedOps.h"
 #include <algorithm>
@@ -65,7 +66,7 @@ bool FanOutExchange::bind(Queue::shared_
     } else if (fedOp == fedOpUnbind) {
         propagate = fedBinding.delOrigin(queue->getName(), fedOrigin);
         if (fedBinding.countFedBindings(queue->getName()) == 0)
-            unbind(queue, "", 0);
+            unbind(queue, "", args);
     } else if (fedOp == fedOpReorigin) {
         if (fedBinding.hasLocal()) {
             propagateFedOp(string(), string(), fedOpBind, string());
@@ -78,12 +79,16 @@ bool FanOutExchange::bind(Queue::shared_
     return true;
 }
 
-bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
+bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args)
 {
+    string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
     bool propagate = false;
 
+    QPID_LOG(debug, "Unbinding queue " << queue->getName()
+             << " from exchange " << getName() << " origin=" << fedOrigin << ")" );
+
     if (bindings.remove_if(MatchQueue(queue))) {
-        propagate = fedBinding.delOrigin();
+        propagate = fedBinding.delOrigin(queue->getName(), fedOrigin);
         if (mgmtExchange != 0) {
             mgmtExchange->dec_bindingCount();
         }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/HeadersExchange.cpp Thu Oct 20 18:42:46 2011
@@ -112,9 +112,14 @@ bool HeadersExchange::bind(Queue::shared
 
         {
             Mutex::ScopedLock l(lock);
-            Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
+            //NOTE: do not include the fed op/tags/origin in the
+            //arguments as when x-match is 'all' these would prevent
+            //matching (they are internally added properties
+            //controlling binding propagation but not relevant to
+            //actual routing)
+            Binding::shared_ptr binding (new Binding (bindingKey, queue, this, extra_args));
             BoundKey bk(binding);
-            if (bindings.add_unless(bk, MatchArgs(queue, args))) {
+            if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) {
                 binding->startManagement();
                 propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
                 if (mgmtExchange != 0) {
@@ -158,12 +163,13 @@ bool HeadersExchange::bind(Queue::shared
     return true;
 }
 
-bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable *args){
     bool propagate = false;
+    string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
     {
         Mutex::ScopedLock l(lock);
 
-        FedUnbindModifier modifier;
+        FedUnbindModifier modifier(queue->getName(), fedOrigin);
         MatchKey match_key(queue, bindingKey);
         bindings.modify_if(match_key, modifier);
         propagate = modifier.shouldPropagate;
@@ -330,11 +336,7 @@ HeadersExchange::FedUnbindModifier::FedU
 
 bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
 {
-    if ("" == fedOrigin) {
-        shouldPropagate = bk.fedBinding.delOrigin();
-    } else {
-        shouldPropagate = bk.fedBinding.delOrigin(queueName, fedOrigin);
-    }
+    shouldPropagate = bk.fedBinding.delOrigin(queueName, fedOrigin);
     if (bk.fedBinding.countFedBindings(queueName) == 0)
     {
         shouldUnbind = true;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/LegacyLVQ.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/LegacyLVQ.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/LegacyLVQ.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/LegacyLVQ.cpp Thu Oct 20 18:42:46 2011
@@ -93,11 +93,7 @@ void LegacyLVQ::removeIf(Predicate p)
     //purging of an LVQ is not enabled if the broker is clustered
     //(expired messages will be removed on delivery and consolidated
     //by key as part of normal LVQ operation).
-
-    //TODO: Is there a neater way to check whether broker is
-    //clustered? Here we assume that if the clustered timer is the
-    //same as the regular timer, we are not clustered:
-    if (!broker || &(broker->getClusterTimer()) == &(broker->getTimer()))
+    if (!broker || !broker->isInCluster())
         MessageMap::removeIf(p);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,7 +30,6 @@
 #include "qpid/framing/enum.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/broker/AclModule.h"
-#include "qpid/sys/ClusterSafe.h"
 
 using namespace qpid::broker;
 using qpid::framing::Buffer;
@@ -57,8 +56,8 @@ Link::Link(LinkRegistry*  _links,
            string&        _password,
            Broker*        _broker,
            Manageable*    parent)
-    : links(_links), store(_store), host(_host), port(_port), 
-      transport(_transport), 
+    : links(_links), store(_store), host(_host), port(_port),
+      transport(_transport),
       durable(_durable),
       authMechanism(_authMechanism), username(_username), password(_password),
       persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -97,7 +96,8 @@ void Link::setStateLH (int newState)
         return;
 
     state = newState;
-    if (mgmtObject == 0)
+
+    if (hideManagement())
         return;
 
     switch (state)
@@ -117,12 +117,12 @@ void Link::startConnectionLH ()
         // Set the state before calling connect.  It is possible that connect
         // will fail synchronously and call Link::closed before returning.
         setStateLH(STATE_CONNECTING);
-        broker->connect (host, port, transport,
+        broker->connect (host, boost::lexical_cast<std::string>(port), transport,
                          boost::bind (&Link::closed, this, _1, _2));
         QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
     } catch(std::exception& e) {
         setStateLH(STATE_WAITING);
-        if (mgmtObject != 0)
+        if (!hideManagement())
             mgmtObject->set_lastError (e.what());
     }
 }
@@ -133,8 +133,7 @@ void Link::established ()
     addr << host << ":" << port;
     QPID_LOG (info, "Inter-broker link established to " << addr.str());
 
-    // Don't raise the management event in a cluster, other members wont't get this call.
-    if (!sys::isCluster()) 
+    if (!hideManagement() && agent)
         agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
 
     {
@@ -154,12 +153,11 @@ void Link::closed (int, std::string text
 
     connection = 0;
 
-    // Don't raise the management event in a cluster, other members wont't get this call.
     if (state == STATE_OPERATIONAL) {
         stringstream addr;
         addr << host << ":" << port;
         QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
-        if (!sys::isCluster())
+        if (!hideManagement() && agent)
             agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
     }
 
@@ -172,7 +170,7 @@ void Link::closed (int, std::string text
     if (state != STATE_FAILED)
     {
         setStateLH(STATE_WAITING);
-        if (mgmtObject != 0)
+        if (!hideManagement())
             mgmtObject->set_lastError (text);
     }
 
@@ -221,7 +219,7 @@ void Link::cancel(Bridge::shared_ptr bri
 {
     {
         Mutex::ScopedLock mutex(lock);
-        
+
         for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
             if ((*i).get() == bridge.get()) {
                 created.erase(i);
@@ -250,6 +248,19 @@ void Link::ioThreadProcessing()
         return;
     QPID_LOG(debug, "Link::ioThreadProcessing()");
 
+    // check for bridge session errors and recover
+    if (!active.empty()) {
+        Bridges::iterator removed = std::remove_if(
+            active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
+        for (Bridges::iterator i = removed; i != active.end(); ++i) {
+            Bridge::shared_ptr  bridge = *i;
+            bridge->closed();
+            bridge->cancel(*connection);
+            created.push_back(bridge);
+        }
+        active.erase(removed, active.end());
+    }
+
     //process any pending creates and/or cancellations
     if (!created.empty()) {
         for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -277,9 +288,9 @@ void Link::maintenanceVisit ()
 {
     Mutex::ScopedLock mutex(lock);
 
-    if (connection && updateUrls) { 
+    if (connection && updateUrls) {
         urls.reset(connection->getKnownHosts());
-        QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);        
+        QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
         updateUrls = false;
     }
 
@@ -298,7 +309,7 @@ void Link::maintenanceVisit ()
             }
         }
     }
-    else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
+    else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
@@ -309,7 +320,7 @@ void Link::reconnect(const qpid::Address
     port = a.port;
     transport = a.protocol;
     startConnectionLH();
-    if (mgmtObject != 0) {
+    if (!hideManagement()) {
         stringstream errorString;
         errorString << "Failed over to " << a;
         mgmtObject->set_lastError(errorString.str());
@@ -319,7 +330,7 @@ void Link::reconnect(const qpid::Address
 bool Link::tryFailover()
 {
     Address next;
-    if (urls.next(next) && 
+    if (urls.next(next) &&
         (next.host != host || next.port != port || next.protocol != transport)) {
         links->changeAddress(Address(transport, host, port), next);
         QPID_LOG(debug, "Link failing over to " << host << ":" << port);
@@ -329,6 +340,12 @@ bool Link::tryFailover()
     }
 }
 
+// Management updates for a linke are inconsistent in a cluster, so they are
+// suppressed.
+bool Link::hideManagement() const {
+    return !mgmtObject || ( broker && broker->isInCluster());
+}
+
 uint Link::nextChannel()
 {
     Mutex::ScopedLock mutex(lock);
@@ -341,7 +358,7 @@ void Link::notifyConnectionForced(const 
     Mutex::ScopedLock mutex(lock);
 
     setStateLH(STATE_FAILED);
-    if (mgmtObject != 0)
+    if (!hideManagement())
         mgmtObject->set_lastError(text);
 }
 
@@ -363,7 +380,7 @@ Link::shared_ptr Link::decode(LinkRegist
     string   authMechanism;
     string   username;
     string   password;
-    
+
     buffer.getShortString(host);
     port = buffer.getShort();
     buffer.getShortString(transport);
@@ -375,7 +392,7 @@ Link::shared_ptr Link::decode(LinkRegist
     return links.declare(host, port, transport, durable, authMechanism, username, password).first;
 }
 
-void Link::encode(Buffer& buffer) const 
+void Link::encode(Buffer& buffer) const
 {
     buffer.putShortString(string("link"));
     buffer.putShortString(host);
@@ -387,8 +404,8 @@ void Link::encode(Buffer& buffer) const 
     buffer.putShortString(password);
 }
 
-uint32_t Link::encodedSize() const 
-{ 
+uint32_t Link::encodedSize() const
+{
     return host.size() + 1 // short-string (host)
         + 5                // short-string ("link")
         + 2                // port

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -85,6 +85,7 @@ namespace qpid {
             void destroy();                  // Called when mgmt deletes this link
             void ioThreadProcessing();       // Called on connection's IO thread by request
             bool tryFailover();              // Called during maintenance visit
+            bool hideManagement() const;
 
         public:
             typedef boost::shared_ptr<Link> shared_ptr;
@@ -122,12 +123,12 @@ namespace qpid {
 
             void notifyConnectionForced(const std::string text);
             void setPassive(bool p);
-            
+
             // PersistableConfig:
             void     setPersistenceId(uint64_t id) const;
             uint64_t getPersistenceId() const { return persistenceId; }
             uint32_t encodedSize() const;
-            void     encode(framing::Buffer& buffer) const; 
+            void     encode(framing::Buffer& buffer) const;
             const std::string& getName() const;
 
             static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
@@ -135,6 +136,7 @@ namespace qpid {
             // Manageable entry points
             management::ManagementObject*    GetManagementObject(void) const;
             management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
+
         };
     }
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/LinkRegistry.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -381,7 +381,7 @@ std::string LinkRegistry::createKey(cons
     return keystream.str();
 }
 
-void LinkRegistry::setPassive(bool p) 
+void LinkRegistry::setPassive(bool p)
 {
     Mutex::ScopedLock locker(lock);
     passiveChanged = p != passive;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,6 +30,7 @@
 #include "qpid/framing/SendContent.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/framing/TypeFilter.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
 
 #include <time.h>
@@ -49,27 +50,16 @@ TransferAdapter Message::TRANSFER;
 
 Message::Message(const framing::SequenceNumber& id) :
     frames(id), persistenceId(0), redelivered(false), loaded(false),
-    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
-    expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
-    inCallback(false), requiredCredit(0) {}
+    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+    expiration(FAR_FUTURE), dequeueCallback(0),
+    inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false)
+{}
 
-Message::Message(const Message& original) :
-    PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
-    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
-    expiration(original.expiration), enqueueCallback(0), dequeueCallback(0),
-    inCallback(false), requiredCredit(0) 
-{
-    setExpiryPolicy(original.expiryPolicy);
-}
-
-Message::~Message()
-{
-    if (expiryPolicy)
-        expiryPolicy->forget(*this);
-}
+Message::~Message() {}
 
 void Message::forcePersistent()
 {
+    sys::Mutex::ScopedLock l(lock);
     // only set forced bit if we actually need to force.
     if (! getAdapter().isPersistent(frames) ){
         forcePersistentPolicy = true;
@@ -86,7 +76,7 @@ std::string Message::getRoutingKey() con
     return getAdapter().getRoutingKey(frames);
 }
 
-std::string Message::getExchangeName() const 
+std::string Message::getExchangeName() const
 {
     return getAdapter().getExchange(frames);
 }
@@ -95,7 +85,7 @@ const boost::shared_ptr<Exchange> Messag
 {
     if (!exchange) {
         exchange = registry.get(getExchangeName());
-    } 
+    }
     return exchange;
 }
 
@@ -106,16 +96,19 @@ bool Message::isImmediate() const
 
 const FieldTable* Message::getApplicationHeaders() const
 {
+    sys::Mutex::ScopedLock l(lock);
     return getAdapter().getApplicationHeaders(frames);
 }
 
 std::string Message::getAppId() const
 {
+    sys::Mutex::ScopedLock l(lock);
     return getAdapter().getAppId(frames);
 }
 
 bool Message::isPersistent() const
 {
+    sys::Mutex::ScopedLock l(lock);
     return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
 }
 
@@ -195,7 +188,7 @@ void Message::decodeContent(framing::Buf
     } else {
         //adjust header flags
         MarkLastSegment f;
-        frames.map_if(f, TypeFilter<HEADER_BODY>());    
+        frames.map_if(f, TypeFilter<HEADER_BODY>());
     }
     //mark content loaded
     loaded = true;
@@ -247,7 +240,7 @@ void Message::destroy()
 bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
 {
     intrusive_ptr<const PersistableMessage> pmsg(this);
-    
+
     bool done = false;
     string& data = frame.castBody<AMQContentBody>()->getData();
     store->loadContent(queue, pmsg, data, offset, maxContentSize);
@@ -272,7 +265,7 @@ void Message::sendContent(const Queue& q
         uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
         bool morecontent = true;
         for (uint64_t offset = 0; morecontent; offset += maxContentSize)
-        {            
+        {
             AMQFrame frame((AMQContentBody()));
             morecontent = getContentFrame(queue, frame, maxContentSize, offset);
             out.handle(frame);
@@ -290,7 +283,10 @@ void Message::sendHeader(framing::FrameH
 {
     sys::Mutex::ScopedLock l(lock);
     Relay f(out);
-    frames.map_if(f, TypeFilter<HEADER_BODY>());    
+    frames.map_if(f, TypeFilter<HEADER_BODY>());
+    //as frame (and pointer to body) has now been passed to handler,
+    //subsequent modifications should use a copy
+    copyHeaderOnWrite = true;
 }
 
 // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -320,13 +316,14 @@ bool Message::isContentLoaded() const
 }
 
 
-namespace 
+namespace
 {
 const std::string X_QPID_TRACE("x-qpid.trace");
 }
 
 bool Message::isExcluded(const std::vector<std::string>& excludes) const
 {
+    sys::Mutex::ScopedLock l(lock);
     const FieldTable* headers = getApplicationHeaders();
     if (headers) {
         std::string traceStr = headers->getAsString(X_QPID_TRACE);
@@ -345,11 +342,30 @@ bool Message::isExcluded(const std::vect
     return false;
 }
 
+class CloneHeaderBody
+{
+public:
+    void operator()(AMQFrame& f)
+    {
+        f.cloneBody();
+    }
+};
+
+AMQHeaderBody* Message::getHeaderBody()
+{
+    if (copyHeaderOnWrite) {
+        CloneHeaderBody f;
+        frames.map_if(f, TypeFilter<HEADER_BODY>());
+        copyHeaderOnWrite = false;
+    }
+    return frames.getHeaders();
+}
+
 void Message::addTraceId(const std::string& id)
 {
     sys::Mutex::ScopedLock l(lock);
     if (isA<MessageTransferBody>()) {
-        FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders();
+        FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
         std::string trace = headers.getAsString(X_QPID_TRACE);
         if (trace.empty()) {
             headers.setString(X_QPID_TRACE, id);
@@ -357,13 +373,22 @@ void Message::addTraceId(const std::stri
             trace += ",";
             trace += id;
             headers.setString(X_QPID_TRACE, trace);
-        }        
+        }
     }
 }
 
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) 
+void Message::setTimestamp()
+{
+    sys::Mutex::ScopedLock l(lock);
+    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
+    time_t now = ::time(0);
+    props->setTimestamp(now);   // AMQP-0.10: posix time_t - secs since Epoch
+}
+
+void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
 {
-    DeliveryProperties* props = getProperties<DeliveryProperties>();    
+    sys::Mutex::ScopedLock l(lock);
+    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
     if (props->getTtl()) {
         // AMQP requires setting the expiration property to be posix
         // time_t in seconds. TTL is in milliseconds
@@ -372,26 +397,70 @@ void Message::setTimestamp(const boost::
             time_t now = ::time(0);
             props->setExpiration(now + (props->getTtl()/1000));
         }
-        // Use higher resolution time for the internal expiry calculation.
-        expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC));
-        setExpiryPolicy(e);
+        if (e) {
+            // Use higher resolution time for the internal expiry calculation.
+            // Prevent overflow as a signed int64_t
+            Duration ttl(std::min(props->getTtl() * TIME_MSEC,
+                                  (uint64_t) std::numeric_limits<int64_t>::max()));
+            expiration = AbsTime(e->getCurrentTime(), ttl);
+            setExpiryPolicy(e);
+        }
     }
 }
 
 void Message::adjustTtl()
 {
-    DeliveryProperties* props = getProperties<DeliveryProperties>();
+    sys::Mutex::ScopedLock l(lock);
+    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
     if (props->getTtl()) {
-        sys::Mutex::ScopedLock l(lock);
-        sys::Duration d(sys::AbsTime::now(), getExpiration());
-        props->setTtl(int64_t(d) > 0 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired
+        if (expiration < FAR_FUTURE) {
+            sys::AbsTime current(
+                expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
+            sys::Duration ttl(current, getExpiration());
+            // convert from ns to ms; set to 1 if expired
+            props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
+        }
     }
 }
 
+void Message::setRedelivered()
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true);
+}
+
+void Message::insertCustomProperty(const std::string& key, int64_t value)
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value);
+}
+
+void Message::insertCustomProperty(const std::string& key, const std::string& value)
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value);
+}
+
+void Message::removeCustomProperty(const std::string& key)
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key);
+}
+
+void Message::setExchange(const std::string& exchange)
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<DeliveryProperties>()->setExchange(exchange);
+}
+
+void Message::clearApplicationHeadersFlag()
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag();
+}
+
 void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
     expiryPolicy = e;
-    if (expiryPolicy) 
-        expiryPolicy->willExpire(*this);
 }
 
 bool Message::hasExpired()
@@ -415,30 +484,12 @@ struct ScopedSet {
 };
 }
 
-void Message::allEnqueuesComplete() {
-    ScopedSet ss(callbackLock, inCallback);
-    MessageCallback* cb = enqueueCallback;
-    if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
-
 void Message::allDequeuesComplete() {
     ScopedSet ss(callbackLock, inCallback);
     MessageCallback* cb = dequeueCallback;
     if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
-void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
-    sys::Mutex::ScopedLock l(callbackLock);
-    while (inCallback) callbackLock.wait();
-    enqueueCallback = &cb;
-}
-
-void Message::resetEnqueueCompleteCallback() {
-    sys::Mutex::ScopedLock l(callbackLock);
-    while (inCallback) callbackLock.wait();
-    enqueueCallback = 0;
-}
-
 void Message::setDequeueCompleteCallback(MessageCallback& cb) {
     sys::Mutex::ScopedLock l(callbackLock);
     while (inCallback) callbackLock.wait();
@@ -452,12 +503,11 @@ void Message::resetDequeueCompleteCallba
 }
 
 uint8_t Message::getPriority() const {
+    sys::Mutex::ScopedLock l(lock);
     return getAdapter().getPriority(frames);
 }
 
-framing::FieldTable& Message::getOrInsertHeaders()
-{
-    return getProperties<MessageProperties>()->getApplicationHeaders();
-}
+bool Message::getIsManagementMessage() const { return isManagementMessage; }
+void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
 
 }} // namespace qpid::broker

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,17 +29,21 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
 #include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
 #include <boost/shared_ptr.hpp>
+#include <memory>
 #include <string>
 #include <vector>
 
 namespace qpid {
-       
+
 namespace framing {
+class AMQBody;
+class AMQHeaderBody;
 class FieldTable;
 class SequenceNumber;
 }
-       
+
 namespace broker {
 class ConnectionToken;
 class Exchange;
@@ -51,11 +55,10 @@ class ExpiryPolicy;
 class Message : public PersistableMessage {
 public:
     typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
-    
+
     QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber());
-    QPID_BROKER_EXTERN Message(const Message&);
     QPID_BROKER_EXTERN ~Message();
-        
+
     uint64_t getPersistenceId() const { return persistenceId; }
     void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
 
@@ -75,27 +78,31 @@ public:
     bool isImmediate() const;
     QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
     QPID_BROKER_EXTERN std::string getAppId() const;
-    framing::FieldTable& getOrInsertHeaders();
     QPID_BROKER_EXTERN bool isPersistent() const;
     bool requiresAccept();
 
-    QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+    /** determine msg expiration time using the TTL value if present */
+    QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
     void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
     bool hasExpired();
     sys::AbsTime getExpiration() const { return expiration; }
+    void setExpiration(sys::AbsTime exp) { expiration = exp; }
     void adjustTtl();
+    void setRedelivered();
+    QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, int64_t value);
+    QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, const std::string& value);
+    QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
+    void setExchange(const std::string&);
+    void clearApplicationHeadersFlag();
+    /** set the timestamp delivery property to the current time-of-day */
+    QPID_BROKER_EXTERN void setTimestamp();
 
-    framing::FrameSet& getFrames() { return frames; } 
-    const framing::FrameSet& getFrames() const { return frames; } 
-
-    template <class T> T* getProperties() {
-        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        return p->get<T>(true);
-    }
+    framing::FrameSet& getFrames() { return frames; }
+    const framing::FrameSet& getFrames() const { return frames; }
 
     template <class T> const T* getProperties() const {
-        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        return p->get<T>(true);
+        const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        return p->get<T>();
     }
 
     template <class T> const T* hasProperties() const {
@@ -103,6 +110,11 @@ public:
         return p->get<T>();
     }
 
+    template <class T> void eraseProperties() {
+        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        p->erase<T>();
+    }
+
     template <class T> const T* getMethod() const {
         return frames.as<T>();
     }
@@ -135,7 +147,7 @@ public:
 
     QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer);
     QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer);
-            
+
     void QPID_BROKER_EXTERN tryReleaseContent();
     void releaseContent();
     void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead
@@ -149,24 +161,19 @@ public:
 
     bool isExcluded(const std::vector<std::string>& excludes) const;
     void addTraceId(const std::string& id);
-       
-       void forcePersistent();
-       bool isForcedPersistent();
-    
-
-    /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */
-    void setEnqueueCompleteCallback(MessageCallback& cb);
-    void resetEnqueueCompleteCallback();
+
+    void forcePersistent();
+    bool isForcedPersistent();
 
     /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
     void setDequeueCompleteCallback(MessageCallback& cb);
     void resetDequeueCompleteCallback();
 
     uint8_t getPriority() const;
-
+    bool getIsManagementMessage() const;
+    void setIsManagementMessage(bool b);
   private:
     MessageAdapter& getAdapter() const;
-    void allEnqueuesComplete();
     void allDequeuesComplete();
 
     mutable sys::Mutex lock;
@@ -176,7 +183,7 @@ public:
     bool redelivered;
     bool loaded;
     bool staged;
-	bool forcePersistentPolicy; // used to force message as durable, via a broker policy
+    bool forcePersistentPolicy; // used to force message as durable, via a broker policy
     ConnectionToken* publisher;
     mutable MessageAdapter* adapter;
     qpid::sys::AbsTime expiration;
@@ -187,11 +194,20 @@ public:
     mutable boost::intrusive_ptr<Message> empty;
 
     sys::Monitor callbackLock;
-    MessageCallback* enqueueCallback;
     MessageCallback* dequeueCallback;
     bool inCallback;
 
     uint32_t requiredCredit;
+    bool isManagementMessage;
+      mutable bool copyHeaderOnWrite;
+
+    /**
+     * Expects lock to be held
+     */
+    template <class T> T* getModifiableProperties() {
+        return getHeaderBody()->get<T>(true);
+    }
+    qpid::framing::AMQHeaderBody* getHeaderBody();
 };
 
 }}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/MessageBuilder.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/MessageBuilder.h Thu Oct 20 18:42:46 2011
@@ -33,7 +33,7 @@ namespace qpid {
         class Message;
         class MessageStore;
 
-        class MessageBuilder : public framing::FrameHandler{
+        class QPID_BROKER_CLASS_EXTERN MessageBuilder : public framing::FrameHandler{
         public:
             QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store);
             QPID_BROKER_EXTERN void handle(framing::AMQFrame& frame);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Messages.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Messages.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Messages.h Thu Oct 20 18:42:46 2011
@@ -32,7 +32,8 @@ struct QueuedMessage;
 
 /**
  * This interface abstracts out the access to the messages held for
- * delivery by a Queue instance.
+ * delivery by a Queue instance. Note the the assumption at present is
+ * that all locking is done in the Queue itself.
  */
 class Messages
 {
@@ -75,7 +76,6 @@ class Messages
      * @return true if there is another message, false otherwise.
      */
     virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
-
     /**
      * Note: Caller is responsible for ensuring that there is a front
      * (e.g. empty() returns false)

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.cpp Thu Oct 20 18:42:46 2011
@@ -126,21 +126,25 @@ std::auto_ptr<TPCTransactionContext> Nul
 
 void NullMessageStore::prepare(TPCTransactionContext& ctxt)
 {
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
     prepared.insert(DummyCtxt::getXid(ctxt));
 }
 
 void NullMessageStore::commit(TransactionContext& ctxt)
 {
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
     prepared.erase(DummyCtxt::getXid(ctxt));
 }
 
 void NullMessageStore::abort(TransactionContext& ctxt)
 {
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
     prepared.erase(DummyCtxt::getXid(ctxt));
 }
 
 void NullMessageStore::collectPreparedXids(std::set<std::string>& out)
 {
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
     out.insert(prepared.begin(), prepared.end());
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.h Thu Oct 20 18:42:46 2011
@@ -25,6 +25,7 @@
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/sys/Mutex.h"
 
 #include <boost/intrusive_ptr.hpp>
 
@@ -34,10 +35,11 @@ namespace broker {
 /**
  * A null implementation of the MessageStore interface
  */
-class NullMessageStore : public MessageStore
+class QPID_BROKER_CLASS_EXTERN NullMessageStore : public MessageStore
 {
     std::set<std::string> prepared;
     uint64_t nextPersistenceId;
+    qpid::sys::Mutex lock;
   public:
     QPID_BROKER_EXTERN NullMessageStore();
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.cpp Thu Oct 20 18:42:46 2011
@@ -34,7 +34,6 @@ class MessageStore;
 PersistableMessage::~PersistableMessage() {}
 
 PersistableMessage::PersistableMessage() :
-    asyncEnqueueCounter(0), 
     asyncDequeueCounter(0),
     store(0)
 {}
@@ -68,24 +67,6 @@ bool PersistableMessage::isContentReleas
     return contentReleaseState.released;
 }
        
-bool PersistableMessage::isEnqueueComplete() {
-    sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
-    return asyncEnqueueCounter == 0;
-}
-
-void PersistableMessage::enqueueComplete() {
-    bool notify = false;
-    {
-        sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
-        if (asyncEnqueueCounter > 0) {
-            if (--asyncEnqueueCounter == 0) {
-                notify = true;
-            }
-        }
-    }
-    if (notify) 
-        allEnqueuesComplete();
-}
 
 bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
     if (store && (queue->getPersistenceId()!=0)) {
@@ -109,12 +90,7 @@ void PersistableMessage::addToSyncList(P
 
 void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
     addToSyncList(queue, _store);
-    enqueueAsync();
-}
-
-void PersistableMessage::enqueueAsync() { 
-    sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
-    asyncEnqueueCounter++; 
+    enqueueStart();
 }
 
 bool PersistableMessage::isDequeueComplete() { 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.h Thu Oct 20 18:42:46 2011
@@ -31,6 +31,7 @@
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/AsyncCompletion.h"
 
 namespace qpid {
 namespace broker {
@@ -43,18 +44,19 @@ class MessageStore;
 class PersistableMessage : public Persistable
 {
     typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
-    sys::Mutex asyncEnqueueLock;
     sys::Mutex asyncDequeueLock;
     sys::Mutex storeLock;
-       
+
     /**
-     * Tracks the number of outstanding asynchronous enqueue
-     * operations. When the message is enqueued asynchronously the
-     * count is incremented; when that enqueue completes it is
-     * decremented. Thus when it is 0, there are no outstanding
-     * enqueues.
+     * "Ingress" messages == messages sent _to_ the broker.
+     * Tracks the number of outstanding asynchronous operations that must
+     * complete before an inbound message can be considered fully received by the
+     * broker.  E.g. all enqueues have completed, the message has been written
+     * to store, credit has been replenished, etc. Once all outstanding
+     * operations have completed, the transfer of this message from the client
+     * may be considered complete.
      */
-    int asyncEnqueueCounter;
+    AsyncCompletion ingressCompletion;
 
     /**
      * Tracks the number of outstanding asynchronous dequeue
@@ -65,7 +67,6 @@ class PersistableMessage : public Persis
      */
     int asyncDequeueCounter;
 
-    void enqueueAsync();
     void dequeueAsync();
 
     syncList synclist;
@@ -80,8 +81,6 @@ class PersistableMessage : public Persis
     ContentReleaseState contentReleaseState;
 
   protected:
-    /** Called when all enqueues are complete for this message. */
-    virtual void allEnqueuesComplete() = 0;
     /** Called when all dequeues are complete for this message. */
     virtual void allDequeuesComplete() = 0;
 
@@ -115,9 +114,12 @@ class PersistableMessage : public Persis
 
     virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
 
-    QPID_BROKER_EXTERN bool isEnqueueComplete();
+    /** track the progress of a message received by the broker - see ingressCompletion above */
+    QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion.isDone(); }
+    QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return ingressCompletion; }
 
-    QPID_BROKER_EXTERN void enqueueComplete();
+    QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); }
+    QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); }
 
     QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
                                          MessageStore* _store);
@@ -133,7 +135,6 @@ class PersistableMessage : public Persis
     bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
     
     void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
-    
 };
 
 }}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org