You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [5/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp/...

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp Fri Aug  3 12:13:32 2012
@@ -68,54 +68,92 @@ LinkRegistry::LinkRegistry (Broker* _bro
 
 LinkRegistry::~LinkRegistry() {}
 
+/** find link by the *configured* remote address */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host,
+                                              uint16_t           port,
+                                              const std::string& transport)
+{
+    Mutex::ScopedLock   locker(lock);
+    for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) {
+        Link::shared_ptr& link = i->second;
+        if (link->getHost() == host &&
+            link->getPort() == port &&
+            (transport.empty() || link->getTransport() == transport))
+            return link;
+     }
+    return boost::shared_ptr<Link>();
+}
 
-void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
+/** find link by name */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name)
 {
     Mutex::ScopedLock   locker(lock);
-    std::string oldKey = createKey(oldAddress);
-    std::string newKey = createKey(newAddress);
-    if (links.find(newKey) != links.end()) {
-        QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
-    } else {
-        LinkMap::iterator i = links.find(oldKey);
-        if (i == links.end()) {
-            QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
-        } else {
-            links[newKey] = i->second;
-            links.erase(oldKey);
-            QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
-        }
-    }
+    LinkMap::iterator l = links.find(name);
+    if (l != links.end())
+        return l->second;
+    return boost::shared_ptr<Link>();
 }
 
-pair<Link::shared_ptr, bool> LinkRegistry::declare(const string&  host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
+                                                   const string&  host,
                                                    uint16_t port,
                                                    const string&  transport,
                                                    bool     durable,
                                                    const string&  authMechanism,
                                                    const string&  username,
-                                                   const string&  password)
+                                                   const string&  password,
+                                                   bool failover)
 
 {
     Mutex::ScopedLock   locker(lock);
-    string key = createKey(host, port);
 
-    LinkMap::iterator i = links.find(key);
+    LinkMap::iterator i = links.find(name);
     if (i == links.end())
     {
         Link::shared_ptr link;
 
-        link = Link::shared_ptr (new Link (this, store, host, port, transport, durable,
-                                           authMechanism, username, password,
-                                           broker, parent));
-        links[key] = link;
+        link = Link::shared_ptr (
+            new Link (name, this, host, port, transport,
+                      boost::bind(&LinkRegistry::linkDestroyed, this, _1),
+                      durable, authMechanism, username, password, broker,
+                      parent, failover));
+        if (durable && store) store->create(*link);
+        links[name] = link;
+        QPID_LOG(debug, "Creating new link; name=" << name );
         return std::pair<Link::shared_ptr, bool>(link, true);
     }
     return std::pair<Link::shared_ptr, bool>(i->second, false);
 }
 
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
-                                                     uint16_t     port,
+/** find bridge by link & route info */
+Bridge::shared_ptr LinkRegistry::getBridge(const Link&  link,
+                                           const std::string& src,
+                                           const std::string& dest,
+                                           const std::string& key)
+{
+    Mutex::ScopedLock   locker(lock);
+    for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) {
+        if (i->second->getSrc() == src && i->second->getDest() == dest &&
+            i->second->getKey() == key && i->second->getLink() &&
+            i->second->getLink()->getName() == link.getName()) {
+            return i->second;
+        }
+    }
+    return Bridge::shared_ptr();
+}
+
+/** find bridge by name */
+Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name)
+{
+    Mutex::ScopedLock   locker(lock);
+    BridgeMap::iterator b = bridges.find(name);
+    if (b != bridges.end())
+        return b->second;
+    return Bridge::shared_ptr();
+}
+
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name,
+                                                     Link&        link,
                                                      bool         durable,
                                                      const std::string& src,
                                                      const std::string& dest,
@@ -126,22 +164,32 @@ pair<Bridge::shared_ptr, bool> LinkRegis
                                                      const std::string& excludes,
                                                      bool         dynamic,
                                                      uint16_t     sync,
-                                                     Bridge::InitializeCallback init
+                                                     Bridge::InitializeCallback init,
+                                                     const std::string& queueName,
+                                                     const std::string& altExchange
 )
 {
     Mutex::ScopedLock locker(lock);
-    QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
 
-    string linkKey = createKey(host, port);
-    stringstream keystream;
-    keystream << linkKey << "!" << src << "!" << dest << "!" << key;
-    string bridgeKey = keystream.str();
-
-    LinkMap::iterator l = links.find(linkKey);
-    if (l == links.end())
-        return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+    // Durable bridges are only valid on durable links
+    if (durable && !link.isDurable()) {
+        QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName());
+         return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+    }
 
-    BridgeMap::iterator b = bridges.find(bridgeKey);
+    if (dynamic) {
+        Exchange::shared_ptr exchange = broker->getExchanges().get(src);
+        if (exchange.get() == 0) {
+            QPID_LOG(error, "Exchange not found, name='" << src << "'" );
+            return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+        }
+        if (!exchange->supportsDynamicBinding()) {
+            QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'");
+            return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+        }
+    }
+
+    BridgeMap::iterator b = bridges.find(name);
     if (b == bridges.end())
     {
         _qmf::ArgsLinkBridge args;
@@ -159,23 +207,29 @@ pair<Bridge::shared_ptr, bool> LinkRegis
         args.i_sync       = sync;
 
         bridge = Bridge::shared_ptr
-            (new Bridge (l->second.get(), l->second->nextChannel(),
-                         boost::bind(&LinkRegistry::destroy, this,
-                                     host, port, src, dest, key),
-                         args, init));
-        bridges[bridgeKey] = bridge;
-        l->second->add(bridge);
+          (new Bridge (name, &link, link.nextChannel(),
+                       boost::bind(&LinkRegistry::destroyBridge, this, _1),
+                       args, init, queueName, altExchange));
+        bridges[name] = bridge;
+        link.add(bridge);
+        if (durable && store)
+            store->create(*bridge);
+
+        QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() <<
+                 "' from " << src << " to " << dest << " (" << key << ")");
+
         return std::pair<Bridge::shared_ptr, bool>(bridge, true);
     }
     return std::pair<Bridge::shared_ptr, bool>(b->second, false);
 }
 
-void LinkRegistry::destroy(const string& host, const uint16_t port)
+/** called back by the link when it has completed its cleanup and can be removed. */
+void LinkRegistry::linkDestroyed(Link *link)
 {
+    QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName());
     Mutex::ScopedLock   locker(lock);
-    string key = createKey(host, port);
 
-    LinkMap::iterator i = links.find(key);
+    LinkMap::iterator i = links.find(link->getName());
     if (i != links.end())
     {
         if (i->second->isDurable() && store)
@@ -184,27 +238,20 @@ void LinkRegistry::destroy(const string&
     }
 }
 
-void LinkRegistry::destroy(const std::string& host,
-                           const uint16_t     port,
-                           const std::string& src,
-                           const std::string& dest,
-                           const std::string& key)
+/** called back by bridge when its destruction has been requested */
+void LinkRegistry::destroyBridge(Bridge *bridge)
 {
+    QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << bridge->getName());
     Mutex::ScopedLock locker(lock);
-    string linkKey = createKey(host, port);
-    stringstream keystream;
-    keystream << linkKey << "!" << src << "!" << dest << "!" << key;
-    string bridgeKey = keystream.str();
 
-    LinkMap::iterator l = links.find(linkKey);
-    if (l == links.end())
-        return;
-
-    BridgeMap::iterator b = bridges.find(bridgeKey);
+    BridgeMap::iterator b = bridges.find(bridge->getName());
     if (b == bridges.end())
         return;
 
