You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [8/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.h Fri Oct 21 01:19:00 2011
@@ -73,19 +73,9 @@ public:
     void timedout();
     void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; }
     boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; }
-    std::string getXid() const { return xid; }
-    bool isCompleted() const { return completed; }
-    bool isRolledback() const { return rolledback; }
-    bool isPrepared() const { return prepared; }
-    bool isExpired() const { return expired; }
-
-    // Used by cluster update;
-    size_t size() const { return work.size(); }
-    DtxBuffer::shared_ptr operator[](size_t i) const;
-    uint32_t getTimeout() const { return timeout? timeout->timeout : 0; }
-    size_t indexOf(const DtxBuffer::shared_ptr&);
 };
 
-}} // qpid::broker
+}
+}
 
 #endif

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.cpp Fri Oct 21 01:19:00 2011
@@ -19,18 +19,16 @@
  *
  */
 
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/FedOps.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/log/Statement.h"
 #include "qpid/framing/MessageProperties.h"
 #include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-#include "qpid/management/ManagementAgent.h"
-#include "qpid/sys/ExceptionHolder.h"
-#include <stdexcept>
+#include "qpid/broker/DeliverableMessage.h"
 
 using namespace qpid::broker;
 using namespace qpid::framing;
@@ -58,7 +56,7 @@ Exchange::PreRoute::PreRoute(Deliverable
 
         if (parent->sequence){
             parent->sequenceNo++;
-            msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo);
+            msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
         }
         if (parent->ive) {
             parent->lastMsg =  &( msg.getMessage());
@@ -72,36 +70,6 @@ Exchange::PreRoute::~PreRoute(){
     }
 }
 
-namespace {
-/** Store information about an exception to be thrown later.
- * If multiple exceptions are stored, save the first of the "most severe"
- * exceptions, SESSION is les sever than CONNECTION etc.
- */
-class  ExInfo {
-  public:
-    enum Type { NONE, SESSION, CONNECTION, OTHER };
-
-    ExInfo(string exchange) : type(NONE), exchange(exchange) {}
-    void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) {
-        QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to  queue "
-                 <<  queue->getName() << ": " << exception_.what());
-        if (type < type_) {     // Replace less severe exception
-            type = type_;
-            exception = exception_;
-        }
-    }
-
-    void raise() {
-        exception.raise();
-    }
-
-  private:
-    Type type;
-    string exchange;
-    qpid::sys::ExceptionHolder exception;
-};
-}
-
 void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
 {
     int count = 0;
@@ -112,25 +80,11 @@ void Exchange::doRoute(Deliverable& msg,
             msg.getMessage().blockContentRelease();
         }
 
-
-        ExInfo error(getName()); // Save exception to throw at the end.
         for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
-            try {
-                msg.deliverTo((*i)->queue);
-                if ((*i)->mgmtBinding != 0)
-                    (*i)->mgmtBinding->inc_msgMatched();
-            }
-            catch (const SessionException& e) {
-                error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue);
-            }
-            catch (const ConnectionException& e) {
-                error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue);
-            }
-            catch (const std::exception& e) {
-                error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue);
-            }
+            msg.deliverTo((*i)->queue);
+            if ((*i)->mgmtBinding != 0)
+                (*i)->mgmtBinding->inc_msgMatched();
         }
-        error.raise();
     }
 
     if (mgmtExchange != 0)