-    l->second->cancel(b->second);
+    Link *link = b->second->getLink();
+    if (link) {
+        link->cancel(b->second);
+    }
     if (b->second->isDurable())
         store->destroy(*(b->second));
     bridges.erase(b);
@@ -219,26 +266,71 @@ MessageStore* LinkRegistry::getStore() c
     return store;
 }
 
-Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId)
-{
-    // Convert keyOrMgmtId to a host:port key.
-    //
-    // TODO aconway 2011-02-01: centralize code that constructs/parses
-    // connection management IDs. Currently sys:: protocol factories
-    // and IO plugins construct the IDs and LinkRegistry parses them.
-    size_t separator = keyOrMgmtId.find('-');
-    if (separator == std::string::npos) separator = 0;
-    std::string key =  keyOrMgmtId.substr(separator+1, std::string::npos);
+namespace {
+    void extractHostPort(const std::string& connId, std::string *host, uint16_t *port)
+    {
+        // Extract host and port of remote broker from connection id string.
+        //
+        // TODO aconway 2011-02-01: centralize code that constructs/parses connection
+        // management IDs. Currently sys:: protocol factories and IO plugins construct the
+        // IDs and LinkRegistry parses them.
+        // KAG: current connection id format assumed:
+        // "localhost:port-remotehost:port".  In the case of IPv6, the host addresses are
+        // contained within brackets "[...]", example:
+        // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us
+        // if this assumption changes!
+        size_t separator = connId.find('-');
+        assert(separator != std::string::npos);
+        std::string remote = connId.substr(separator+1, std::string::npos);
+        separator = remote.rfind(":");
+        assert(separator != std::string::npos);
+        *host = remote.substr(0, separator);
+        // IPv6 - host is bracketed by "[]", strip them
+        if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') {
+            *host = host->substr(1, host->length() - 2);
+        }
+        try {
+            *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos));
+        } catch (const boost::bad_lexical_cast&) {
+            QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'");
+            assert(false);
+        }
+    }
+}
 
+/** find the Link that corresponds to the given connection */
+Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
+{
     Mutex::ScopedLock locker(lock);
-    LinkMap::iterator l = links.find(key);
-    if (l != links.end()) return l->second;
-    else return Link::shared_ptr();
+    ConnectionMap::iterator c = connections.find(connId);
+    if (c != connections.end()) {
+        LinkMap::iterator l = links.find(c->second);
+        if (l != links.end())
+            return l->second;
+    }
+    return Link::shared_ptr();
 }
 
 void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
 {
-    Link::shared_ptr link = findLink(key);
+    // find a link that is attempting to connect to the remote, and
+    // create a mapping from connection id to link
+    QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
+    std::string host;
+    uint16_t port = 0;
+    extractHostPort( key, &host, &port );
+    Link::shared_ptr link;
+    {
+        Mutex::ScopedLock locker(lock);
+        for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) {
+            if (l->second->pendingConnection(host, port)) {
+                link = l->second;
+                connections[key] = link->getName();
+                break;
+            }
+        }
+    }
+
     if (link) {
         link->established(c);
         c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
@@ -343,20 +435,6 @@ std::string LinkRegistry::getAuthIdentit
 }
 
 
-std::string LinkRegistry::createKey(const qpid::Address& a) {
-    // TODO aconway 2010-05-11: key should also include protocol/transport to
-    // be unique. Requires refactor of LinkRegistry interface.
-    return createKey(a.host, a.port);
-}
-
-std::string LinkRegistry::createKey(const std::string& host,  uint16_t port) {
-    // TODO aconway 2010-05-11: key should also include protocol/transport to
-    // be unique. Requires refactor of LinkRegistry interface.
-    stringstream keystream;
-    keystream << host << ":" << port;
-    return keystream.str();
-}
-
 void LinkRegistry::setPassive(bool p)
 {
     Mutex::ScopedLock locker(lock);
@@ -369,10 +447,12 @@ void LinkRegistry::setPassive(bool p)
 }
 
 void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
+    Mutex::ScopedLock locker(lock);
     for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
 }
 
 void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
+    Mutex::ScopedLock locker(lock);
     for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h Fri Aug  3 12:13:32 2012
@@ -42,9 +42,11 @@ namespace broker {
     class LinkRegistry {
         typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
         typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
+        typedef std::map<std::string, std::string> ConnectionMap;
 
-        LinkMap   links;
-        BridgeMap bridges;
+        LinkMap   links;    /** indexed by name of Link */
+        BridgeMap bridges;  /** indexed by name of Bridge */
+        ConnectionMap   connections;  /** indexed by connection identifier, gives link name */
 
         qpid::sys::Mutex lock;
         Broker* broker;
@@ -54,15 +56,18 @@ namespace broker {
         std::string realm;
 
         boost::shared_ptr<Link> findLink(const std::string& key);
-        static std::string createKey(const Address& address);
-        static std::string createKey(const std::string& host, uint16_t port);
 
-        // Methods called by the connection observer.
+        // Methods called by the connection observer, key is connection identifier
         void notifyConnection (const std::string& key, Connection* c);
         void notifyOpened     (const std::string& key);
         void notifyClosed     (const std::string& key);
         void notifyConnectionForced    (const std::string& key, const std::string& text);
-      friend class LinkRegistryConnectionObserver;
+        friend class LinkRegistryConnectionObserver;
+
+        /** Notify the registry that a Link has been destroyed */
+        void linkDestroyed(Link*);
+        /** Request to destroy a Bridge */
+        void destroyBridge(Bridge*);
 
     public:
         QPID_BROKER_EXTERN LinkRegistry (); // Only used in store tests
@@ -70,17 +75,29 @@ namespace broker {
         QPID_BROKER_EXTERN ~LinkRegistry();
 
         QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Link>, bool>
-        declare(const std::string& host,
+        declare(const std::string& name,
+                const std::string& host,
                 uint16_t     port,
                 const std::string& transport,
                 bool         durable,
                 const std::string& authMechanism,
                 const std::string& username,
-                const std::string& password);
+                const std::string& password,
+                bool failover=true);
+
+        /** find Link by name; returns an empty pointer if no Link has that name */
+        QPID_BROKER_EXTERN boost::shared_ptr<Link>
+          getLink(const std::string& name);
+        /** host,port,transport will be matched against the configured values, which may
+            be different from the current values due to failover */
+        QPID_BROKER_EXTERN boost::shared_ptr<Link>
+          getLink(const std::string& configHost,
+                  uint16_t           configPort,
+                  const std::string& configTransport = std::string());
 
         QPID_BROKER_EXTERN std::pair<Bridge::shared_ptr, bool>
-        declare(const std::string& host,
-                uint16_t     port,
+        declare(const std::string& name,
+                Link& link,
                 bool         durable,
                 const std::string& src,
                 const std::string& dest,
@@ -91,16 +108,18 @@ namespace broker {
                 const std::string& excludes,
                 bool         dynamic,
                 uint16_t     sync,
-                Bridge::InitializeCallback=0
+                Bridge::InitializeCallback=0,
+                const std::string& queueName="",
+                const std::string& altExchange=""
         );
-
-        QPID_BROKER_EXTERN void destroy(const std::string& host, const uint16_t port);
-
-        QPID_BROKER_EXTERN void destroy(const std::string& host,
-                                        const uint16_t     port,
-                                        const std::string& src,
-                                        const std::string& dest,
-                                        const std::string& key);
+        /** find Bridge by name; returns an empty pointer if no Bridge has that name */
+        QPID_BROKER_EXTERN Bridge::shared_ptr
+          getBridge(const std::string&  name);
+        QPID_BROKER_EXTERN Bridge::shared_ptr
+          getBridge(const Link&  link,
+                    const std::string& src,
+                    const std::string& dest,
+                    const std::string& key);
 
         /**
          * Register the manageable parent for declared queues
@@ -126,11 +145,6 @@ namespace broker {
         QPID_BROKER_EXTERN uint16_t    getPort            (const std::string& key);
 
         /**
-         * Called by links failing over to new address
-         */
-        void changeAddress(const Address& oldAddress, const Address& newAddress);
-
-        /**
          * Called to alter passive state. In passive state the links
          * and bridges managed by a link registry will be recorded and
          * updated but links won't actually establish connections and

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp Fri Aug  3 12:13:32 2012
@@ -384,6 +384,18 @@ void Message::addTraceId(const std::stri
     }
 }
 
+void Message::clearTrace()
+{
+    sys::Mutex::ScopedLock l(lock);
+    if (isA<MessageTransferBody>()) {
+        FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
+        std::string trace = headers.getAsString(X_QPID_TRACE);
+        if (!trace.empty()) {
+            headers.setString(X_QPID_TRACE, "");
+        }
+    }
+}
+
 void Message::setTimestamp()
 {
     sys::Mutex::ScopedLock l(lock);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h Fri Aug  3 12:13:32 2012
@@ -161,6 +161,7 @@ public:
 
     bool isExcluded(const std::vector<std::string>& excludes) const;
     void addTraceId(const std::string& id);
+    void clearTrace();
 
     void forcePersistent();
     bool isForcedPersistent();

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp Fri Aug  3 12:13:32 2012
@@ -40,13 +40,16 @@ size_t MessageDeque::index(const framing
 bool MessageDeque::deleted(const QueuedMessage& m)
 {
     size_t i = index(m.position);
-    if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) {
-        messages[i].status = QueuedMessage::DELETED;
-        clean();
-        return true;
-    } else {
-        return false;
+    if (i < messages.size()) {
+        QueuedMessage *qm = &messages[i];
+        if (qm->status != QueuedMessage::DELETED) {
+            qm->status = QueuedMessage::DELETED;
+            qm->payload = 0; // message no longer needed
+            clean();
+            return true;
+        }
     }
+    return false;
 }
 
 size_t MessageDeque::size()
@@ -144,6 +147,7 @@ QueuedMessage* MessageDeque::pushPtr(con
     messages.back().status = QueuedMessage::AVAILABLE;
     if (head >= messages.size()) head = messages.size() - 1;
     ++available;
+    clean();  // QPID-4046: let producer help clean the backlog of deleted messages
     return &messages.back();
 }
 
@@ -173,12 +177,37 @@ void MessageDeque::updateAcquired(const 
     }
 }
 
+namespace {
+bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; }
+} // namespace
+
+void MessageDeque::setPosition(const framing::SequenceNumber& n) {
+    size_t i = index(n+1);
+    if (i >= messages.size()) return; // Nothing to do.
+
+    // Assertion to verify the precondition: no messages after n.
+    assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) ==
+           messages.end());
+    messages.erase(messages.begin()+i, messages.end());
+    if (head >= messages.size()) head = messages.size() - 1;
+    // Re-count the available messages
+    available = 0;
+    for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+        if (i->status == QueuedMessage::AVAILABLE) ++available;
+    }
+}
+
 void MessageDeque::clean()
 {
-    while (messages.size() && messages.front().status == QueuedMessage::DELETED) {
+    // QPID-4046: If a queue has multiple consumers, then it is possible for a large
+    // collection of deleted messages to build up.  Limit the number of messages cleaned
+    // up on each call to clean().
+    size_t count = 0;
+    while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) {
         messages.pop_front();
-        if (head) --head;
+        count += 1;
     }
+    head = (head > count) ? head - count : 0;
 }
 
 void MessageDeque::foreach(Functor f)

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h Fri Aug  3 12:13:32 2012
@@ -44,7 +44,7 @@ class MessageDeque : public Messages
     bool consume(QueuedMessage&);
     bool push(const QueuedMessage& added, QueuedMessage& removed);
     void updateAcquired(const QueuedMessage& acquired);
-
+    void setPosition(const framing::SequenceNumber&);
     void foreach(Functor);
     void removeIf(Predicate);
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp Fri Aug  3 12:13:32 2012
@@ -21,6 +21,7 @@
 #include "qpid/broker/MessageMap.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/log/Statement.h"
+#include <algorithm>
 
 namespace qpid {
 namespace broker {
@@ -130,18 +131,24 @@ bool MessageMap::push(const QueuedMessag
         QueuedMessage& a = messages[added.position];
         a = added;
         a.status = QueuedMessage::AVAILABLE;
-        QPID_LOG(debug, "Added message at " << a.position);
+        QPID_LOG(debug, "Added message " << a);
         return false;
     } else {
         //there is already a message with that key which needs to be replaced
         removed = result.first->second;
         result.first->second = replace(result.first->second, added);
         result.first->second.status = QueuedMessage::AVAILABLE;
-        QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first);
+        QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first);
         return true;
     }
 }
 
+void MessageMap::setPosition(const framing::SequenceNumber& seq) {
+    // Nothing to do, just assert that the precondition is respected and there
+    // are no undeleted messages after seq.
+    (void) seq; assert(messages.empty() || (--messages.end())->first <= seq);
+}
+
 void MessageMap::foreach(Functor f)
 {
     for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h Fri Aug  3 12:13:32 2012
@@ -6,7 +6,7 @@
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+o * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
@@ -50,6 +50,7 @@ class MessageMap : public Messages
     virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
     bool consume(QueuedMessage&);
     virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
+    void setPosition(const framing::SequenceNumber&);
 
     void foreach(Functor);
     virtual void removeIf(Predicate);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h Fri Aug  3 12:13:32 2012
@@ -21,6 +21,7 @@
  * under the License.
  *
  */
+#include "qpid/framing/SequenceNumber.h"
 #include <boost/function.hpp>
 
 namespace qpid {
@@ -101,14 +102,22 @@ class Messages
     virtual void updateAcquired(const QueuedMessage&) { }
 
     /**
+     * Set the position of the back of the queue. Next message enqueued will be n+1.
+     *@pre Any messages with seq > n must already be dequeued.
+     */
+    virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0;
+
+    /**
      * Apply, the functor to each message held
      */
+
     virtual void foreach(Functor) = 0;
     /**
      * Remove every message held that for which the specified
      * predicate returns true
      */
     virtual void removeIf(Predicate) = 0;
+
   private:
 };
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/NameGenerator.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/NameGenerator.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/NameGenerator.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/NameGenerator.h Fri Aug  3 12:13:32 2012
@@ -32,6 +32,7 @@ namespace qpid {
             NameGenerator(const std::string& base);
             std::string generate();
         };
+        const std::string QPID_NAME_PREFIX("qpid.");  // reserved for private names
     }
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp Fri Aug  3 12:13:32 2012
@@ -121,6 +121,10 @@ void PriorityQueue::updateAcquired(const
     fifo.updateAcquired(acquired);
 }
 
+void PriorityQueue::setPosition(const framing::SequenceNumber& n) {
+    fifo.setPosition(n);
+}
+
 void PriorityQueue::foreach(Functor f)
 {
     fifo.foreach(f);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h Fri Aug  3 12:13:32 2012
@@ -52,6 +52,7 @@ class PriorityQueue : public Messages
     bool consume(QueuedMessage&);
     bool push(const QueuedMessage& added, QueuedMessage& removed);
     void updateAcquired(const QueuedMessage& acquired);
+    void setPosition(const framing::SequenceNumber&);
     void foreach(Functor);
     void removeIf(Predicate);
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PrivateImplRef.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PrivateImplRef.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PrivateImplRef.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PrivateImplRef.h Fri Aug  3 12:13:32 2012
@@ -88,15 +88,15 @@ template <class T> class PrivateImplRef 
     /** Set the implementation pointer in a handle */
     static void set(T& t, const intrusive_ptr& p) {
         if (t.impl == p) return;
-        if (t.impl) boost::intrusive_ptr_release(t.impl);
+        if (t.impl) intrusive_ptr_release(t.impl);
         t.impl = p.get();
-        if (t.impl) boost::intrusive_ptr_add_ref(t.impl);
+        if (t.impl) intrusive_ptr_add_ref(t.impl);
     }
 
     // Helper functions to implement the ctor, dtor, copy, assign
-    static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); }
+    static void ctor(T& t, Impl* p) { t.impl = p; if (p) intrusive_ptr_add_ref(p); }
     static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); }
-    static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); }
+    static void dtor(T& t) { if(t.impl) intrusive_ptr_release(t.impl); }
     static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
 };
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp Fri Aug  3 12:13:32 2012
@@ -49,6 +49,7 @@
 #include "qpid/types/Variant.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 
 #include <iostream>
 #include <algorithm>