@@ -161,7 +115,7 @@ void Exchange::routeIVE(){
 
 Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
     name(_name), durable(false), persistenceId(0), sequence(false),
-    sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
+    sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
 {
     if (parent != 0 && broker != 0)
     {
@@ -179,7 +133,7 @@ Exchange::Exchange (const string& _name,
 Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
                    Manageable* parent, Broker* b)
     : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
-      args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
+      args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
 {
     if (parent != 0 && broker != 0)
     {
@@ -201,11 +155,7 @@ Exchange::Exchange(const string& _name, 
     }
 
     ive = _args.get(qpidIVE);
-    if (ive) {
-        if (broker && broker->isInCluster())
-            throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster");
-        QPID_LOG(debug, "Configured exchange " <<  _name  << " with Initial Value");
-    }
+    if (ive) QPID_LOG(debug, "Configured exchange " <<  _name  << " with Initial Value");
 }
 
 Exchange::~Exchange ()
@@ -390,14 +340,5 @@ bool Exchange::MatchQueue::operator()(Ex
 }
 
 void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
-    msg->setExchange(getName());
-}
-
-bool Exchange::routeWithAlternate(Deliverable& msg)
-{
-    route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
-    if (!msg.delivered && alternate) {
-        alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
-    }
-    return msg.delivered;
+    msg->getProperties<DeliveryProperties>()->setExchange(getName());
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Exchange.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,7 @@ namespace broker {
 class Broker;
 class ExchangeRegistry;
 
-class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public management::Manageable {
+class Exchange : public PersistableExchange, public management::Manageable {
 public:
     struct Binding : public management::Manageable {
         typedef boost::shared_ptr<Binding>       shared_ptr;
@@ -82,15 +82,15 @@ protected:
     private:
         Exchange* parent;
     };
-
+           
     typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList;
     typedef boost::shared_ptr<      std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
     void doRoute(Deliverable& msg, ConstBindingList b);
     void routeIVE();
-
+           
 
     struct MatchQueue {
-        const boost::shared_ptr<Queue> queue;
+        const boost::shared_ptr<Queue> queue;        
         MatchQueue(boost::shared_ptr<Queue> q);
         bool operator()(Exchange::Binding::shared_ptr b);
     };
@@ -133,15 +133,15 @@ protected:
 
         /** Returns true if propagation is needed. */
         bool delOrigin(const std::string& queueName, const std::string& origin){
-            if (origin.empty()) {   // no remote == local binding
-                if (localBindings > 0)
-                    localBindings--;
-                return localBindings == 0;
-            }
-            size_t match = fedBindings[queueName].erase(origin);
-            if (fedBindings[queueName].empty())
-                fedBindings.erase(queueName);
-            return match != 0;
+            fedBindings[queueName].erase(origin);
+            return true;
+        }
+
+        /** Returns true if propagation is needed. */
+        bool delOrigin() {
+            if (localBindings > 0)
+                localBindings--;
+            return localBindings == 0;
         }
 
         uint32_t count() {
@@ -149,11 +149,7 @@ protected:
         }
 
         uint32_t countFedBindings(const std::string& queueName) {
-            // don't use '[]' - it may increase size of fedBindings!
-            std::map<std::string, originSet>::iterator i;
-            if ((i = fedBindings.find(queueName)) != fedBindings.end())
-                return  i->second.size();
-            return 0;
+            return  fedBindings[queueName].size();
         }
     };
 
@@ -166,7 +162,7 @@ public:
                                          Broker* broker = 0);
     QPID_BROKER_EXTERN Exchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args,
                                 management::Manageable* parent = 0, Broker* broker = 0);
-    QPID_BROKER_INLINE_EXTERN virtual ~Exchange();
+    QPID_BROKER_EXTERN virtual ~Exchange();
 
     const std::string& getName() const { return name; }
     bool isDurable() { return durable; }
@@ -195,7 +191,7 @@ public:
     virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
     QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
     virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
-
+    
     //PersistableExchange:
     QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
     uint64_t getPersistenceId() const { return persistenceId; }
@@ -226,20 +222,14 @@ public:
      */
     void recoveryComplete(ExchangeRegistry& exchanges);
 
-    bool routeWithAlternate(Deliverable& message);
-
-    void destroy() { destroyed = true; }
-    bool isDestroyed() const { return destroyed; }
-
 protected:
     qpid::sys::Mutex bridgeLock;
     std::vector<DynamicBridge*> bridgeVector;
     Broker* broker;
-    bool destroyed;
 
     QPID_BROKER_EXTERN virtual void handleHelloRequest();
     void propagateFedOp(const std::string& routingKey, const std::string& tags,
-                        const std::string& op,         const std::string& origin,
+                        const std::string& op,         const std::string& origin, 
                         qpid::framing::FieldTable* extra_args=0);
 };
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ExchangeRegistry.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,7 @@ pair<Exchange::shared_ptr, bool> Exchang
     return declare(name, type, false, FieldTable());
 }
 
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, 
                                                            bool durable, const FieldTable& args){
     RWlock::ScopedWlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
@@ -61,7 +61,7 @@ pair<Exchange::shared_ptr, bool> Exchang
         }else{
             FunctionMap::iterator i =  factory.find(type);
             if (i == factory.end()) {
-                throw UnknownExchangeTypeException();
+                throw UnknownExchangeTypeException();    
             } else {
                 exchange = i->second(name, durable, args, parent, broker);
             }
@@ -82,7 +82,6 @@ void ExchangeRegistry::destroy(const str
     RWlock::ScopedWlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
     if (i != exchanges.end()) {
-        i->second->destroy();
         exchanges.erase(i);
     }
 }
@@ -105,7 +104,7 @@ void ExchangeRegistry::registerType(cons
 }
 
 
-namespace
+namespace 
 {
 const std::string empty;
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,12 +27,12 @@ namespace broker {
 
 ExpiryPolicy::~ExpiryPolicy() {}
 
+void ExpiryPolicy::willExpire(Message&) {}
+
 bool ExpiryPolicy::hasExpired(Message& m) {
     return m.getExpiration() < sys::AbsTime::now();
 }
 
-sys::AbsTime ExpiryPolicy::getCurrentTime() {
-    return sys::AbsTime::now();
-}
+void ExpiryPolicy::forget(Message&) {}
 
 }} // namespace qpid::broker

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ExpiryPolicy.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,11 +26,6 @@
 #include "qpid/broker/BrokerImportExport.h"
 
 namespace qpid {
-
-namespace sys {
-class AbsTime;
-}
-
 namespace broker {
 
 class Message;
@@ -38,12 +33,13 @@ class Message;
 /**
  * Default expiry policy.
  */
-class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted
+class ExpiryPolicy : public RefCounted
 {
   public:
     QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
+    QPID_BROKER_EXTERN virtual void willExpire(Message&);
     QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
-    QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime();
+    QPID_BROKER_EXTERN virtual void forget(Message&);
 };
 }} // namespace qpid::broker
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.cpp Fri Oct 21 01:19:00 2011
@@ -24,7 +24,6 @@
 #include "qpid/log/Statement.h"
 #include <boost/format.hpp>
 #include <boost/lexical_cast.hpp>
-#include <boost/assign/list_of.hpp>
 
 namespace qpid {
 namespace broker {
@@ -105,80 +104,51 @@ bool Fairshare::setState(Messages& m, ui
     return fairshare && fairshare->setState(priority, count);
 }
 
-int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys)
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
 {
-    qpid::framing::FieldTable::ValuePtr v;
-    std::vector<std::string>::const_iterator i = keys.begin(); 
-    while (!v && i != keys.end()) {
-        v = settings.get(*i++);
-    }
-
+    qpid::framing::FieldTable::ValuePtr v = settings.get(key);
     if (!v) {
         return 0;
     } else if (v->convertsTo<int>()) {
         return v->get<int>();
     } else if (v->convertsTo<std::string>()){
         std::string s = v->get<std::string>();
-        try {
-            return boost::lexical_cast<int>(s);
+        try { 
+            return boost::lexical_cast<int>(s); 
         } catch(const boost::bad_lexical_cast&) {
-            QPID_LOG(warning, "Ignoring invalid integer value for " << *i << ": " << s);
+            QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
             return 0;
         }
     } else {
-        QPID_LOG(warning, "Ignoring invalid integer value for " << *i << ": " << *v);
+        QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
         return 0;
     }
 }
 
-int getIntegerSettingForKey(const qpid::framing::FieldTable& settings, const std::string& key)
-{
-    return getIntegerSetting(settings, boost::assign::list_of<std::string>(key));
-}
-
-int getSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys, int minvalue, int maxvalue)
+int getSetting(const qpid::framing::FieldTable& settings, const std::string& key, int minvalue, int maxvalue)
 {
-    return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue));
-}
-
-std::auto_ptr<Fairshare> getFairshareForKey(const qpid::framing::FieldTable& settings, uint levels, const std::string& key)
-{
-    uint defaultLimit = getIntegerSettingForKey(settings, key);
-    std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit));
-    for (uint i = 0; i < levels; i++) {
-        std::string levelKey = (boost::format("%1%-%2%") % key % i).str();
-        if(settings.isSet(levelKey)) {
-            fairshare->setLimit(i, getIntegerSettingForKey(settings, levelKey));
-        }
-    }
-    if (!fairshare->isNull()) {
-        return fairshare;
-    } else {
-        return std::auto_ptr<Fairshare>();
-    }
-}
-
-std::auto_ptr<Fairshare> getFairshare(const qpid::framing::FieldTable& settings,
-                                      uint levels,
-                                      const std::vector<std::string>& keys)
-{
-    std::auto_ptr<Fairshare> fairshare;
-    for (std::vector<std::string>::const_iterator i = keys.begin(); i != keys.end() && !fairshare.get(); ++i) {
-        fairshare = getFairshareForKey(settings, levels, *i);
-    }
-    return fairshare;
+    return std::max(minvalue,std::min(getIntegerSetting(settings, key), maxvalue));
 }
 
 std::auto_ptr<Messages> Fairshare::create(const qpid::framing::FieldTable& settings)
 {
-    using boost::assign::list_of;
     std::auto_ptr<Messages> result;
-    size_t levels = getSetting(settings, list_of<std::string>("qpid.priorities")("x-qpid-priorities"), 0, 100);
+    size_t levels = getSetting(settings, "x-qpid-priorities", 1, 100);
     if (levels) {
-        std::auto_ptr<Fairshare> fairshare =
-            getFairshare(settings, levels, list_of<std::string>("qpid.fairshare")("x-qpid-fairshare"));
-        if (fairshare.get()) result = fairshare;
-        else result = std::auto_ptr<Messages>(new PriorityQueue(levels));
+        uint defaultLimit = getIntegerSetting(settings, "x-qpid-fairshare");
+        std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit));
+        for (uint i = 0; i < levels; i++) {
+            std::string key = (boost::format("x-qpid-fairshare-%1%") % i).str();
+            if(settings.isSet(key)) {
+                fairshare->setLimit(i, getIntegerSetting(settings, key));
+            }
+        }
+        
+        if (fairshare->isNull()) {
+            result = std::auto_ptr<Messages>(new PriorityQueue(levels));
+        } else {
+            result = fairshare;
+        }
     }
     return result;
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Fairshare.h Fri Oct 21 01:19:00 2011
@@ -41,18 +41,18 @@ class Fairshare : public PriorityQueue
     bool getState(uint& priority, uint& count) const;
     bool setState(uint priority, uint count);
     void setLimit(size_t level, uint limit);