@@ -67,6 +68,7 @@ using qpid::management::ManagementAgent;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
+using std::string;
 using std::for_each;
 using std::mem_fun;
 namespace _qmf = qmf::org::apache::qpid::broker;
@@ -176,7 +178,8 @@ Queue::Queue(const string& _name, bool _
         ManagementAgent* agent = broker->getManagementAgent();
 
         if (agent != 0) {
-            mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0);
+            mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete);
+            mgmtObject->set_exclusive(_owner != 0);
             agent->addObject(mgmtObject, 0, store != 0);
             brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
             if (brokerMgmtObject)
@@ -587,21 +590,51 @@ QueuedMessage Queue::get(){
     return msg;
 }
 
-bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+namespace {
+bool collectIf(QueuedMessage& qm, Messages::Predicate predicate,
+               std::deque<QueuedMessage>& collection)
 {
-    if (message.payload->hasExpired()) {
-        expired.push_back(message);
+    if (predicate(qm)) {
+        collection.push_back(qm);
         return true;
     } else {
         return false;
     }
 }
 
+bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); }
+} // namespace
+
+void Queue::dequeueIf(Messages::Predicate predicate,
+                      std::deque<QueuedMessage>& dequeued)
+{
+    {
+        Mutex::ScopedLock locker(messageLock);
+        messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued)));
+    }
+    if (!dequeued.empty()) {
+        if (mgmtObject) {
+            mgmtObject->inc_acquires(dequeued.size());
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_acquires(dequeued.size());
+        }
+        for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin();
+             i != dequeued.end(); ++i) {
+            {
+                // KAG: should be safe to retake lock after the removeIf, since
+                // no other thread can touch these messages after the removeIf() call
+                Mutex::ScopedLock locker(messageLock);
+                observeAcquire(*i, locker);
+            }
+            dequeue( 0, *i );
+        }
+    }
+}
+
 /**
  *@param lapse: time since the last purgeExpired
  */
-void Queue::purgeExpired(qpid::sys::Duration lapse)
-{
+void Queue::purgeExpired(sys::Duration lapse) {
     //As expired messages are discarded during dequeue also, only
     //bother explicitly expiring if the rate of dequeues since last
     //attempt is less than one per second.
@@ -609,37 +642,18 @@ void Queue::purgeExpired(qpid::sys::Dura
     dequeueSincePurge -= count;
     int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
     if (seconds == 0 || count / seconds < 1) {
-        std::deque<QueuedMessage> expired;
-        {
-            Mutex::ScopedLock locker(messageLock);
-            messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
-        }
-
-        if (!expired.empty()) {
+        std::deque<QueuedMessage> dequeued;
+        dequeueIf(boost::bind(&isExpired, _1), dequeued);
+        if (dequeued.size()) {
             if (mgmtObject) {
-                mgmtObject->inc_acquires(expired.size());
-                mgmtObject->inc_discardsTtl(expired.size());
-                if (brokerMgmtObject) {
-                    brokerMgmtObject->inc_acquires(expired.size());
-                    brokerMgmtObject->inc_discardsTtl(expired.size());
-                }
-            }
-
-            for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
-                 i != expired.end(); ++i) {
-                {
-                    // KAG: should be safe to retake lock after the removeIf, since
-                    // no other thread can touch these messages after the removeIf() call
-                    Mutex::ScopedLock locker(messageLock);
-                    observeAcquire(*i, locker);
-                }
-                dequeue( 0, *i );
+                mgmtObject->inc_discardsTtl(dequeued.size());
+                if (brokerMgmtObject)
+                    brokerMgmtObject->inc_discardsTtl(dequeued.size());
             }
         }
     }
 }
 
-
 namespace {
     // for use with purge/move below - collect messages that match a given filter
     //
@@ -797,6 +811,7 @@ uint32_t Queue::purge(const uint32_t pur
             // now reroute if necessary
             if (dest.get()) {
                 assert(qmsg->payload);
+                qmsg->payload->clearTrace();
                 DeliverableMessage dmsg(qmsg->payload);
                 dest->routeWithAlternate(dmsg);
             }
@@ -888,9 +903,10 @@ void Queue::push(boost::intrusive_ptr<Me
         if (mgmtObject) {
             mgmtObject->inc_acquires();
             mgmtObject->inc_discardsLvq();
-            if (brokerMgmtObject)
+            if (brokerMgmtObject) {
                 brokerMgmtObject->inc_acquires();
                 brokerMgmtObject->inc_discardsLvq();
+            }
         }
         if (isRecovery) {
             //can't issue new requests for the store until
@@ -1470,12 +1486,18 @@ boost::shared_ptr<Exchange> Queue::getAl
     return alternateExchange;
 }
 
-void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
 {
     if (broker.getQueues().destroyIf(queue->getName(),
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
         QPID_LOG(debug, "Auto-deleting " << queue->getName());
         queue->destroyed();
+
+        if (broker.getManagementAgent())
+            broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName()));
+        QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName()
+            << " user:" << userId
+            << " rhost:" << connectionId );
     }
 }
 
@@ -1483,9 +1505,11 @@ struct AutoDeleteTask : qpid::sys::Timer
 {
     Broker& broker;
     Queue::shared_ptr queue;
+    std::string connectionId;
+    std::string userId;
 
-    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
-        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {}
+    AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime)
+        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q), connectionId(cId), userId(uId) {}
 
     void fire()
     {
@@ -1493,19 +1517,19 @@ struct AutoDeleteTask : qpid::sys::Timer
         //created, but then became unused again before the task fired;
         //in this case ignore this request as there will have already
         //been a later task added
-        tryAutoDeleteImpl(broker, queue);
+        tryAutoDeleteImpl(broker, queue, connectionId, userId);
     }
 };
 
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
 {
     if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
         AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
-        queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+        queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
         broker.getClusterTimer().add(queue->autoDeleteTask);
         QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
     } else {
-        tryAutoDeleteImpl(broker, queue);
+        tryAutoDeleteImpl(broker, queue, connectionId, userId);
     }
 }
 
@@ -1659,13 +1683,28 @@ void Queue::query(qpid::types::Variant::
     if (allocator) allocator->query(results);
 }
 
+namespace {
+struct After {
+    framing::SequenceNumber seq;
+    After(framing::SequenceNumber s) : seq(s) {}
+    bool operator()(const QueuedMessage& qm) { return qm.position > seq; }
+};
+} // namespace
+
+
 void Queue::setPosition(SequenceNumber n) {
     Mutex::ScopedLock locker(messageLock);
+    if (n < sequence) {
+        std::deque<QueuedMessage> dequeued;
+        dequeueIf(After(n), dequeued);
+        messages->setPosition(n);
+    }
     sequence = n;
     QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
 }
 
 SequenceNumber Queue::getPosition() {
+    Mutex::ScopedLock locker(messageLock);
     return sequence;
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h Fri Aug  3 12:13:32 2012
@@ -175,6 +175,7 @@ class Queue : public boost::enable_share
     void configureImpl(const qpid::framing::FieldTable& settings);
     void checkNotDeleted(const Consumer::shared_ptr& c);
     void notifyDeleted();
+    void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued);
 
   public:
 
@@ -343,7 +344,7 @@ class Queue : public boost::enable_share
      * exclusive owner
      */
     static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
-    static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
+    static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId);
 
     virtual void setExternalQueueStore(ExternalQueueStore* inst);
 
@@ -375,12 +376,21 @@ class Queue : public boost::enable_share
         std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
     }
 
-    /** Set the position sequence number  for the next message on the queue.
-     * Must be >= the current sequence number.
-     * Used by cluster to replicate queues.
+    /**
+     * Set the sequence number for the back of the queue, the
+     * next message enqueued will be pos+1.
+     * If pos > getPosition() this creates a gap in the sequence numbers.
+     * If pos < getPosition() the back of the queue is reset to pos.
+     *
+     * The _caller_ must ensure that any messages after pos have been dequeued.
+     *
+     * Used by HA/cluster code for queue replication.
      */
     QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
-    /** return current position sequence number for the next message on the queue.
+
+    /**
+     *@return sequence number for the back of the queue. The next message pushed
+     * will be at getPosition+1
      */
     QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
     QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp Fri Aug  3 12:13:32 2012
@@ -75,8 +75,8 @@ namespace {
             result = v->get<int64_t>();
             QPID_LOG(debug, "Got integer value for " << key << ": " << result);
             if (result >= 0) return result;
-        } else if (v->convertsTo<string>()) {
-            string s(v->get<string>());
+        } else if (v->convertsTo<std::string>()) {
+            std::string s(v->get<std::string>());
             QPID_LOG(debug, "Got string value for " << key << ": " << s);
             std::istringstream convert(s);
             if (convert >> result && result >= 0) return result;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueuePolicy.cpp Fri Aug  3 12:13:32 2012
@@ -133,8 +133,8 @@ T getCapacity(const FieldTable& settings
         result = v->get<T>();
         QPID_LOG(debug, "Got integer value for " << key << ": " << result);
         if (result >= 0) return result;
-    } else if (v->convertsTo<string>()) {
-        string s(v->get<string>());
+    } else if (v->convertsTo<std::string>()) {
+        std::string s(v->get<std::string>());
         QPID_LOG(debug, "Got string value for " << key << ": " << s);
         std::istringstream convert(s);
         if (convert >> result && result >= 0 && convert.eof()) return result;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp Fri Aug  3 12:13:32 2012
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/QueueEvents.h"
@@ -46,40 +47,49 @@ QueueRegistry::declare(const string& dec
                                         definition from persistente
                                         record*/)
 {
-    RWlock::ScopedWlock locker(lock);
-    string name = declareName.empty() ? generateName() : declareName;
-    assert(!name.empty());
-    QueueMap::iterator i =  queues.find(name);
-
-    if (i == queues.end()) {
-        Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
-        if (alternate) {
-            queue->setAlternateExchange(alternate);//need to do this *before* create
-            alternate->incAlternateUsers();
-        }
-        if (!recovering) {
-            //apply settings & create persistent record if required
-            queue->create(arguments);
+    Queue::shared_ptr queue;
+    std::pair<Queue::shared_ptr, bool> result;
+    {
+        RWlock::ScopedWlock locker(lock);
+        string name = declareName.empty() ? generateName() : declareName;
+        assert(!name.empty());
+        QueueMap::iterator i =  queues.find(name);
+
+        if (i == queues.end()) {
+            queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
+            if (alternate) {
+                queue->setAlternateExchange(alternate);//need to do this *before* create
+                alternate->incAlternateUsers();
+            }
+            if (!recovering) {
+                //apply settings & create persistent record if required
+                queue->create(arguments);
+            } else {
+                //i.e. recovering a queue for which we already have a persistent record
+                queue->configure(arguments);
+            }
+            queues[name] = queue;
+            if (lastNode) queue->setLastNodeFailure();
+            result = std::pair<Queue::shared_ptr, bool>(queue, true);
         } else {
-            //i.e. recovering a queue for which we already have a persistent record
-            queue->configure(arguments);
+            result = std::pair<Queue::shared_ptr, bool>(i->second, false);
         }
-        queues[name] = queue;
-        if (lastNode) queue->setLastNodeFailure();
-
-        return std::pair<Queue::shared_ptr, bool>(queue, true);
-    } else {
-        return std::pair<Queue::shared_ptr, bool>(i->second, false);
     }
+    if (broker && queue) broker->getConfigurationObservers().queueCreate(queue);
+    return result;
 }
 
-void QueueRegistry::destroyLH (const string& name){
-    queues.erase(name);
-}
-
-void QueueRegistry::destroy (const string& name){
-    RWlock::ScopedWlock locker(lock);
-    destroyLH (name);
+void QueueRegistry::destroy(const string& name) {
+    Queue::shared_ptr q;    // stays null when no queue called `name` is registered
+    {
+        qpid::sys::RWlock::ScopedWlock locker(lock);
+        QueueMap::iterator i = queues.find(name);
+        if (i != queues.end()) {
+            q = i->second;  // assign the outer q: a fresh local here would hide it and leave it null
+            queues.erase(i);
+        }
+    }
+    if (broker && q) broker->getConfigurationObservers().queueDestroy(q); // runs after the lock scope closes
+}
 
 Queue::shared_ptr QueueRegistry::find(const string& name){

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h Fri Aug  3 12:13:32 2012
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,7 +61,7 @@ class QueueRegistry {
     QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare(
         const std::string& name,
         bool durable = false,
-        bool autodelete = false, 
+        bool autodelete = false,
         const OwnershipToken* owner = 0,
         boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
         const qpid::framing::FieldTable& args = framing::FieldTable(),
@@ -82,9 +82,8 @@ class QueueRegistry {
     QPID_BROKER_EXTERN void destroy(const std::string& name);
     template <class Test> bool destroyIf(const std::string& name, Test test)
     {
-        qpid::sys::RWlock::ScopedWlock locker(lock);
         if (test()) {
-            destroyLH (name);
+            destroy(name);
             return true;
         } else {
             return false;
@@ -127,13 +126,13 @@ class QueueRegistry {
         for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i)
             f(i->second);
     }
-	
+
 	/**
 	* Change queue mode when cluster size drops to 1 node, expands again
 	* in practice allows flow queue to disk when last name to be exectuted
 	*/
 	void updateQueueClusterState(bool lastNode);
-    
+
 private:
     typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
     QueueMap queues;
@@ -144,12 +143,9 @@ private:
     management::Manageable* parent;
     bool lastNode; //used to set mode on queue declare
     Broker* broker;
-
-    //destroy impl that assumes lock is already held:
-    void destroyLH (const std::string& name);
 };
 
-    
+
 }} // namespace qpid::broker
 
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri Aug  3 12:13:32 2012
@@ -144,11 +144,13 @@ RecoverableTransaction::shared_ptr Recov
 RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
 {
     string kind;
-
+    uint32_t p = buffer.getPosition();
     buffer.getShortString (kind);
-    if      (kind == "link")
+    buffer.setPosition(p);
+
+    if (Link::isEncodedLink(kind))
         return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
-    else if (kind == "bridge")
+    else if (Bridge::isEncodedBridge(kind))
         return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer)));
 
     return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.cpp Fri Aug  3 12:13:32 2012
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
 #  include "config.h"
 #endif
 
+#include "qpid/broker/AclModule.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -37,6 +38,7 @@
 using qpid::sys::cyrus::CyrusSecurityLayer;
 #endif
 
+using std::string;
 using namespace qpid::framing;
 using qpid::sys::SecurityLayer;
 using qpid::sys::SecuritySettings;
@@ -164,13 +166,17 @@ void SaslAuthenticator::fini(void)
 
 #endif
 
-std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c, bool isShadow )
+std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c )
 {
     if (c.getBroker().getOptions().auth) {
-        if ( isShadow )
-            return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
-        else 
-            return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
+        // The cluster creates non-authenticated connections for internal shadow connections
+        // that are never connected to an external client.
+        if ( !c.isAuthenticated() )
+            return std::auto_ptr<SaslAuthenticator>(
+                new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
+        else
+            return std::auto_ptr<SaslAuthenticator>(
+                new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
     } else {
         QPID_LOG(debug, "SASL: No Authentication Performed");
         return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
@@ -178,7 +184,7 @@ std::auto_ptr<SaslAuthenticator> SaslAut
 }
 
 
-NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()), 
+NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
                                                               realm(c.getBroker().getOptions().realm), encrypt(e) {}
 NullAuthenticator::~NullAuthenticator() {}
 
@@ -214,7 +220,7 @@ void NullAuthenticator::start(const stri
             } else if (i != string::npos) {
                 //authorization id is first null delimited field
                 uid = response->substr(0, i);
-            }//else not a valid SASL PLAIN response, throw error?            
+            }//else not a valid SASL PLAIN response, throw error?
             if (!uid.empty()) {
                 //append realm if it has not already been added
                 i = uid.find(realm);
@@ -226,7 +232,12 @@ void NullAuthenticator::start(const stri
         }
     } else {
         connection.setUserId("anonymous");
-    }   
+    }
+    AclModule* acl = connection.getBroker().getAcl();
+    if (acl && !acl->approveConnection(connection))
+    {
+        throw ConnectionForcedException("User connection denied by configured limit");
+    }
     client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax());
 }
 
@@ -240,7 +251,7 @@ std::auto_ptr<SecurityLayer> NullAuthent
 
 #if HAVE_SASL
 
-CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) : 
+CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
     sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt)
 {
     init();
@@ -271,17 +282,17 @@ void CyrusAuthenticator::init()
                            NULL, /* Callbacks */
                            0, /* Connection flags */
                            &sasl_conn);