-    bool isNull();
     static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
     static bool getState(const Messages&, uint& priority, uint& count);
     static bool setState(Messages&, uint priority, uint count);
   private:
     std::vector<uint> limits;
-
+    
     uint priority;
     uint count;
-
+    
     uint currentLevel();
     uint nextLevel();
+    bool isNull();
     bool limitReached();
     bool findFrontLevel(uint& p, PriorityLevels&);
 };

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/FanOutExchange.cpp Fri Oct 21 01:19:00 2011
@@ -18,7 +18,6 @@
  * under the License.
  *
  */
-#include "qpid/log/Statement.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/FedOps.h"
 #include <algorithm>
@@ -66,7 +65,7 @@ bool FanOutExchange::bind(Queue::shared_
     } else if (fedOp == fedOpUnbind) {
         propagate = fedBinding.delOrigin(queue->getName(), fedOrigin);
         if (fedBinding.countFedBindings(queue->getName()) == 0)
-            unbind(queue, "", args);
+            unbind(queue, "", 0);
     } else if (fedOp == fedOpReorigin) {
         if (fedBinding.hasLocal()) {
             propagateFedOp(string(), string(), fedOpBind, string());
@@ -79,16 +78,12 @@ bool FanOutExchange::bind(Queue::shared_
     return true;
 }
 
-bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args)
+bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
 {
-    string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
     bool propagate = false;
 
-    QPID_LOG(debug, "Unbinding queue " << queue->getName()
-             << " from exchange " << getName() << " origin=" << fedOrigin << ")" );
-
     if (bindings.remove_if(MatchQueue(queue))) {
-        propagate = fedBinding.delOrigin(queue->getName(), fedOrigin);
+        propagate = fedBinding.delOrigin();
         if (mgmtExchange != 0) {
             mgmtExchange->dec_bindingCount();
         }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/HeadersExchange.cpp Fri Oct 21 01:19:00 2011
@@ -112,14 +112,9 @@ bool HeadersExchange::bind(Queue::shared
 
         {
             Mutex::ScopedLock l(lock);
-            //NOTE: do not include the fed op/tags/origin in the
-            //arguments as when x-match is 'all' these would prevent
-            //matching (they are internally added properties
-            //controlling binding propagation but not relevant to
-            //actual routing)
-            Binding::shared_ptr binding (new Binding (bindingKey, queue, this, extra_args));
+            Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
             BoundKey bk(binding);
-            if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) {
+            if (bindings.add_unless(bk, MatchArgs(queue, args))) {
                 binding->startManagement();
                 propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
                 if (mgmtExchange != 0) {
@@ -163,13 +158,12 @@ bool HeadersExchange::bind(Queue::shared
     return true;
 }
 
-bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable *args){
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){
     bool propagate = false;
-    string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
     {
         Mutex::ScopedLock l(lock);
 
-        FedUnbindModifier modifier(queue->getName(), fedOrigin);
+        FedUnbindModifier modifier;
         MatchKey match_key(queue, bindingKey);
         bindings.modify_if(match_key, modifier);
         propagate = modifier.shouldPropagate;
@@ -336,7 +330,11 @@ HeadersExchange::FedUnbindModifier::FedU
 
 bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
 {
-    shouldPropagate = bk.fedBinding.delOrigin(queueName, fedOrigin);
+    if ("" == fedOrigin) {
+        shouldPropagate = bk.fedBinding.delOrigin();
+    } else {
+        shouldPropagate = bk.fedBinding.delOrigin(queueName, fedOrigin);
+    }
     if (bk.fedBinding.countFedBindings(queueName) == 0)
     {
         shouldUnbind = true;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/LegacyLVQ.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/LegacyLVQ.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/LegacyLVQ.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/LegacyLVQ.cpp Fri Oct 21 01:19:00 2011
@@ -93,7 +93,11 @@ void LegacyLVQ::removeIf(Predicate p)
     //purging of an LVQ is not enabled if the broker is clustered
     //(expired messages will be removed on delivery and consolidated
     //by key as part of normal LVQ operation).
-    if (!broker || !broker->isInCluster())
+
+    //TODO: Is there a neater way to check whether broker is
+    //clustered? Here we assume that if the clustered timer is the
+    //same as the regular timer, we are not clustered:
+    if (!broker || &(broker->getClusterTimer()) == &(broker->getTimer()))
         MessageMap::removeIf(p);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,6 +30,7 @@
 #include "qpid/framing/enum.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/broker/AclModule.h"
+#include "qpid/sys/ClusterSafe.h"
 
 using namespace qpid::broker;
 using qpid::framing::Buffer;
@@ -56,8 +57,8 @@ Link::Link(LinkRegistry*  _links,
            string&        _password,
            Broker*        _broker,
            Manageable*    parent)
-    : links(_links), store(_store), host(_host), port(_port),
-      transport(_transport),
+    : links(_links), store(_store), host(_host), port(_port), 
+      transport(_transport), 
       durable(_durable),
       authMechanism(_authMechanism), username(_username), password(_password),
       persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -96,8 +97,7 @@ void Link::setStateLH (int newState)
         return;
 
     state = newState;
-
-    if (hideManagement())
+    if (mgmtObject == 0)
         return;
 
     switch (state)
@@ -117,12 +117,12 @@ void Link::startConnectionLH ()
         // Set the state before calling connect.  It is possible that connect
         // will fail synchronously and call Link::closed before returning.
         setStateLH(STATE_CONNECTING);
-        broker->connect (host, boost::lexical_cast<std::string>(port), transport,
+        broker->connect (host, port, transport,
                          boost::bind (&Link::closed, this, _1, _2));
         QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
     } catch(std::exception& e) {
         setStateLH(STATE_WAITING);
-        if (!hideManagement())
+        if (mgmtObject != 0)
             mgmtObject->set_lastError (e.what());
     }
 }
@@ -133,7 +133,8 @@ void Link::established ()
     addr << host << ":" << port;
     QPID_LOG (info, "Inter-broker link established to " << addr.str());
 
-    if (!hideManagement() && agent)
+    // Don't raise the management event in a cluster; other members won't get this call.
+    if (!sys::isCluster()) 
         agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
 
     {
@@ -153,11 +154,12 @@ void Link::closed (int, std::string text
 
     connection = 0;
 
+    // Don't raise the management event in a cluster; other members won't get this call.
     if (state == STATE_OPERATIONAL) {
         stringstream addr;
         addr << host << ":" << port;
         QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
-        if (!hideManagement() && agent)
+        if (!sys::isCluster())
             agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
     }
 
@@ -170,7 +172,7 @@ void Link::closed (int, std::string text
     if (state != STATE_FAILED)
     {
         setStateLH(STATE_WAITING);
-        if (!hideManagement())
+        if (mgmtObject != 0)
             mgmtObject->set_lastError (text);
     }
 
@@ -219,7 +221,7 @@ void Link::cancel(Bridge::shared_ptr bri
 {
     {
         Mutex::ScopedLock mutex(lock);
-
+        
         for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
             if ((*i).get() == bridge.get()) {
                 created.erase(i);
@@ -248,19 +250,6 @@ void Link::ioThreadProcessing()
         return;
     QPID_LOG(debug, "Link::ioThreadProcessing()");
 
-    // check for bridge session errors and recover
-    if (!active.empty()) {
-        Bridges::iterator removed = std::remove_if(
-            active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
-        for (Bridges::iterator i = removed; i != active.end(); ++i) {
-            Bridge::shared_ptr  bridge = *i;
-            bridge->closed();
-            bridge->cancel(*connection);
-            created.push_back(bridge);
-        }
-        active.erase(removed, active.end());
-    }
-
     //process any pending creates and/or cancellations
     if (!created.empty()) {
         for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -288,9 +277,9 @@ void Link::maintenanceVisit ()
 {
     Mutex::ScopedLock mutex(lock);
 
-    if (connection && updateUrls) {
+    if (connection && updateUrls) { 
         urls.reset(connection->getKnownHosts());
-        QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
+        QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);        
         updateUrls = false;
     }
 
@@ -309,7 +298,7 @@ void Link::maintenanceVisit ()
             }
         }
     }
-    else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
+    else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
@@ -320,7 +309,7 @@ void Link::reconnect(const qpid::Address
     port = a.port;
     transport = a.protocol;
     startConnectionLH();
-    if (!hideManagement()) {
+    if (mgmtObject != 0) {
         stringstream errorString;
         errorString << "Failed over to " << a;
         mgmtObject->set_lastError(errorString.str());
@@ -330,7 +319,7 @@ void Link::reconnect(const qpid::Address
 bool Link::tryFailover()
 {
     Address next;
-    if (urls.next(next) &&
+    if (urls.next(next) && 
         (next.host != host || next.port != port || next.protocol != transport)) {
         links->changeAddress(Address(transport, host, port), next);
         QPID_LOG(debug, "Link failing over to " << host << ":" << port);
@@ -340,12 +329,6 @@ bool Link::tryFailover()
     }
 }
 
-// Management updates for a linke are inconsistent in a cluster, so they are
-// suppressed.
-bool Link::hideManagement() const {
-    return !mgmtObject || ( broker && broker->isInCluster());
-}
-
 uint Link::nextChannel()
 {
     Mutex::ScopedLock mutex(lock);
@@ -358,7 +341,7 @@ void Link::notifyConnectionForced(const 
     Mutex::ScopedLock mutex(lock);
 
     setStateLH(STATE_FAILED);
-    if (!hideManagement())
+    if (mgmtObject != 0)
         mgmtObject->set_lastError(text);
 }
 
@@ -380,7 +363,7 @@ Link::shared_ptr Link::decode(LinkRegist
     string   authMechanism;
     string   username;
     string   password;
-
+    
     buffer.getShortString(host);
     port = buffer.getShort();
     buffer.getShortString(transport);
@@ -392,7 +375,7 @@ Link::shared_ptr Link::decode(LinkRegist
     return links.declare(host, port, transport, durable, authMechanism, username, password).first;
 }
 
-void Link::encode(Buffer& buffer) const
+void Link::encode(Buffer& buffer) const 
 {
     buffer.putShortString(string("link"));
     buffer.putShortString(host);
@@ -404,8 +387,8 @@ void Link::encode(Buffer& buffer) const
     buffer.putShortString(password);
 }
 
-uint32_t Link::encodedSize() const
-{
+uint32_t Link::encodedSize() const 
+{ 
     return host.size() + 1 // short-string (host)
         + 5                // short-string ("link")
         + 2                // port

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Link.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -85,7 +85,6 @@ namespace qpid {
             void destroy();                  // Called when mgmt deletes this link
             void ioThreadProcessing();       // Called on connection's IO thread by request
             bool tryFailover();              // Called during maintenance visit
-            bool hideManagement() const;
 
         public:
             typedef boost::shared_ptr<Link> shared_ptr;
@@ -123,12 +122,12 @@ namespace qpid {
 
             void notifyConnectionForced(const std::string text);
             void setPassive(bool p);
-
+            
             // PersistableConfig:
             void     setPersistenceId(uint64_t id) const;
             uint64_t getPersistenceId() const { return persistenceId; }
             uint32_t encodedSize() const;
-            void     encode(framing::Buffer& buffer) const;
+            void     encode(framing::Buffer& buffer) const; 
             const std::string& getName() const;
 
             static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
@@ -136,7 +135,6 @@ namespace qpid {
             // Manageable entry points
             management::ManagementObject*    GetManagementObject(void) const;
             management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
-
         };
     }
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/LinkRegistry.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -381,7 +381,7 @@ std::string LinkRegistry::createKey(cons
     return keystream.str();
 }
 
-void LinkRegistry::setPassive(bool p)
+void LinkRegistry::setPassive(bool p) 
 {
     Mutex::ScopedLock locker(lock);
     passiveChanged = p != passive;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,7 +30,6 @@
 #include "qpid/framing/SendContent.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/framing/TypeFilter.h"
-#include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
 
 #include <time.h>
@@ -50,16 +49,27 @@ TransferAdapter Message::TRANSFER;
 
 Message::Message(const framing::SequenceNumber& id) :
     frames(id), persistenceId(0), redelivered(false), loaded(false),
-    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
-    expiration(FAR_FUTURE), dequeueCallback(0),
-    inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false)
-{}
+    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
+    expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
+    inCallback(false), requiredCredit(0) {}
 
-Message::~Message() {}
+Message::Message(const Message& original) :
+    PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
+    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
+    expiration(original.expiration), enqueueCallback(0), dequeueCallback(0),
+    inCallback(false), requiredCredit(0) 
+{
+    setExpiryPolicy(original.expiryPolicy);
+}
+
+Message::~Message()
+{
+    if (expiryPolicy)
+        expiryPolicy->forget(*this);
+}
 
 void Message::forcePersistent()
 {
-    sys::Mutex::ScopedLock l(lock);
     // only set forced bit if we actually need to force.
     if (! getAdapter().isPersistent(frames) ){
         forcePersistentPolicy = true;
@@ -76,7 +86,7 @@ std::string Message::getRoutingKey() con
     return getAdapter().getRoutingKey(frames);
 }
 
-std::string Message::getExchangeName() const
+std::string Message::getExchangeName() const 
 {
     return getAdapter().getExchange(frames);
 }
@@ -85,7 +95,7 @@ const boost::shared_ptr<Exchange> Messag
 {
     if (!exchange) {
         exchange = registry.get(getExchangeName());
-    }
+    } 
     return exchange;
 }
 
@@ -96,19 +106,16 @@ bool Message::isImmediate() const
 
 const FieldTable* Message::getApplicationHeaders() const
 {
-    sys::Mutex::ScopedLock l(lock);
     return getAdapter().getApplicationHeaders(frames);
 }
 
 std::string Message::getAppId() const
 {
-    sys::Mutex::ScopedLock l(lock);
     return getAdapter().getAppId(frames);
 }
 
 bool Message::isPersistent() const
 {
-    sys::Mutex::ScopedLock l(lock);
     return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
 }
 
@@ -188,7 +195,7 @@ void Message::decodeContent(framing::Buf
     } else {
         //adjust header flags
         MarkLastSegment f;
-        frames.map_if(f, TypeFilter<HEADER_BODY>());
+        frames.map_if(f, TypeFilter<HEADER_BODY>());    
     }
     //mark content loaded
     loaded = true;
@@ -240,7 +247,7 @@ void Message::destroy()
 bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
 {
     intrusive_ptr<const PersistableMessage> pmsg(this);
-
+    
     bool done = false;
     string& data = frame.castBody<AMQContentBody>()->getData();
     store->loadContent(queue, pmsg, data, offset, maxContentSize);
@@ -265,7 +272,7 @@ void Message::sendContent(const Queue& q
         uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
         bool morecontent = true;
         for (uint64_t offset = 0; morecontent; offset += maxContentSize)
-        {
+        {            
             AMQFrame frame((AMQContentBody()));
             morecontent = getContentFrame(queue, frame, maxContentSize, offset);
             out.handle(frame);
@@ -283,10 +290,7 @@ void Message::sendHeader(framing::FrameH
 {
     sys::Mutex::ScopedLock l(lock);
     Relay f(out);
-    frames.map_if(f, TypeFilter<HEADER_BODY>());
-    //as frame (and pointer to body) has now been passed to handler,
-    //subsequent modifications should use a copy
-    copyHeaderOnWrite = true;
+    frames.map_if(f, TypeFilter<HEADER_BODY>());    
 }
 
 // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -316,14 +320,13 @@ bool Message::isContentLoaded() const
 }
 
 
-namespace
+namespace 
 {
 const std::string X_QPID_TRACE("x-qpid.trace");
 }
 
 bool Message::isExcluded(const std::vector<std::string>& excludes) const
 {
-    sys::Mutex::ScopedLock l(lock);
     const FieldTable* headers = getApplicationHeaders();
     if (headers) {
         std::string traceStr = headers->getAsString(X_QPID_TRACE);
@@ -342,30 +345,11 @@ bool Message::isExcluded(const std::vect
     return false;
 }
 
-class CloneHeaderBody
-{
-public:
-    void operator()(AMQFrame& f)
-    {
-        f.cloneBody();
-    }
-};
-
-AMQHeaderBody* Message::getHeaderBody()
-{
-    if (copyHeaderOnWrite) {
-        CloneHeaderBody f;
-        frames.map_if(f, TypeFilter<HEADER_BODY>());
-        copyHeaderOnWrite = false;
-    }
-    return frames.getHeaders();
-}
-
 void Message::addTraceId(const std::string& id)
 {
     sys::Mutex::ScopedLock l(lock);
     if (isA<MessageTransferBody>()) {
-        FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
+        FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders();
         std::string trace = headers.getAsString(X_QPID_TRACE);
         if (trace.empty()) {
             headers.setString(X_QPID_TRACE, id);
@@ -373,22 +357,13 @@ void Message::addTraceId(const std::stri
             trace += ",";
             trace += id;
             headers.setString(X_QPID_TRACE, trace);
-        }
+        }        
     }
 }
 
-void Message::setTimestamp()
-{
-    sys::Mutex::ScopedLock l(lock);
-    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
-    time_t now = ::time(0);
-    props->setTimestamp(now);   // AMQP-0.10: posix time_t - secs since Epoch
-}
-
-void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) 
 {
-    sys::Mutex::ScopedLock l(lock);
-    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
+    DeliveryProperties* props = getProperties<DeliveryProperties>();    
     if (props->getTtl()) {
         // AMQP requires setting the expiration property to be posix
         // time_t in seconds. TTL is in milliseconds
@@ -397,70 +372,26 @@ void Message::computeExpiration(const bo
             time_t now = ::time(0);
             props->setExpiration(now + (props->getTtl()/1000));
         }
-        if (e) {
-            // Use higher resolution time for the internal expiry calculation.
-            // Prevent overflow as a signed int64_t
-            Duration ttl(std::min(props->getTtl() * TIME_MSEC,
-                                  (uint64_t) std::numeric_limits<int64_t>::max()));
-            expiration = AbsTime(e->getCurrentTime(), ttl);
-            setExpiryPolicy(e);
-        }
+        // Use higher resolution time for the internal expiry calculation.
+        expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC));
+        setExpiryPolicy(e);
     }
 }
 
 void Message::adjustTtl()
 {
-    sys::Mutex::ScopedLock l(lock);
-    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
+    DeliveryProperties* props = getProperties<DeliveryProperties>();
     if (props->getTtl()) {
-        if (expiration < FAR_FUTURE) {
-            sys::AbsTime current(
-                expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
-            sys::Duration ttl(current, getExpiration());
-            // convert from ns to ms; set to 1 if expired
-            props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
-        }
+        sys::Mutex::ScopedLock l(lock);
+        sys::Duration d(sys::AbsTime::now(), getExpiration());
+        props->setTtl(int64_t(d) > 0 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired
     }
 }
 
-void Message::setRedelivered()
-{
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true);
-}
-
-void Message::insertCustomProperty(const std::string& key, int64_t value)
-{
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value);
-}
-
-void Message::insertCustomProperty(const std::string& key, const std::string& value)
-{
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value);
-}
-
-void Message::removeCustomProperty(const std::string& key)
-{
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key);
-}
-
-void Message::setExchange(const std::string& exchange)
-{
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<DeliveryProperties>()->setExchange(exchange);
-}
-
-void Message::clearApplicationHeadersFlag()
-{
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag();
-}
-
 void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
     expiryPolicy = e;
+    if (expiryPolicy) 
+        expiryPolicy->willExpire(*this);
 }
 
 bool Message::hasExpired()
@@ -484,12 +415,30 @@ struct ScopedSet {
 };
 }
 
+void Message::allEnqueuesComplete() {
+    ScopedSet ss(callbackLock, inCallback);
+    MessageCallback* cb = enqueueCallback;
+    if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
+}
+
 void Message::allDequeuesComplete() {
     ScopedSet ss(callbackLock, inCallback);
     MessageCallback* cb = dequeueCallback;
     if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
+void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
+    sys::Mutex::ScopedLock l(callbackLock);
+    while (inCallback) callbackLock.wait();
+    enqueueCallback = &cb;
+}
+
+void Message::resetEnqueueCompleteCallback() {
+    sys::Mutex::ScopedLock l(callbackLock);
+    while (inCallback) callbackLock.wait();
+    enqueueCallback = 0;
+}
+
 void Message::setDequeueCompleteCallback(MessageCallback& cb) {
     sys::Mutex::ScopedLock l(callbackLock);
     while (inCallback) callbackLock.wait();
@@ -503,11 +452,12 @@ void Message::resetDequeueCompleteCallba
 }
 
 uint8_t Message::getPriority() const {
-    sys::Mutex::ScopedLock l(lock);
     return getAdapter().getPriority(frames);
 }
 
-bool Message::getIsManagementMessage() const { return isManagementMessage; }
-void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
+framing::FieldTable& Message::getOrInsertHeaders()
+{
+    return getProperties<MessageProperties>()->getApplicationHeaders();
+}
 
 }} // namespace qpid::broker

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Message.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,21 +29,17 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
 #include <boost/function.hpp>
-#include <boost/intrusive_ptr.hpp>
 #include <boost/shared_ptr.hpp>
-#include <memory>
 #include <string>
 #include <vector>
 
 namespace qpid {
-
+       
 namespace framing {
-class AMQBody;
-class AMQHeaderBody;
 class FieldTable;
 class SequenceNumber;
 }
-
+       
 namespace broker {
 class ConnectionToken;
 class Exchange;
@@ -55,10 +51,11 @@ class ExpiryPolicy;
 class Message : public PersistableMessage {
 public:
     typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
-
+    
     QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber());
+    QPID_BROKER_EXTERN Message(const Message&);
     QPID_BROKER_EXTERN ~Message();
-
+        
     uint64_t getPersistenceId() const { return persistenceId; }
     void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
 
@@ -78,31 +75,27 @@ public:
     bool isImmediate() const;
     QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
     QPID_BROKER_EXTERN std::string getAppId() const;
+    framing::FieldTable& getOrInsertHeaders();
     QPID_BROKER_EXTERN bool isPersistent() const;
     bool requiresAccept();
 
-    /** determine msg expiration time using the TTL value if present */
-    QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
+    QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
     void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
     bool hasExpired();
     sys::AbsTime getExpiration() const { return expiration; }
-    void setExpiration(sys::AbsTime exp) { expiration = exp; }
     void adjustTtl();
-    void setRedelivered();
-    QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, int64_t value);
-    QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, const std::string& value);
-    QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
-    void setExchange(const std::string&);
-    void clearApplicationHeadersFlag();
-    /** set the timestamp delivery property to the current time-of-day */
-    QPID_BROKER_EXTERN void setTimestamp();
 
-    framing::FrameSet& getFrames() { return frames; }
-    const framing::FrameSet& getFrames() const { return frames; }
+    framing::FrameSet& getFrames() { return frames; } 
+    const framing::FrameSet& getFrames() const { return frames; } 
+
+    template <class T> T* getProperties() {
+        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        return p->get<T>(true);
+    }
 
     template <class T> const T* getProperties() const {
-        const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        return p->get<T>();
+        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        return p->get<T>(true);
     }
 
     template <class T> const T* hasProperties() const {
@@ -110,11 +103,6 @@ public:
         return p->get<T>();
     }
 
-    template <class T> void eraseProperties() {
-        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        p->erase<T>();
-    }
-
     template <class T> const T* getMethod() const {
         return frames.as<T>();
     }
@@ -147,7 +135,7 @@ public:
 
     QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer);
     QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer);
-
+            
     void QPID_BROKER_EXTERN tryReleaseContent();
     void releaseContent();
     void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead
@@ -161,19 +149,24 @@ public:
 
     bool isExcluded(const std::vector<std::string>& excludes) const;
     void addTraceId(const std::string& id);
-
-    void forcePersistent();
-    bool isForcedPersistent();
+       
+       void forcePersistent();
+       bool isForcedPersistent();
+    
+
+    /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */
+    void setEnqueueCompleteCallback(MessageCallback& cb);
+    void resetEnqueueCompleteCallback();
 
     /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
     void setDequeueCompleteCallback(MessageCallback& cb);
     void resetDequeueCompleteCallback();
 
     uint8_t getPriority() const;
-    bool getIsManagementMessage() const;
-    void setIsManagementMessage(bool b);
+
   private:
     MessageAdapter& getAdapter() const;
+    void allEnqueuesComplete();
     void allDequeuesComplete();
 
     mutable sys::Mutex lock;
@@ -183,7 +176,7 @@ public:
     bool redelivered;
     bool loaded;
     bool staged;
-    bool forcePersistentPolicy; // used to force message as durable, via a broker policy
+	bool forcePersistentPolicy; // used to force message as durable, via a broker policy
     ConnectionToken* publisher;
     mutable MessageAdapter* adapter;
     qpid::sys::AbsTime expiration;
@@ -194,20 +187,11 @@ public:
     mutable boost::intrusive_ptr<Message> empty;
 
     sys::Monitor callbackLock;
+    MessageCallback* enqueueCallback;
     MessageCallback* dequeueCallback;
     bool inCallback;
 
     uint32_t requiredCredit;
-    bool isManagementMessage;
-      mutable bool copyHeaderOnWrite;
-
-    /**
-     * Expects lock to be held
-     */
-    template <class T> T* getModifiableProperties() {
-        return getHeaderBody()->get<T>(true);
-    }
-    qpid::framing::AMQHeaderBody* getHeaderBody();
 };
 
 }}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/MessageBuilder.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/MessageBuilder.h Fri Oct 21 01:19:00 2011
@@ -33,7 +33,7 @@ namespace qpid {
         class Message;
         class MessageStore;
 
-        class QPID_BROKER_CLASS_EXTERN MessageBuilder : public framing::FrameHandler{
+        class MessageBuilder : public framing::FrameHandler{
         public:
             QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store);
             QPID_BROKER_EXTERN void handle(framing::AMQFrame& frame);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Messages.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Messages.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Messages.h Fri Oct 21 01:19:00 2011
@@ -32,8 +32,7 @@ struct QueuedMessage;
 
 /**
  * This interface abstracts out the access to the messages held for
- * delivery by a Queue instance. Note the the assumption at present is
- * that all locking is done in the Queue itself.
+ * delivery by a Queue instance.
  */
 class Messages
 {
@@ -76,6 +75,7 @@ class Messages
      * @return true if there is another message, false otherwise.
      */
     virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
+
     /**
      * Note: Caller is responsible for ensuring that there is a front
      * (e.g. empty() returns false)

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.cpp Fri Oct 21 01:19:00 2011
@@ -126,25 +126,21 @@ std::auto_ptr<TPCTransactionContext> Nul
 
 void NullMessageStore::prepare(TPCTransactionContext& ctxt)
 {
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
     prepared.insert(DummyCtxt::getXid(ctxt));
 }
 
 void NullMessageStore::commit(TransactionContext& ctxt)
 {
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
     prepared.erase(DummyCtxt::getXid(ctxt));
 }
 
 void NullMessageStore::abort(TransactionContext& ctxt)
 {
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
     prepared.erase(DummyCtxt::getXid(ctxt));
 }
 
 void NullMessageStore::collectPreparedXids(std::set<std::string>& out)
 {
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
     out.insert(prepared.begin(), prepared.end());
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/NullMessageStore.h Fri Oct 21 01:19:00 2011
@@ -25,7 +25,6 @@
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/sys/Mutex.h"
 
 #include <boost/intrusive_ptr.hpp>
 
@@ -35,11 +34,10 @@ namespace broker {
 /**
  * A null implementation of the MessageStore interface
  */
-class QPID_BROKER_CLASS_EXTERN NullMessageStore : public MessageStore
+class NullMessageStore : public MessageStore
 {
     std::set<std::string> prepared;
     uint64_t nextPersistenceId;
-    qpid::sys::Mutex lock;
   public:
     QPID_BROKER_EXTERN NullMessageStore();
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.cpp Fri Oct 21 01:19:00 2011
@@ -34,6 +34,7 @@ class MessageStore;
 PersistableMessage::~PersistableMessage() {}
 
 PersistableMessage::PersistableMessage() :
+    asyncEnqueueCounter(0), 
     asyncDequeueCounter(0),
     store(0)
 {}
@@ -67,6 +68,24 @@ bool PersistableMessage::isContentReleas
     return contentReleaseState.released;
 }
        
+bool PersistableMessage::isEnqueueComplete() {
+    sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
+    return asyncEnqueueCounter == 0;
+}
+
+void PersistableMessage::enqueueComplete() {
+    bool notify = false;
+    {
+        sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
+        if (asyncEnqueueCounter > 0) {
+            if (--asyncEnqueueCounter == 0) {
+                notify = true;
+            }
+        }
+    }
+    if (notify) 
+        allEnqueuesComplete();
+}
 
 bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
     if (store && (queue->getPersistenceId()!=0)) {
@@ -90,7 +109,12 @@ void PersistableMessage::addToSyncList(P
 
 void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
     addToSyncList(queue, _store);
-    enqueueStart();
+    enqueueAsync();
+}
+
+void PersistableMessage::enqueueAsync() { 
+    sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
+    asyncEnqueueCounter++; 
 }
 
 bool PersistableMessage::isDequeueComplete() { 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/PersistableMessage.h Fri Oct 21 01:19:00 2011
@@ -31,7 +31,6 @@
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/broker/PersistableQueue.h"
-#include "qpid/broker/AsyncCompletion.h"
 
 namespace qpid {
 namespace broker {
@@ -44,19 +43,18 @@ class MessageStore;
 class PersistableMessage : public Persistable
 {
     typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
+    sys::Mutex asyncEnqueueLock;
     sys::Mutex asyncDequeueLock;
     sys::Mutex storeLock;
-
+       
     /**
-     * "Ingress" messages == messages sent _to_ the broker.
-     * Tracks the number of outstanding asynchronous operations that must
-     * complete before an inbound message can be considered fully received by the
-     * broker.  E.g. all enqueues have completed, the message has been written
-     * to store, credit has been replenished, etc. Once all outstanding
-     * operations have completed, the transfer of this message from the client
-     * may be considered complete.
+     * Tracks the number of outstanding asynchronous enqueue
+     * operations. When the message is enqueued asynchronously the
+     * count is incremented; when that enqueue completes it is
+     * decremented. Thus when it is 0, there are no outstanding
+     * enqueues.
      */
-    AsyncCompletion ingressCompletion;
+    int asyncEnqueueCounter;
 
     /**
      * Tracks the number of outstanding asynchronous dequeue
@@ -67,6 +65,7 @@ class PersistableMessage : public Persis
      */
     int asyncDequeueCounter;
 
+    void enqueueAsync();
     void dequeueAsync();
 
     syncList synclist;
@@ -81,6 +80,8 @@ class PersistableMessage : public Persis
     ContentReleaseState contentReleaseState;
 
   protected:
+    /** Called when all enqueues are complete for this message. */
+    virtual void allEnqueuesComplete() = 0;
     /** Called when all dequeues are complete for this message. */
     virtual void allDequeuesComplete() = 0;
 
@@ -114,12 +115,9 @@ class PersistableMessage : public Persis
 
     virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
 
-    /** track the progress of a message received by the broker - see ingressCompletion above */
-    QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion.isDone(); }
-    QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return ingressCompletion; }
+    QPID_BROKER_EXTERN bool isEnqueueComplete();
 
-    QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); }
-    QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); }
+    QPID_BROKER_EXTERN void enqueueComplete();
 
     QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
                                          MessageStore* _store);
@@ -135,6 +133,7 @@ class PersistableMessage : public Persis
     bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
     
     void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
+    
 };
 
 }}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org