-    
+
     if (SASL_OK != code) {
         QPID_LOG(error, "SASL: Connection creation failed: [" << code << "] " << sasl_errdetail(sasl_conn));
-        
+
         // TODO: Change this to an exception signaling
         // server error, when one is available
         throw ConnectionForcedException("Unable to perform authentication");
     }
 
     sasl_security_properties_t secprops;
-    
+
     //TODO: should the actual SSF values be configurable here?
     secprops.min_ssf = encrypt ? 10: 0;
     secprops.max_ssf = 256;
@@ -319,14 +330,14 @@ void CyrusAuthenticator::init()
     secprops.property_values = 0;
     secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */
     /*
-     * The nodict flag restricts SASL authentication mechanisms 
-     * to those that are not susceptible to dictionary attacks.  
-     * They are:  
+     * The nodict flag restricts SASL authentication mechanisms
+     * to those that are not susceptible to dictionary attacks.
+     * They are:
      *   SRP
      *   PASSDSS-3DES-1
      *   EXTERNAL
      */
-    if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY;    
+    if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY;
     int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops);
     if (result != SASL_OK) {
         throw framing::InternalErrorException(QPID_MSG("SASL error: " << result));
@@ -371,10 +382,10 @@ void CyrusAuthenticator::getMechanisms(A
                              "", separator, "",
                              &list, &list_len,
                              &count);
-    
+
     if (SASL_OK != code) {
         QPID_LOG(info, "SASL: Mechanism listing failed: " << sasl_errdetail(sasl_conn));
-        
+
         // TODO: Change this to an exception signaling
         // server error, when one is available
         throw ConnectionForcedException("Mechanism listing failed");
@@ -382,17 +393,17 @@ void CyrusAuthenticator::getMechanisms(A
         string mechanism;
         unsigned int start;
         unsigned int end;
-        
+
         QPID_LOG(info, "SASL: Mechanism list: " << list);
-        
+
         end = 0;
         do {
             start = end;
-            
+
             // Seek to end of next mechanism
             while (end < list_len && separator[0] != list[end])
                 end++;
-            
+
             // Record the mechanism
             mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value(string(list, start, end - start))));
             end++;
@@ -404,20 +415,20 @@ void CyrusAuthenticator::start(const str
 {
     const char *challenge;
     unsigned int challenge_len;
-    
+
     // This should be at same debug level as mech list in getMechanisms().
     QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
     int code = sasl_server_start(sasl_conn,
                                  mechanism.c_str(),
                                  (response ? response->c_str() : 0), (response ? response->size() : 0),
                                  &challenge, &challenge_len);
-    
+
     processAuthenticationStep(code, challenge, challenge_len);
     qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
-    if ( cnxMgmt ) 
+    if ( cnxMgmt )
         cnxMgmt->set_saslMechanism(mechanism);
 }
-        
+
 void CyrusAuthenticator::step(const string& response)
 {
     const char *challenge;
@@ -439,10 +450,17 @@ void CyrusAuthenticator::processAuthenti
             // authentication failure, when one is available
             throw ConnectionForcedException("Authenticated username unavailable");
         }
-        QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid);
 
         connection.setUserId(uid);
 
+        AclModule* acl = connection.getBroker().getAcl();
+        if (acl && !acl->approveConnection(connection))
+        {
+            throw ConnectionForcedException("User connection denied by configured limit");
+        }
+
+        QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid);
+
         client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax());
     } else if (SASL_CONTINUE == code) {
         string challenge_str(challenge, challenge_len);
@@ -490,7 +508,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthen
         securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
     }
     qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
-    if ( cnxMgmt ) 
+    if ( cnxMgmt )
         cnxMgmt->set_saslSsf(ssf);
     return securityLayer;
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.h Fri Aug  3 12:13:32 2012
@@ -54,7 +54,7 @@ public:
     static void init(const std::string& saslName, std::string const & saslConfigPath );
     static void fini(void);
 
-    static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection, bool isShadow);
+    static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection);
 
     virtual void callUserIdCallbacks() { }
 };

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SecureConnectionFactory.cpp Fri Aug  3 12:13:32 2012
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -41,11 +41,6 @@ SecureConnectionFactory::SecureConnectio
 sys::ConnectionCodec*
 SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
                                 const SecuritySettings& external) {
-    if (broker.getConnectionCounter().allowConnection())
-    {
-        QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
-        return 0;
-    }
     if (v == ProtocolVersion(0, 10)) {
         SecureConnectionPtr sc(new SecureConnection());
         CodecPtr c(new amqp_0_10::Connection(out, id, false));
@@ -71,5 +66,5 @@ SecureConnectionFactory::create(sys::Out
     return sc.release();
 }
 
-    
+
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp Fri Aug  3 12:13:32 2012
@@ -72,7 +72,8 @@ SemanticState::SemanticState(DeliveryAda
       dtxSelected(false),
       authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
       userID(getSession().getConnection().getUserId()),
-      closeComplete(false)
+      closeComplete(false),
+      connectionId(getSession().getConnection().getUrl())
 {}
 
 SemanticState::~SemanticState() {
@@ -142,6 +143,7 @@ bool SemanticState::cancel(const string&
         DeliveryRecords::iterator removed =
             remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
         unacked.erase(removed, unacked.end());
+        getSession().setUnackedCount(unacked.size());
         return true;
     } else {
         return false;
@@ -270,6 +272,7 @@ void SemanticState::checkDtxTimeout()
 void SemanticState::record(const DeliveryRecord& delivery)
 {
     unacked.push_back(delivery);
+    getSession().setUnackedCount(unacked.size());
 }
 
 const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
@@ -426,7 +429,7 @@ void SemanticState::cancel(ConsumerImpl:
     if(queue) {
         queue->cancel(c);
         if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
-            Queue::tryAutoDelete(session.getBroker(), queue);
+            Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID);
         }
     }
     c->cancel();
@@ -555,6 +558,7 @@ void SemanticState::recover(bool requeue
         //w.r.t id is lost
         sort(unacked.begin(), unacked.end());
     }
+    getSession().setUnackedCount(unacked.size());
 }
 
 void SemanticState::deliver(DeliveryRecord& msg, bool sync)
@@ -712,6 +716,7 @@ void SemanticState::release(DeliveryId f
     DeliveryRecords::iterator removed =
         remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
     unacked.erase(removed, range.end);
+    getSession().setUnackedCount(unacked.size());
 }
 
 void SemanticState::reject(DeliveryId first, DeliveryId last)
@@ -723,6 +728,7 @@ void SemanticState::reject(DeliveryId fi
         if (i->isRedundant()) i = unacked.erase(i);
         else i++;
     }
+    getSession().setUnackedCount(unacked.size());
 }
 
 bool SemanticState::ConsumerImpl::doOutput()
@@ -810,6 +816,7 @@ void SemanticState::accepted(const Seque
                                               (TransactionContext*) 0)));
         unacked.erase(removed, unacked.end());
     }
+    getSession().setUnackedCount(unacked.size());
 }
 
 void SemanticState::completed(const SequenceSet& commands) {
@@ -819,6 +826,7 @@ void SemanticState::completed(const Sequ
                                      bind(&SemanticState::complete, this, _1)));
     unacked.erase(removed, unacked.end());
     requestDispatch();
+    getSession().setUnackedCount(unacked.size());
 }
 
 void SemanticState::attached()

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h Fri Aug  3 12:13:32 2012
@@ -146,6 +146,8 @@ class SemanticState : private boost::non
         std::string getResumeId() const { return resumeId; };
         const std::string& getTag() const { return tag; }
         uint64_t getResumeTtl() const { return resumeTtl; }
+	uint32_t getDeliveryCount() const { return deliveryCount; }
+	void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; }
         const framing::FieldTable& getArguments() const { return arguments; }
 
         SemanticState& getParent() { return *parent; }
@@ -180,6 +182,8 @@ class SemanticState : private boost::non
     const bool authMsg;
     const std::string userID;
     bool closeComplete;
+    //needed for queue delete events in auto-delete:
+    const std::string connectionId;
 
     void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
     void checkDtxTimeout();

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp Fri Aug  3 12:13:32 2012
@@ -41,6 +41,8 @@
 namespace qpid {
 namespace broker {
 
+using std::string;
+
 using namespace qpid;
 using namespace qpid::framing;
 using namespace qpid::framing::dtx;
@@ -107,6 +109,12 @@ void SessionAdapter::ExchangeHandlerImpl
                                                                  false,
                                                                  ManagementAgent::toMap(args),
                                                                  "existing"));
+                QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange
+                    << " user:" << getConnection().getUserId()
+                    << " rhost:" << getConnection().getUrl()
+                    << " type:" << type
+                    << " alternateExchange:" << alternateExchange
+                    << " durable:" << (durable ? "T" : "F"));
             }
         }catch(UnknownExchangeTypeException& /*e*/){
             throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type));
@@ -204,7 +212,10 @@ ExchangeBoundResult SessionAdapter::Exch
     }
 }
 
-SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker())
+SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session)
+    : HandlerHelper(session), broker(getBroker()),
+      //record connection id and userid for deleting exclusive queues after session has ended:
+      connectionId(getConnection().getUrl()), userId(getConnection().getUserId())
 {}
 
 
@@ -223,7 +234,7 @@ void SessionAdapter::QueueHandlerImpl::d
         Queue::shared_ptr q(exclusiveQueues.front());
         q->releaseExclusiveOwnership();
         if (q->canAutoDelete()) {
-            Queue::tryAutoDelete(broker, q);
+            Queue::tryAutoDelete(broker, q, connectionId, userId);
         }
         exclusiveQueues.erase(exclusiveQueues.begin());
     }
@@ -307,6 +318,14 @@ void SessionAdapter::QueueHandlerImpl::d
                 agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
                                                           name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments),
                                                           "existing"));
+            QPID_LOG_CAT(debug, model, "Create queue. name:" << name
+                << " user:" << getConnection().getUserId()
+                << " rhost:" << getConnection().getUrl()
+                << " durable:" << (durable ? "T" : "F")
+                << " exclusive:" << (exclusive ? "T" : "F")
+                << " autodelete:" << (autoDelete ? "T" : "F")
+                << " alternateExchange:" << alternateExchange
+            );
         }
 
     }
@@ -411,6 +430,12 @@ SessionAdapter::MessageHandlerImpl::subs
     if (agent)
         agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
                                                queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
+    QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName
+        << " destination:" << destination
+        << " user:" << getConnection().getUserId()
+        << " rhost:" << getConnection().getUrl()
+        << " exclusive:" << (exclusive ? "T" : "F")
+    );
 }
 
 void
@@ -423,6 +448,9 @@ SessionAdapter::MessageHandlerImpl::canc
     ManagementAgent* agent = getBroker().getManagementAgent();
     if (agent)
         agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination));
+    QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination
+        << " user:" << getConnection().getUserId()
+        << " rhost:" << getConnection().getUrl() );
 }
 
 void

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.h Fri Aug  3 12:13:32 2012
@@ -121,6 +121,9 @@ class Queue;
     {
         Broker& broker;
         std::vector< boost::shared_ptr<Queue> > exclusiveQueues;
+        //connectionId and userId are needed for queue-delete events for auto deleted, exclusive queues
+        std::string connectionId;
+        std::string userId;
 
       public:
         QueueHandlerImpl(SemanticState& session);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionContext.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionContext.h Fri Aug  3 12:13:32 2012
@@ -47,6 +47,7 @@ class SessionContext : public OwnershipT
     virtual uint16_t getChannel() const = 0;
     virtual const SessionId& getSessionId() const = 0;
     virtual void addPendingExecutionSync() = 0;
+    virtual void setUnackedCount(uint64_t) {}
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp Fri Aug  3 12:13:32 2012
@@ -35,23 +35,39 @@ SessionHandler::SessionHandler(Connectio
     : amqp_0_10::SessionHandler(&c.getOutput(), ch),
       connection(c),
       proxy(out),
-      clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
+      clusterOrderProxy(c.getClusterOrderOutput() ?
+                        new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
 {}
 
 SessionHandler::~SessionHandler() {}
 
-void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+void SessionHandler::connectionException(
+    framing::connection::CloseCode code, const std::string& msg)
+{
     // NOTE: must tell the error listener _before_ calling connection.close()
-    if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
+    if (connection.getErrorListener())
+        connection.getErrorListener()->connectionError(msg);
+    if (errorListener)
+        errorListener->connectionException(code, msg);
     connection.close(code, msg);
 }
 
-void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) {
-    if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+void SessionHandler::channelException(
+    framing::session::DetachCode code, const std::string& msg)
+{
+    if (connection.getErrorListener())
+        connection.getErrorListener()->sessionError(getChannel(), msg);
+    if (errorListener)
+        errorListener->channelException(code, msg);
 }
 
-void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) {
-    if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+void SessionHandler::executionException(
+    framing::execution::ErrorCode code, const std::string& msg)
+{
+    if (connection.getErrorListener())
+        connection.getErrorListener()->sessionError(getChannel(), msg);
+    if (errorListener)
+        errorListener->executionException(code, msg);
 }
 
 ConnectionState& SessionHandler::getConnection() { return connection; }
@@ -64,7 +80,7 @@ void SessionHandler::handleDetach() {
     if (session.get())
         connection.getBroker().getSessionManager().detach(session);
     assert(!session.get());
-    if (detachedCallback) detachedCallback();
+    if (errorListener) errorListener->detach();
     connection.closeChannel(channel.get());
 }
 
@@ -118,8 +134,4 @@ void SessionHandler::attached(const std:
     }
 }
 
-void SessionHandler::setDetachedCallback(boost::function<void()> cb) {
-    detachedCallback = cb;
-}
-
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h Fri Aug  3 12:13:32 2012
@@ -25,7 +25,7 @@
 #include "qpid/amqp_0_10/SessionHandler.h"
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
-#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 class SessionState;
@@ -43,6 +43,21 @@ class SessionState;
  */
 class SessionHandler : public amqp_0_10::SessionHandler {
   public:
+    class ErrorListener {
+      public:
+        virtual ~ErrorListener() {}
+        virtual void connectionException(
+            framing::connection::CloseCode code, const std::string& msg) = 0;
+        virtual void channelException(
+            framing::session::DetachCode, const std::string& msg) = 0;
+        virtual void executionException(
+            framing::execution::ErrorCode, const std::string& msg) = 0;
+        /** Called when it is safe to delete the ErrorListener. */
+        virtual void detach() = 0;
+    };
+
+    /**
+     * Note: a listener passed to setErrorListener() must not be deleted until ErrorListener::detach has been called */
     SessionHandler(Connection&, framing::ChannelId);
     ~SessionHandler();
 
@@ -71,7 +86,7 @@ class SessionHandler : public amqp_0_10:
     void attached(const std::string& name);//used by 'pushing' inter-broker bridges
     void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
 
-    void setDetachedCallback(boost::function<void()> cb);
+    void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
 
   protected:
     virtual void setState(const std::string& sessionName, bool force);
@@ -94,7 +109,7 @@ class SessionHandler : public amqp_0_10:
     framing::AMQP_ClientProxy proxy;
     std::auto_ptr<SessionState> session;
     std::auto_ptr<SetChannelProxy> clusterOrderProxy;
-    boost::function<void ()> detachedCallback;
+    boost::shared_ptr<ErrorListener> errorListener;
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp Fri Aug  3 12:13:32 2012
@@ -156,7 +156,7 @@ ManagementObject* SessionState::GetManag
 
 Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
                                                      Args&    /*args*/,
-                                                     string&  /*text*/)
+                                                     std::string&  /*text*/)
 {
     Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h Fri Aug  3 12:13:32 2012
@@ -126,6 +126,11 @@ class SessionState : public qpid::Sessio
     // the SessionState of a received Execution.Sync command.
     void addPendingExecutionSync();
 
+    void setUnackedCount(uint64_t count) {
+        if (mgmtObject)
+            mgmtObject->set_unackedMessages(count);
+    }
+
     // Used to delay creation of management object for sessions
     // belonging to inter-broker bridges
     void addManagementObject();

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/System.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/System.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/System.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/System.cpp Fri Aug  3 12:13:32 2012
@@ -37,7 +37,6 @@ System::System (string _dataDir, Broker*
 
     if (agent != 0)
     {
-        framing::Uuid systemId;
 
         if (_dataDir.empty ())
         {
@@ -66,14 +65,13 @@ System::System (string _dataDir, Broker*
         }
 
         mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array()));
-        std::string sysname, nodename, release, version, machine;
-        qpid::sys::SystemInfo::getSystemId (sysname,
-                                            nodename,
+        qpid::sys::SystemInfo::getSystemId (osName,
+                                            nodeName,
                                             release,
                                             version,
                                             machine);
-        mgmtObject->set_osName   (sysname);
-        mgmtObject->set_nodeName (nodename);
+        mgmtObject->set_osName   (osName);
+        mgmtObject->set_nodeName (nodeName);
         mgmtObject->set_release  (release);
         mgmtObject->set_version  (version);
         mgmtObject->set_machine  (machine);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/System.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/System.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/System.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/System.h Fri Aug  3 12:13:32 2012
@@ -21,6 +21,7 @@
 //
 
 #include "qpid/management/Manageable.h"
+#include "qpid/framing/Uuid.h"
 #include "qmf/org/apache/qpid/broker/System.h"
 #include <boost/shared_ptr.hpp>
 #include <string>
@@ -35,6 +36,8 @@ class System : public management::Manage
   private:
 
     qmf::org::apache::qpid::broker::System* mgmtObject;
+    framing::Uuid systemId;
+    std::string osName, nodeName, release, version, machine;
 
   public:
 
@@ -44,6 +47,20 @@ class System : public management::Manage
 
     management::ManagementObject* GetManagementObject (void) const
     { return mgmtObject; }
+
+
+    /** Persistent UUID assigned by the management system to this broker. */
+    framing::Uuid getSystemId() const  { return systemId; }
+    /** Returns the OS name; e.g., GNU/Linux or Windows */
+    std::string getOsName() const { return osName; }
+    /** Returns the node name. Usually the same as the host name. */
+    std::string getNodeName() const { return nodeName; }
+    /** Returns the OS release identifier. */
+    std::string getRelease() const { return release; }
+    /** Returns the OS release version (kernel, build, sp, etc.) */
+    std::string getVersion() const { return version; }
+    /** Returns the hardware type. */
+    std::string getMachine() const { return machine; }
 };
 
 }}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org