You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [3/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp/...

Modified: qpid/branches/asyncstore/cpp/src/qpid/RefCounted.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/RefCounted.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/RefCounted.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/RefCounted.h Fri Aug  3 12:13:32 2012
@@ -49,15 +49,11 @@ protected:
 };
 
 
-} // namespace qpid
-
 // intrusive_ptr support.
-namespace boost {
-template <typename T>
-inline void intrusive_ptr_add_ref(const T* p) { p->qpid::RefCounted::addRef(); }
-template <typename T>
-inline void intrusive_ptr_release(const T* p) { p->qpid::RefCounted::release(); }
-}
+inline void intrusive_ptr_add_ref(const RefCounted* p) { p->addRef(); }
+inline void intrusive_ptr_release(const RefCounted* p) { p->release(); }
+
+} // namespace qpid
 
 
 #endif  /*!QPID_REFCOUNTED_H*/

Propchange: qpid/branches/asyncstore/cpp/src/qpid/acl/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/acl:r1333988-1368650

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.cpp Fri Aug  3 12:13:32 2012
@@ -51,7 +51,7 @@ using qpid::management::Args;
 namespace _qmf = qmf::org::apache::qpid::acl;
 
 Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0),
-    connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp))
+    connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp, aclValues.aclMaxConnectTotal))
 {
 
     agent = broker->getManagementAgent();
@@ -60,11 +60,14 @@ Acl::Acl (AclValues& av, Broker& b): acl
         _qmf::Package  packageInit(agent);
         mgmtObject = new _qmf::Acl (agent, this, broker);
         agent->addObject (mgmtObject);
+        mgmtObject->set_maxConnections(aclValues.aclMaxConnectTotal);
+        mgmtObject->set_maxConnectionsPerIp(aclValues.aclMaxConnectPerIp);
+        mgmtObject->set_maxConnectionsPerUser(aclValues.aclMaxConnectPerUser);
     }
     std::string errorString;
     if (!readAclFile(errorString)){
-        throw Exception("Could not read ACL file " + errorString);
         if (mgmtObject!=0) mgmtObject->set_enforcingAcl(0);
+        throw Exception("Could not read ACL file " + errorString);
     }
     broker->getConnectionObservers().add(connectionCounter);
     QPID_LOG(info, "ACL Plugin loaded");
@@ -121,6 +124,11 @@ bool Acl::authorise(
 }
 
 
+bool Acl::approveConnection(const qpid::broker::Connection& conn)
+{
+    return connectionCounter->approveConnection(conn);
+}
+
 bool Acl::result(
     const AclResult&   aclreslt,
     const std::string& id,

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.h Fri Aug  3 12:13:32 2012
@@ -38,6 +38,7 @@
 namespace qpid {
 namespace broker {
 class Broker;
+class Connection;
 }
 
 namespace acl {
@@ -45,8 +46,9 @@ class ConnectionCounter;
 
 struct AclValues {
     std::string aclFile;
-    uint32_t    aclMaxConnectPerUser;
-    uint32_t    aclMaxConnectPerIp;
+    uint16_t    aclMaxConnectPerUser;
+    uint16_t    aclMaxConnectPerIp;
+    uint16_t    aclMaxConnectTotal;
 };
 
 
@@ -66,6 +68,9 @@ private:
 public:
     Acl (AclValues& av, broker::Broker& b);
 
+    /** reportConnectLimit
+     * issue management counts and alerts for denied connections
+     */
     void reportConnectLimit(const std::string user, const std::string addr);
 
     inline virtual bool doTransferAcl() {
@@ -87,6 +92,8 @@ public:
         const std::string&               ExchangeName,
         const std::string&               RoutingKey);
 
+    virtual bool approveConnection(const broker::Connection& connection);
+
     virtual ~Acl();
 private:
     bool result(

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.cpp Fri Aug  3 12:13:32 2012
@@ -34,37 +34,82 @@ namespace acl {
 
 //
 // This module instantiates a broker::ConnectionObserver and limits client
-// connections by counting connections per user name and per client IP address.
+// connections by counting connections per user name, per client IP address
+// and per total connection count.
 //
 
 
 //
 //
 //
-ConnectionCounter::ConnectionCounter(Acl& a, uint32_t nl, uint32_t hl) :
-    acl(a), nameLimit(nl), hostLimit(hl) {}
+ConnectionCounter::ConnectionCounter(Acl& a, uint16_t nl, uint16_t hl, uint16_t tl) :
+    acl(a), nameLimit(nl), hostLimit(hl), totalLimit(tl), totalCurrentConnections(0) {}
 
 ConnectionCounter::~ConnectionCounter() {}
 
 
 //
-// limitCheckLH
+// limitApproveLH
+//
+// Connection creation approver. Returns true when theLimit is 0 (no limit)
+// or theName's count is at or below theLimit. Called with lock held.
+//
+bool ConnectionCounter::limitApproveLH(
+    connectCountsMap_t& theMap,
+    const std::string& theName,
+    uint16_t theLimit,
+    bool emitLog) {
+
+    bool result(true);
+    if (theLimit > 0) {
+        uint16_t count;
+        connectCountsMap_t::iterator eRef = theMap.find(theName);
+        if (eRef != theMap.end()) {
+            count = (uint16_t)(*eRef).second;
+            result = count <= theLimit;
+        } else {
+            // Not found
+            count = 0;
+        }
+        if (emitLog) {
+            QPID_LOG(trace, "ACL ConnectionApprover IP=" << theName
+                << " limit=" << theLimit
+                << " curValue=" << count
+                << " result=" << (result ? "allow" : "deny"));
+        }
+    }
+    return result;
+}
+
+
+//
+// countConnectionLH
 //
 // Increment the name's count in map and return a comparison against the limit.
 // called with dataLock already taken
 //
-bool ConnectionCounter::limitCheckLH(
-    connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) {
+bool ConnectionCounter::countConnectionLH(
+    connectCountsMap_t& theMap,
+    const std::string& theName,
+    uint16_t theLimit,
+    bool emitLog) {
 
     bool result(true);
+    uint16_t count(0);
     if (theLimit > 0) {
         connectCountsMap_t::iterator eRef = theMap.find(theName);
         if (eRef != theMap.end()) {
-            uint32_t count = (uint32_t)(*eRef).second + 1;
+            count = (uint16_t)(*eRef).second + 1;
             (*eRef).second = count;
             result = count <= theLimit;
         } else {
-            theMap[theName] = 1;
+            theMap[theName] = count = 1;
+        }
+        if (emitLog) {
+            QPID_LOG(trace, "ACL ConnectionApprover user=" << theName
+                << " limit=" << theLimit
+                << " curValue=" << count
+                << " result=" << (result ? "allow" : "deny"));
         }
     }
     return result;
@@ -78,12 +123,12 @@ bool ConnectionCounter::limitCheckLH(
 // called with dataLock already taken
 //
 void ConnectionCounter::releaseLH(
-    connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) {
+    connectCountsMap_t& theMap, const std::string& theName, uint16_t theLimit) {
 
     if (theLimit > 0) {
         connectCountsMap_t::iterator eRef = theMap.find(theName);
         if (eRef != theMap.end()) {
-            uint32_t count = (uint32_t) (*eRef).second;
+            uint16_t count = (uint16_t) (*eRef).second;
             assert (count > 0);
             if (1 == count) {
                 theMap.erase (eRef);
@@ -103,52 +148,20 @@ void ConnectionCounter::releaseLH(
 // connection - called during Connection's constructor
 //
 void ConnectionCounter::connection(broker::Connection& connection) {
-    QPID_LOG(trace, "ACL ConnectionCounter connection IP:" << connection.getMgmtId()
-        << ", userId:" << connection.getUserId());
+    QPID_LOG(trace, "ACL ConnectionCounter new connection: " << connection.getMgmtId());
 
-    Mutex::ScopedLock locker(dataLock);
-
-    connectProgressMap[connection.getMgmtId()] = C_CREATED;
-}
-
-
-//
-// opened - called when first AMQP frame is received over Connection
-//
-void ConnectionCounter::opened(broker::Connection& connection) {
-    QPID_LOG(trace, "ACL ConnectionCounter Opened IP:" << connection.getMgmtId()
-        << ", userId:" << connection.getUserId());
+    const std::string& hostName(getClientHost(connection.getMgmtId()));
 
     Mutex::ScopedLock locker(dataLock);
 
-    const std::string& userName(              connection.getUserId());
-    const std::string& hostName(getClientHost(connection.getMgmtId()));
-
-    // Bump state from CREATED to OPENED
-    (void) limitCheckLH(connectProgressMap, connection.getMgmtId(), C_OPENED);
+    // Total connections goes up
+    totalCurrentConnections += 1;
 
-    bool nameOk = limitCheckLH(connectByNameMap, userName, nameLimit);
-    bool hostOk = limitCheckLH(connectByHostMap, hostName, hostLimit);
+    // Record the fact that this connection exists
+    connectProgressMap[connection.getMgmtId()] = C_CREATED;
 
-    if (!nameOk) {
-        // User has too many
-        acl.reportConnectLimit(userName, hostName);
-        QPID_LOG(notice, "ACL ConnectionCounter User '" << userName
-            << "' exceeded maximum allowed connections");
-        throw Exception(
-            QPID_MSG("User '" << userName
-                << "' exceeded maximum allowed connections"));
-    }
-
-    if (!hostOk) {
-        // Host has too many
-        acl.reportConnectLimit(userName, hostName);
-        QPID_LOG(notice, "ACL ConnectionCounter Client host '" << hostName
-            << "' exceeded maximum allowed connections");
-        throw Exception(
-            QPID_MSG("Client host '" << hostName
-                << "' exceeded maximum allowed connections"));
-    }
+    // Count the connection from this host.
+    (void) countConnectionLH(connectByHostMap, hostName, hostLimit, false);
 }
 
 
@@ -156,7 +169,7 @@ void ConnectionCounter::opened(broker::C
 // closed - called during Connection's destructor
 //
 void ConnectionCounter::closed(broker::Connection& connection) {
-    QPID_LOG(trace, "ACL ConnectionCounter Closed IP:" << connection.getMgmtId()
+    QPID_LOG(trace, "ACL ConnectionCounter closed: " << connection.getMgmtId()
         << ", userId:" << connection.getUserId());
 
     Mutex::ScopedLock locker(dataLock);
@@ -165,32 +178,129 @@ void ConnectionCounter::closed(broker::C
     if (eRef != connectProgressMap.end()) {
         if ((*eRef).second == C_OPENED){
             // Normal case: connection was created and opened.
-            // Decrement in-use counts
+            // Decrement user in-use counts
             releaseLH(connectByNameMap,
                       connection.getUserId(),
                       nameLimit);
-
-            releaseLH(connectByHostMap,
-                      getClientHost(connection.getMgmtId()),
-                      hostLimit);
         } else {
             // Connection was created but not opened.
-            // Don't decrement any connection counts.
+            // Don't decrement user count.
         }
+
+        // Decrement host in-use count.
+        releaseLH(connectByHostMap,
+                  getClientHost(connection.getMgmtId()),
+                  hostLimit);
+
+        // destroy connection progress indicator
         connectProgressMap.erase(eRef);
 
     } else {
         // connection not found in progress map
-        QPID_LOG(notice, "ACL ConnectionCounter info for '" << connection.getMgmtId()
+        QPID_LOG(notice, "ACL ConnectionCounter closed info for '" << connection.getMgmtId()
             << "' not found in connection state pool");
     }
+
+    // total connections
+    totalCurrentConnections -= 1;
 }
 
 
 //
+// approveConnection
+//  check total connections, connections from IP, connections by user and
+//  disallow if over any limit; shadow connections are always allowed
+//
+bool ConnectionCounter::approveConnection(const broker::Connection& connection)
+{
+    const std::string& hostName(getClientHost(connection.getMgmtId()));
+    const std::string& userName(              connection.getUserId());
+
+    Mutex::ScopedLock locker(dataLock);
+
+    // Bump state from CREATED to OPENED
+    (void) countConnectionLH(connectProgressMap, connection.getMgmtId(),
+                             C_OPENED, false);
+
+    // Approve total connections
+    bool okTotal  = true;
+    if (totalLimit > 0) {
+        okTotal = totalCurrentConnections <= totalLimit;
+        if (!connection.isShadow()) {
+            QPID_LOG(trace, "ACL ConnectionApprover totalLimit=" << totalLimit
+                << " curValue=" << totalCurrentConnections
+                << " result=" << (okTotal ? "allow" : "deny"));
+        }
+    }
+
+    // Approve by IP host connections
+    bool okByIP   = limitApproveLH(connectByHostMap, hostName, hostLimit, !connection.isShadow());
+
+    // Count and Approve the connection by the user
+    bool okByUser = countConnectionLH(connectByNameMap, userName, nameLimit, !connection.isShadow());
+
+    if (!connection.isShadow()) {
+        // Emit separate log for each disapproval
+        if (!okTotal) {
+            QPID_LOG(error, "Client max total connection count limit of " << totalLimit
+                << " exceeded by '"
+                << connection.getMgmtId() << "', user: '"
+                << userName << "'. Connection refused");
+        }
+        if (!okByIP) {
+            QPID_LOG(error, "Client max per-host connection count limit of "
+                << hostLimit << " exceeded by '"
+                << connection.getMgmtId() << "', user: '"
+                << userName << "'. Connection refused.");
+        }
+        if (!okByUser) {
+            QPID_LOG(error, "Client max per-user connection count limit of "
+                << nameLimit << " exceeded by '"
+                << connection.getMgmtId() << "', user: '"
+                << userName << "'. Connection refused.");
+        }
+
+        // Count/Event once for each disapproval
+        bool result = okTotal && okByIP && okByUser;
+        if (!result) {
+            acl.reportConnectLimit(userName, hostName);
+        }
+
+        return result;
+    } else {
+        // Always allow shadow connections
+        if (!okTotal) {
+            QPID_LOG(warning, "Client max total connection count limit of " << totalLimit
+                << " exceeded by '"
+                << connection.getMgmtId() << "', user: '"
+                << userName << "' but still within tolerance. Cluster connection allowed");
+        }
+        if (!okByIP) {
+            QPID_LOG(warning, "Client max per-host connection count limit of "
+                << hostLimit << " exceeded by '"
+                << connection.getMgmtId() << "', user: '"
+                << userName << "' but still within tolerance. Cluster connection allowed");
+        }
+        if (!okByUser) {
+            QPID_LOG(warning, "Client max per-user connection count limit of "
+                << nameLimit << " exceeded by '"
+                << connection.getMgmtId() << "', user: '"
+                << userName << "' but still within tolerance. Cluster connection allowed");
+        }
+        if (okTotal && okByIP && okByUser) {
+            QPID_LOG(debug, "Cluster client connection: '"
+                << connection.getMgmtId() << "', user '"
+                <<  userName << "' allowed");
+        }
+        return true;
+    }
+}
+
+//
 // getClientIp - given a connection's mgmtId return the client host part.
 //
 // TODO: Ideally this would be a method of the connection itself.
+// TODO: Verify it works with rdma connection names.
 //
 std::string ConnectionCounter::getClientHost(const std::string mgmtId)
 {

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.h Fri Aug  3 12:13:32 2012
@@ -48,32 +48,52 @@ private:
     enum CONNECTION_PROGRESS { C_CREATED=1, C_OPENED=2 };
 
     Acl&             acl;
-    uint32_t         nameLimit;
-    uint32_t         hostLimit;
+    uint16_t         nameLimit;
+    uint16_t         hostLimit;
+    uint16_t         totalLimit;
+    uint16_t         totalCurrentConnections;
     qpid::sys::Mutex dataLock;
 
+    /** Records per-connection state */
     connectCountsMap_t connectProgressMap;
+
+    /** Records per-username counts */
     connectCountsMap_t connectByNameMap;
+
+    /** Records per-host counts */
     connectCountsMap_t connectByHostMap;
 
+    /** Given a connection's management ID, return the client host name */
     std::string getClientHost(const std::string mgmtId);
 
-    bool limitCheckLH(connectCountsMap_t& theMap,
-                      const std::string& theName,
-                      uint32_t theLimit);
+    /** Return approval for proposed connection */
+    bool limitApproveLH(connectCountsMap_t& theMap,
+                        const std::string& theName,
+                        uint16_t theLimit,
+                        bool emitLog);
+
+    /** Record a connection.
+     * @return true if user/host is within its limit, false if over it */
+    bool countConnectionLH(connectCountsMap_t& theMap,
+                           const std::string& theName,
+                           uint16_t theLimit,
+                           bool emitLog);
 
+    /** Release a connection */
     void releaseLH(connectCountsMap_t& theMap,
                    const std::string& theName,
-                   uint32_t theLimit);
+                   uint16_t theLimit);
 
 public:
-    ConnectionCounter(Acl& acl, uint32_t nl, uint32_t hl);
+    ConnectionCounter(Acl& acl, uint16_t nl, uint16_t hl, uint16_t tl);
     ~ConnectionCounter();
 
+    // ConnectionObserver interface
     void connection(broker::Connection& connection);
-    void     opened(broker::Connection& connection);
     void     closed(broker::Connection& connection);
 
+    // Connection counting
+    bool approveConnection(const broker::Connection& conn);
 };
 
 }} // namespace qpid::ha

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.cpp Fri Aug  3 12:13:32 2012
@@ -305,7 +305,9 @@ namespace acl {
     // lookup
     //
     // The ACL main business logic function of matching rules and declaring
-    // an allow or deny result.
+    // an allow or deny result. This lookup is the fastpath per-message
+    // lookup to verify if a user is allowed to publish to an exchange with
+    // a given key.
     //
     AclResult AclData::lookup(
         const std::string&              id,
@@ -331,7 +333,8 @@ namespace acl {
 
             if (itrRule != actionList[action][objType]->end() )
             {
-                //loop the vector
+                // Found a rule list for this user-action-object set.
+                // Search the rule list for a matching rule.
                 ruleSetItr rsItr = itrRule->second.end();
                 for (int cnt = itrRule->second.size(); cnt != 0; cnt--)
                 {
@@ -339,56 +342,46 @@ namespace acl {
 
                     QPID_LOG(debug, "ACL: checking rule " <<  rsItr->toString());
 
-                    // loop the names looking for match
+                    // Search on exchange name and routing key only if specified in rule.
                     bool match =true;
-                    for (specPropertyMapItr pMItr  = rsItr->props.begin();
-                                           (pMItr != rsItr->props.end()) && match;
-                                            pMItr++)
+                    if (rsItr->pubExchNameInRule)
                     {
-                        //match name is exists first
-                        switch (pMItr->first)
+                        if (matchProp(rsItr->pubExchName, name))
                         {
-                        case acl::SPECPROP_NAME:
-                            if (matchProp(pMItr->second, name))
-                            {
-                                QPID_LOG(debug, "ACL: lookup exchange name '"
-                                    << name << "' matched with rule name '"
-                                    << pMItr->second << "'");
-
-                            }
-                            else
-                            {
-                                match= false;
-                                QPID_LOG(debug, "ACL: lookup exchange name '"
-                                    << name << "' did not match with rule name '"
-                                    << pMItr->second << "'");
-                            }
-                            break;
-
-                        case acl::SPECPROP_ROUTINGKEY:
-                            if (matchProp(pMItr->second, routingKey))
-                            {
-                                QPID_LOG(debug, "ACL: lookup key name '"
-                                    << routingKey << "' matched with rule routing key '"
-                                    << pMItr->second << "'");
-                            }
-                            else
-                            {
-                                match= false;
-                                QPID_LOG(debug, "ACL: lookup key name '"
-                                    << routingKey << "' did not match with rule routing key '"
-                                    << pMItr->second << "'");
-                            }
-                            break;
-
-                        default:
-                            // Don't care
-                            break;
-                        };
+                            QPID_LOG(debug, "ACL: Rule: " << rsItr->rawRuleNum << " lookup exchange name '"
+                                << name << "' matched with rule name '"
+                                << rsItr->pubExchName << "'");
+
+                        }
+                        else
+                        {
+                            match= false;
+                            QPID_LOG(debug, "ACL: Rule: " << rsItr->rawRuleNum << " lookup exchange name '"
+                                << name << "' did not match with rule name '"
+                                << rsItr->pubExchName << "'");
+                        }
+                    }
+
+                    if (match && rsItr->pubRoutingKeyInRule)
+                    {
+                        if (rsItr->matchRoutingKey(routingKey))
+                        {
+                            QPID_LOG(debug, "ACL: Rule: " << rsItr->rawRuleNum << " lookup key name '"
+                                << routingKey << "' matched with rule routing key '"
+                                << rsItr->pubRoutingKey << "'");
+                        }
+                        else
+                        {
+                            QPID_LOG(debug, "ACL: Rule: " << rsItr->rawRuleNum << " lookup key name '"
+                                << routingKey << "' did not match with rule routing key '"
+                                << rsItr->pubRoutingKey << "'");
+                            match = false;
+                        }
                     }
+
                     if (match){
                         aclresult = rsItr->ruleMode;
-                        QPID_LOG(debug,"ACL: Successful match, the decision is:"
+                        QPID_LOG(debug,"ACL: Rule: " << rsItr->rawRuleNum << " Successful match, the decision is:"
                             << AclHelper::getAclResultStr(aclresult));
                         return aclresult;
                     }

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.h Fri Aug  3 12:13:32 2012
@@ -21,6 +21,9 @@
  */
 
 #include "qpid/broker/AclModule.h"
+#include "AclTopicMatch.h"
+#include "qpid/log/Statement.h"
+#include "boost/shared_ptr.hpp"
 #include <vector>
 #include <sstream>
 
@@ -48,18 +51,29 @@ public:
     // A single ACL file entry may create many rule entries in
     //  many ruleset vectors.
     //
-    struct rule {
+    struct Rule {
+        typedef broker::TopicExchange::TopicExchangeTester topicTester;
 
         int                   rawRuleNum;   // rule number in ACL file
         qpid::acl::AclResult  ruleMode;     // combined allow/deny log/nolog
         specPropertyMap       props;        //
+        bool                  pubRoutingKeyInRule;
+        std::string           pubRoutingKey;
+        boost::shared_ptr<topicTester> pTTest;
+        bool                  pubExchNameInRule;
+        std::string           pubExchName;
 
-
-        rule (int ruleNum, qpid::acl::AclResult res, specPropertyMap& p) :
+        Rule (int ruleNum, qpid::acl::AclResult res, specPropertyMap& p) :
             rawRuleNum(ruleNum),
             ruleMode(res),
-            props(p)
-            {};
+            props(p),
+            pubRoutingKeyInRule(false),
+            pubRoutingKey(),
+            pTTest(boost::shared_ptr<topicTester>(new topicTester())),
+            pubExchNameInRule(false),
+            pubExchName()
+            {}
+
 
         std::string toString () const {
             std::ostringstream ruleStr;
@@ -76,9 +90,21 @@ public:
             ruleStr << " }]";
             return ruleStr.str();
         }
+
+        void addTopicTest(const std::string& pattern) {
+            pTTest->addBindingKey(broker::TopicExchange::normalize(pattern));
+        }
+
+        // Topic Exchange tester
+        // return true if any bindings match 'pattern'
+        bool matchRoutingKey(const std::string& pattern) const
+        {
+            topicTester::BindingVec bv;
+            return pTTest->findMatches(pattern, bv);
+        }
     };
 
-    typedef  std::vector<rule>               ruleSet;
+    typedef  std::vector<Rule>               ruleSet;
     typedef  ruleSet::const_iterator         ruleSetItr;
     typedef  std::map<std::string, ruleSet > actionObject; // user
     typedef  actionObject::iterator          actObjItr;

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclPlugin.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclPlugin.cpp Fri Aug  3 12:13:32 2012
@@ -39,10 +39,13 @@ struct AclOptions : public Options {
     AclValues& values;
 
     AclOptions(AclValues& v) : Options("ACL Options"), values(v) {
+        values.aclMaxConnectTotal = 500;
         addOptions()
             ("acl-file",           optValue(values.aclFile, "FILE"), "The policy file to load from, loaded from data dir")
-            ("acl-max-connect-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user")
-            ("acl-max-connect-per-ip"  , optValue(values.aclMaxConnectPerIp, "N"),   "The maximum number of connections allowed per host IP address");
+            ("max-connections"         , optValue(values.aclMaxConnectTotal, "N"),   "The maximum combined number of connections allowed. 0 implies no limit.")
+            ("max-connections-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user. 0 implies no limit.")
+            ("max-connections-per-ip"  , optValue(values.aclMaxConnectPerIp, "N"),   "The maximum number of connections allowed per host IP address. 0 implies no limit.")
+            ;
     }
 };
 
@@ -69,7 +72,6 @@ struct AclPlugin : public Plugin {
             oss << b.getDataDir().getPath() << "/" << values.aclFile;
             values.aclFile = oss.str();
     	}
-
         acl = new Acl(values, b);
         b.setAcl(acl.get());
         b.addFinalizer(boost::bind(&AclPlugin::shutdown, this));

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.cpp Fri Aug  3 12:13:32 2012
@@ -101,7 +101,7 @@ namespace acl {
                         << AclHelper::getAclResultStr(d->decisionMode));
                     foundmode = true;
             } else {
-                AclData::rule rule(cnt, (*i)->res, (*i)->props);
+                AclData::Rule rule(cnt, (*i)->res, (*i)->props);
 
                 // Action -> Object -> map<user -> set<Rule> >
                 std::ostringstream actionstr;
@@ -110,8 +110,27 @@ namespace acl {
                     (*i)->actionAll ? acnt++ : acnt = acl::ACTIONSIZE) {
 
                     if (acnt == acl::ACT_PUBLISH)
+                    {
                         d->transferAcl = true; // we have transfer ACL
-
+                        // For Publish the only object should be Exchange
+                        // and the only property should be routingkey.
+                        // Go through the rule properties and find the name and the key.
+                        // If found then place them specially for the lookup engine.
+                        for (pmCitr pItr=(*i)->props.begin(); pItr!=(*i)->props.end(); pItr++) {
+                            if (acl::SPECPROP_ROUTINGKEY == pItr->first)
+                            {
+                                rule.pubRoutingKeyInRule = true;
+                                rule.pubRoutingKey = (std::string)pItr->second;
+                                rule.addTopicTest(rule.pubRoutingKey);
+                                break;
+                            }
+                            if (acl::SPECPROP_NAME == pItr->first)
+                            {
+                                rule.pubExchNameInRule = true;
+                                rule.pubExchName = pItr->second;
+                            }
+                        }
+                    }
                     actionstr << AclHelper::getActionStr((Action) acnt) << ",";
 
                     //find the Action, create if not exist
@@ -285,7 +304,7 @@ namespace acl {
             if (ws) {
                 ret = true;
             } else {
-                errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber 
+                errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
                     << ", Non-continuation line must start with \"group\" or \"acl\".";
                 ret = false;
             }
@@ -314,13 +333,23 @@ namespace acl {
         if (contFlag) {
             gmCitr citr = groups.find(groupName);
             for (unsigned i = 0; i < toksSize; i++) {
-                if (!isValidUserName(toks[i])) return false;
+                if (isValidGroupName(toks[i])) {
+                    if (toks[i] == groupName) {
+                        QPID_LOG(debug, "ACL: Line: " << lineNumber
+                            << ", Ignoring recursive sub-group \"" << toks[i] << "\".");
+                        continue;
+                    } else if (groups.find(toks[i]) == groups.end()) {
+                        errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+                            << ", Sub-group \"" << toks[i] << "\" not defined yet.";
+                        return false;
+                    }
+                } else if (!isValidUserName(toks[i])) return false;
                 addName(toks[i], citr->second);
             }
         } else {
             const unsigned minimumSize = (cont ? 2 : 3);
             if (toksSize < minimumSize) {
-                errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber 
+                errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
                     << ", Insufficient tokens for group definition.";
                 return false;
             }
@@ -332,7 +361,17 @@ namespace acl {
             gmCitr citr = addGroup(toks[1]);
             if (citr == groups.end()) return false;
             for (unsigned i = 2; i < toksSize; i++) {
-                if (!isValidUserName(toks[i])) return false;
+                if (isValidGroupName(toks[i])) {
+                    if (toks[i] == groupName) {
+                        QPID_LOG(debug, "ACL: Line: " << lineNumber
+                            << ", Ignoring recursive sub-group \"" << toks[i] << "\".");
+                        continue;
+                    } else if (groups.find(toks[i]) == groups.end()) {
+                        errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+                            << ", Sub-group \"" << toks[i] << "\" not defined yet.";
+                        return false;
+                    }
+                } else if (!isValidUserName(toks[i])) return false;
                 addName(toks[i], citr->second);
             }
         }
@@ -356,7 +395,7 @@ namespace acl {
 
     void AclReader::addName(const std::string& name, nameSetPtr groupNameSet) {
         gmCitr citr = groups.find(name);
-        if (citr != groups.end() && citr->first != name){
+        if (citr != groups.end()) {
             // This is a previously defined group: add all the names in that group to this group
             groupNameSet->insert(citr->second->begin(), citr->second->end());
         } else {
@@ -459,7 +498,7 @@ namespace acl {
                 nvPair propNvp = splitNameValuePair(toks[i]);
                 if (propNvp.second.size() == 0) {
                     errorStream << ACL_FORMAT_ERR_LOG_PREFIX <<  "Line : " << lineNumber
-                        <<", Badly formed property name-value pair \"" 
+                        <<", Badly formed property name-value pair \""
                         << propNvp.first << "\". (Must be name=value)";
                     return false;
                 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.h Fri Aug  3 12:13:32 2012
@@ -26,6 +26,7 @@
 #include <string>
 #include <vector>
 #include <sstream>
+#include <memory>
 #include "qpid/acl/AclData.h"
 #include "qpid/broker/AclModule.h"
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.cpp Fri Aug  3 12:13:32 2012
@@ -131,7 +131,7 @@ namespace acl {
             boost::bind(&AclValidator::validateRule, this, _1));
     }
 
-    void AclValidator::validateRule(qpid::acl::AclData::rule& rule){
+    void AclValidator::validateRule(qpid::acl::AclData::Rule& rule){
         std::for_each(rule.props.begin(),
             rule.props.end(),
             boost::bind(&AclValidator::validateProperty, this, _1));

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.h Fri Aug  3 12:13:32 2012
@@ -71,7 +71,7 @@ class AclValidator {
 public:
 
    void validateRuleSet(std::pair<const std::string, qpid::acl::AclData::ruleSet>& rules);
-   void validateRule(qpid::acl::AclData::rule& rule);
+   void validateRule(qpid::acl::AclData::Rule& rule);
    void validateProperty(std::pair<const qpid::acl::SpecProperty, std::string>& prop);
    void validate(boost::shared_ptr<AclData> d);
    AclValidator();

Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/management-schema.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/management-schema.xml (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/management-schema.xml Fri Aug  3 12:13:32 2012
@@ -17,13 +17,16 @@
 -->
 
   <class name="Acl">
-    <property name="brokerRef"     type="objId"   references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/>
-    <property name="policyFile"    type="lstr"    access="RO"    desc="Name of the policy file"/>
-    <property name="enforcingAcl"  type="bool"    access="RO"    desc="Currently Enforcing ACL"/>
-    <property name="transferAcl"   type="bool"    access="RO"    desc="Any transfer ACL rules in force"/>
-    <property name="lastAclLoad"   type="absTime" access="RO"    desc="Timestamp of last successful load of ACL"/>
-    <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/>
-    <statistic name="connectionDenyCount" type="count64" unit="connection" desc="Number of connections denied"/>
+    <property name="brokerRef"             type="objId"   references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/>
+    <property name="policyFile"            type="lstr"    access="RO"       desc="Name of the policy file"/>
+    <property name="enforcingAcl"          type="bool"    access="RO"       desc="Currently Enforcing ACL"/>
+    <property name="transferAcl"           type="bool"    access="RO"       desc="Any transfer ACL rules in force"/>
+    <property name="lastAclLoad"           type="absTime" access="RO"       desc="Timestamp of last successful load of ACL"/>
+    <property name="maxConnections"        type="uint16"  access="RO"       desc="Maximum allowed connections"/>
+    <property name="maxConnectionsPerIp"   type="uint16"  access="RO"       desc="Maximum allowed connections"/>
+    <property name="maxConnectionsPerUser" type="uint16"  access="RO"       desc="Maximum allowed connections"/>
+    <statistic name="aclDenyCount"         type="count64" unit="request"    desc="Number of ACL requests denied"/>
+    <statistic name="connectionDenyCount"  type="count64" unit="connection" desc="Number of connections denied"/>
 
     <method name="reloadACLFile" desc="Reload the ACL file"/>
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/amqp_0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/amqp_0_10/Codecs.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/amqp_0_10/Codecs.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/amqp_0_10/Codecs.cpp Fri Aug  3 12:13:32 2012
@@ -52,9 +52,7 @@ template <class T, class U, class F> voi
 }
 
 Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in);
-FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in);
 Variant toVariant(boost::shared_ptr<FieldValue> in);
-boost::shared_ptr<FieldValue> toFieldValue(const Variant& in);
 
 template <class T, class U, class F> void translate(boost::shared_ptr<FieldValue> in, U& u, F f) 
 {
@@ -70,20 +68,6 @@ template <class T, class U, class F> T* 
     return new T(t);
 }
 
-FieldTableValue* toFieldTableValue(const Variant::Map& map) 
-{
-    FieldTable ft;
-    convert(map, ft, &toFieldTableEntry);
-    return new FieldTableValue(ft);
-}
-
-ListValue* toListValue(const Variant::List& list) 
-{
-    List l;
-    convert(list, l, &toFieldValue);
-    return new ListValue(l);
-}
-
 void setEncodingFor(Variant& out, uint8_t code)
 {
     switch(code){
@@ -151,7 +135,7 @@ Variant toVariant(boost::shared_ptr<Fiel
 
       case 0xf0: break;//void, which is the default value for Variant
       case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
-        
+
         //Variable Width types:
         //strings:
       case 0x80: 
@@ -217,89 +201,254 @@ boost::shared_ptr<FieldValue> convertStr
     }
 }
 
-boost::shared_ptr<FieldValue> toFieldValue(const Variant& in)
+Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in)
 {
-    boost::shared_ptr<FieldValue> out;
-    switch (in.getType()) {
-      case VAR_VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break;
-      case VAR_BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break;
-      case VAR_UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break;
-      case VAR_UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break;
-      case VAR_UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break;
-      case VAR_UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break;
-      case VAR_INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break;
-      case VAR_INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break;
-      case VAR_INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break;
-      case VAR_INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break;
-      case VAR_FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break;
-      case VAR_DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break;
-      case VAR_STRING: out = convertString(in.asString(), in.getEncoding()); break;
-      case VAR_UUID: out = boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); break;
-      case VAR_MAP: 
-        out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap()));
-        break;
-      case VAR_LIST: 
-        out = boost::shared_ptr<FieldValue>(toListValue(in.asList()));
-        break;
+    return Variant::Map::value_type(in.first, toVariant(in.second));
+}
+
+struct DecodeBuffer
+{
+    Buffer buffer;
+
+    DecodeBuffer(const std::string& s) : buffer(const_cast<char*>(s.data()), s.size()) {}
+
+    template <class T> void decode(T& t) { t.decode(buffer); }
+
+};
+
+template <class T, class U, class F> void _decode(const std::string& data, U& value, F f)
+{
+    T t;
+    DecodeBuffer buffer(data);
+    buffer.decode(t);
+    convert(t, value, f);
+}
+
+uint32_t encodedSize(const Variant::Map& values);
+uint32_t encodedSize(const Variant::List& values);
+uint32_t encodedSize(const std::string& value);
+
+uint32_t encodedSize(const Variant& value)
+{
+    switch (value.getType()) {
+      case VAR_VOID:
+        return 0;
+      case VAR_BOOL:
+      case VAR_UINT8:
+      case VAR_INT8:
+        return 1;
+      case VAR_UINT16:
+      case VAR_INT16:
+        return 2;
+        break;
+      case VAR_UINT32:
+      case VAR_INT32:
+      case VAR_FLOAT:
+        return 4;
+      case VAR_UINT64:
+      case VAR_INT64:
+      case VAR_DOUBLE:
+        return 8;
+      case VAR_UUID:
+        return 16;
+      case VAR_MAP:
+        return encodedSize(value.asMap());
+      case VAR_LIST:
+        return encodedSize(value.asList());
+      case VAR_STRING:
+        return encodedSize(value.getString());
+      default:
+        throw Exception("Couldn't encode Variant: Illegal type code");
     }
-    return out;
 }
 
-Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in)
+uint32_t encodedSize(const Variant::Map& values)
 {
-    return Variant::Map::value_type(in.first, toVariant(in.second));
+    uint32_t size = 4/*size field*/ + 4/*count field*/;
+    for(Variant::Map::const_iterator i = values.begin(); i != values.end(); ++i) {
+        size += 1/*size of key*/ + (i->first).size() + 1/*typecode*/ + encodedSize(i->second);
+    }
+    return size;
 }
 
-FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in)
+uint32_t encodedSize(const Variant::Map& values, const std::string& efield, const Variant& evalue)
 {
-    return FieldTable::value_type(in.first, toFieldValue(in.second));
+    uint32_t size = 4/*size field*/ + 4/*count field*/;
+    for(Variant::Map::const_iterator i = values.begin(); i != values.end(); ++i) {
+        size += 1/*size of key*/ + (i->first).size() + 1/*typecode*/ + encodedSize(i->second);
+    }
+    size += 1/*size of key*/ + efield.size() + 1/*typecode*/ + encodedSize(evalue);
+    return size;
 }
 
-struct EncodeBuffer
+uint32_t encodedSize(const Variant::List& values)
 {
-    char* data;
-    Buffer buffer;
+    uint32_t size = 4/*size field*/ + 4/*count field*/;
+    for(Variant::List::const_iterator i = values.begin(); i != values.end(); ++i) {
+        size += 1/*typecode*/ + encodedSize(*i);
+    }
+    return size;
+}
+
+uint32_t encodedSize(const std::string& value)
+{
+    uint32_t size = value.size();
+    if (size > std::numeric_limits<uint16_t>::max()) {
+        return size + 4; /*Long size*/
+    } else {
+        return size + 2; /*Short size*/
+    }
+}
 
-    EncodeBuffer(size_t size) : data(new char[size]), buffer(data, size) {}
-    ~EncodeBuffer() { delete[] data; }
+void encode(const std::string& value, const std::string& encoding, qpid::framing::Buffer& buffer)
+{
+    uint32_t size = value.size();
+    if (size > std::numeric_limits<uint16_t>::max()) {
+        if (encoding == utf8 || encoding == utf16 || encoding == iso885915) {
+            throw Exception(QPID_MSG("Could not encode " << encoding << " character string - too long (" << size << " bytes)"));
+        } else {
+            buffer.putOctet(0xa0);
+            buffer.putLong(size);
+            buffer.putRawData(value);
+        }
+    } else {
+        if (encoding == utf8) {
+            buffer.putOctet(0x95);
+        } else if (encoding == utf16) {
+            buffer.putOctet(0x96);
+        } else if (encoding == iso885915) {
+            buffer.putOctet(0x94);
+        } else {
+            buffer.putOctet(0x90);
+        }
+        buffer.putShort(size);
+        buffer.putRawData(value);
+    }
+}
 
-    template <class T> void encode(T& t) { t.encode(buffer); }
+void encode(const Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer);
+void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer);
 
-    void getData(std::string& s) { 
-        s.assign(data, buffer.getSize()); 
+void encode(const Variant& value, qpid::framing::Buffer& buffer)
+{
+    switch (value.getType()) {
+      case VAR_VOID:
+        buffer.putOctet(0xf0);
+        break;
+      case VAR_BOOL:
+        buffer.putOctet(0x08);
+        buffer.putOctet(value.asBool());
+        break;
+      case VAR_INT8:
+        buffer.putOctet(0x01);
+        buffer.putInt8(value.asInt8());
+        break;
+      case VAR_UINT8:
+        buffer.putOctet(0x02);
+        buffer.putOctet(value.asUint8());
+        break;
+      case VAR_INT16:
+        buffer.putOctet(0x11);
+        buffer.putInt16(value.asInt16());
+        break;
+      case VAR_UINT16:
+        buffer.putOctet(0x12);
+        buffer.putShort(value.asUint16());
+        break;
+      case VAR_INT32:
+        buffer.putOctet(0x21);
+        buffer.putInt32(value.asInt32());
+        break;
+      case VAR_UINT32:
+        buffer.putOctet(0x22);
+        buffer.putLong(value.asUint32());
+        break;
+      case VAR_FLOAT:
+        buffer.putOctet(0x23);
+        buffer.putFloat(value.asFloat());
+        break;
+      case VAR_INT64:
+        buffer.putOctet(0x31);
+        buffer.putInt64(value.asInt64());
+        break;
+      case VAR_UINT64:
+        buffer.putOctet(0x32);
+        buffer.putLongLong(value.asUint64());
+        break;
+      case VAR_DOUBLE:
+        buffer.putOctet(0x33);
+        buffer.putDouble(value.asDouble());
+        break;
+      case VAR_UUID:
+        buffer.putOctet(0x48);
+        buffer.putBin128(value.asUuid().data());
+        break;
+      case VAR_MAP:
+        buffer.putOctet(0xa8);
+        encode(value.asMap(), encodedSize(value.asMap()), buffer);
+        break;
+      case VAR_LIST:
+        buffer.putOctet(0xa9);
+        encode(value.asList(), encodedSize(value.asList()), buffer);
+        break;
+      case VAR_STRING:
+        encode(value.getString(), value.getEncoding(), buffer);
+        break;
     }
-};
+}
 
-struct DecodeBuffer
+void encode(const Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer)
 {
-    Buffer buffer;
+    uint32_t s = buffer.getPosition();
+    buffer.putLong(len - 4);//exclusive of the size field itself
+    buffer.putLong(map.size());
+    for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+        buffer.putShortString(i->first);
+    	encode(i->second, buffer);
+    }
+    (void) s; assert(s + len == buffer.getPosition());
+}
 
-    DecodeBuffer(const std::string& s) : buffer(const_cast<char*>(s.data()), s.size()) {}
+void encode(const Variant::Map& map, const std::string& efield, const Variant& evalue, uint32_t len, qpid::framing::Buffer& buffer)
+{
+    uint32_t s = buffer.getPosition();
+    buffer.putLong(len - 4);//exclusive of the size field itself
+    buffer.putLong(map.size() + 1 /* The extra field */ );
+    for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+        buffer.putShortString(i->first);
+        encode(i->second, buffer);
+    }
+    buffer.putShortString(efield);
+    encode(evalue, buffer);
 
-    template <class T> void decode(T& t) { t.decode(buffer); }
-    
-};
+    (void) s; assert(s + len == buffer.getPosition());
+}
 
-template <class T, class U, class F> void _encode(const U& value, std::string& data, F f)
+void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer)
 {
-    T t;
-    convert(value, t, f);
-    EncodeBuffer buffer(t.encodedSize());
-    buffer.encode(t);
-    buffer.getData(data);
+    uint32_t s = buffer.getPosition();
+    buffer.putLong(len - 4);//exclusive of the size field itself
+    buffer.putLong(list.size());
+    for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+    	encode(*i, buffer);
+    }
+    (void) s; assert(s + len == buffer.getPosition());
 }
 
-template <class T, class U, class F> void _decode(const std::string& data, U& value, F f)
+void decode(qpid::framing::Buffer&, Variant::Map&)
 {
-    T t;
-    DecodeBuffer buffer(data);
-    buffer.decode(t);
-    convert(t, value, f);
 }
 
+
 void MapCodec::encode(const Variant::Map& value, std::string& data)
 {
-    _encode<FieldTable>(value, data, &toFieldTableEntry);
+    uint32_t len = qpid::amqp_0_10::encodedSize(value);
+    std::vector<char> space(len);
+    qpid::framing::Buffer buff(&space[0], len);
+
+    qpid::amqp_0_10::encode(value, len, buff);
+    assert( len == buff.getPosition() );
+    data.assign(&space[0], len);
 }
 
 void MapCodec::decode(const std::string& data, Variant::Map& value)
@@ -309,14 +458,18 @@ void MapCodec::decode(const std::string&
 
 size_t MapCodec::encodedSize(const Variant::Map& value)
 {
-    std::string encoded;
-    encode(value, encoded);
-    return encoded.size();
+    return qpid::amqp_0_10::encodedSize(value);
 }
 
 void ListCodec::encode(const Variant::List& value, std::string& data)
 {
-    _encode<List>(value, data, &toFieldValue);
+    uint32_t len = qpid::amqp_0_10::encodedSize(value);
+    std::vector<char> space(len);
+    qpid::framing::Buffer buff(&space[0], len);
+
+    qpid::amqp_0_10::encode(value, len, buff);
+    assert( len == buff.getPosition() );
+    data.assign(&space[0], len);
 }
 
 void ListCodec::decode(const std::string& data, Variant::List& value)
@@ -326,14 +479,47 @@ void ListCodec::decode(const std::string
 
 size_t ListCodec::encodedSize(const Variant::List& value)
 {
-    std::string encoded;
-    encode(value, encoded);
-    return encoded.size();
+    return qpid::amqp_0_10::encodedSize(value);
 }
 
 void translate(const Variant::Map& from, FieldTable& to)
 {
-    convert(from, to, &toFieldTableEntry);
+    // Create buffer of correct size to encode Variant::Map
+    uint32_t len = encodedSize(from);
+    std::vector<char> space(len);
+    qpid::framing::Buffer buff(&space[0], len);
+
+    // Encode Variant::Map into buffer directly -
+    // We pass the already calculated length in to avoid
+    // recalculating it.
+    encode(from, len, buff);
+
+    // Give buffer to FieldTable
+    // Could speed this up a bit by avoiding copying
+    // the buffer we just created into the FieldTable
+    assert( len == buff.getPosition() );
+    buff.reset();
+    to.decode(buff);
+}
+
+void translate(const Variant::Map& from, const std::string& efield, const Variant& evalue, FieldTable& to)
+{
+    // Create buffer of correct size to encode Variant::Map
+    uint32_t len = encodedSize(from, efield, evalue);
+    std::vector<char> space(len);
+    qpid::framing::Buffer buff(&space[0], len);
+
+    // Encode Variant::Map into buffer directly -
+    // We pass the already calculated length in to avoid
+    // recalculating it.
+    encode(from, efield, evalue, len, buff);
+
+    // Give buffer to FieldTable
+    // Could speed this up a bit by avoiding copying
+    // the buffer we just created into the FieldTable
+    assert( len == buff.getPosition() );
+    buff.reset();
+    to.decode(buff);
 }
 
 void translate(const FieldTable& from, Variant::Map& to)

Propchange: qpid/branches/asyncstore/cpp/src/qpid/broker/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1333988-1368650

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/AclModule.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/AclModule.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/AclModule.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/AclModule.h Fri Aug  3 12:13:32 2012
@@ -113,6 +113,7 @@ namespace acl {
 
 namespace broker {
 
+    class Connection;
 
     class AclModule
     {
@@ -139,6 +140,11 @@ namespace broker {
 
         // Add specialized authorise() methods as required.
 
+        /** Approve connection by counting connections total, per-IP, and
+         *  per-user.
+         */
+        virtual bool approveConnection (const Connection& connection)=0;
+
         virtual ~AclModule() {};
     };
 } // namespace broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.cpp Fri Aug  3 12:13:32 2012
@@ -57,22 +57,25 @@ void Bridge::PushHandler::handle(framing
     conn->received(frame);
 }
 
-Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
-               const _qmf::ArgsLinkBridge& _args,
-               InitializeCallback init) :
-    link(_link), id(_id), args(_args), mgmtObject(0),
-    listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
-    initialize(init), detached(false)
-{
-    std::stringstream title;
-    title << id << "_" << name;
-    queueName += title.str();
+Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
+               CancellationListener l, const _qmf::ArgsLinkBridge& _args,
+               InitializeCallback init, const std::string& _queueName, const string& ae) :
+    link(_link), channel(_id), args(_args), mgmtObject(0),
+    listener(l), name(_name),
+    queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag()
+              : _queueName),
+    altEx(ae), persistenceId(0),
+    connState(0), conn(0), initialize(init), detached(false),
+    useExistingQueue(!_queueName.empty()),
+    sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
+{
     ManagementAgent* agent = link->getBroker()->getManagementAgent();
     if (agent != 0) {
         mgmtObject = new _qmf::Bridge
-            (agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
+            (agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
              args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
              args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
+        mgmtObject->set_channelId(channel);
         agent->addObject(mgmtObject);
     }
     QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
@@ -90,23 +93,22 @@ void Bridge::create(Connection& c)
     conn = &c;
     FieldTable options;
     if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
-    SessionHandler& sessionHandler = c.getChannel(id);
-    sessionHandler.setDetachedCallback(
-        boost::bind(&Bridge::sessionDetached, shared_from_this()));
+    SessionHandler& sessionHandler = c.getChannel(channel);
+    sessionHandler.setErrorListener(shared_from_this());
     if (args.i_srcIsLocal) {
         if (args.i_dynamic)
             throw Exception("Dynamic routing not supported for push routes");
         // Point the bridging commands at the local connection handler
         pushHandler.reset(new PushHandler(&c));
-        channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
+        channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));
 
         session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
         peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
 
-        session->attach(name, false);
+        session->attach(sessionName, false);
         session->commandPoint(0,0);
     } else {
-        sessionHandler.attachAs(name);
+        sessionHandler.attachAs(sessionName);
         // Point the bridging commands at the remote peer broker
         peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
     }
@@ -115,7 +117,7 @@ void Bridge::create(Connection& c)
     if (initialize) initialize(*this, sessionHandler);
     else if (args.i_srcIsQueue) {
         peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
-        peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+        peer->getMessage().flow(args.i_dest, 0, args.i_sync ? 2 * args.i_sync : 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
         QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
     } else {
@@ -138,12 +140,13 @@ void Bridge::create(Connection& c)
         }
 
         bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
-        bool autoDelete = !durable;//auto delete transient queues?
-        peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings);
+        bool exclusive = !useExistingQueue;  // only exclusive if the queue is owned by the bridge
+        bool autoDelete = exclusive && !durable;//auto delete transient queues?
+        peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
         if (!args.i_dynamic)
             peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
-        peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
-        peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+        peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options);
+        peer->getMessage().flow(args.i_dest, 0, (useExistingQueue && args.i_sync) ? 2 * args.i_sync : 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
 
         if (args.i_dynamic) {
@@ -163,11 +166,12 @@ void Bridge::cancel(Connection&)
 {
     if (resetProxy()) {
         peer->getMessage().cancel(args.i_dest);
-        peer->getSession().detach(name);
+        peer->getSession().detach(sessionName);
     }
     QPID_LOG(debug, "Cancelled bridge " << name);
 }
 
+/** Notify the bridge that the connection has closed */
 void Bridge::closed()
 {
     if (args.i_dynamic) {
@@ -177,9 +181,10 @@ void Bridge::closed()
     QPID_LOG(debug, "Closed bridge " << name);
 }
 
-void Bridge::destroy()
+/** Shut down the bridge */
+void Bridge::close()
 {
-    listener(this);
+    listener(this); // ask the LinkRegistry to destroy us
 }
 
 void Bridge::setPersistenceId(uint64_t pId) const
@@ -187,8 +192,21 @@ void Bridge::setPersistenceId(uint64_t p
     persistenceId = pId;
 }
 
+
+const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2");
+const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge");
+
+bool Bridge::isEncodedBridge(const std::string& key)
+{
+    return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
+}
+
+
 Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
 {
+    string kind;
+    buffer.getShortString(kind);
+
     string   host;
     uint16_t port;
     string   src;
@@ -196,9 +214,33 @@ Bridge::shared_ptr Bridge::decode(LinkRe
     string   key;
     string   id;
     string   excludes;
+    string   name;
+
+    Link::shared_ptr link;
+    if (kind == ENCODED_IDENTIFIER_V1) {
+        /** previous versions identified the bridge by host:port, not by name, and
+         * transport wasn't provided.  Try to find a link using those parameters.
+         */
+        buffer.getShortString(host);
+        port = buffer.getShort();
+
+        link = links.getLink(host, port);
+        if (!link) {
+            QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port);
+            return Bridge::shared_ptr();
+        }
+    } else {
+        string linkName;
+
+        buffer.getShortString(name);
+        buffer.getShortString(linkName);
+        link = links.getLink(linkName);
+        if (!link) {
+            QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'");
+            return Bridge::shared_ptr();
+        }
+    }
 
-    buffer.getShortString(host);
-    port = buffer.getShort();
     bool durable(buffer.getOctet());
     buffer.getShortString(src);
     buffer.getShortString(dest);
@@ -210,15 +252,21 @@ Bridge::shared_ptr Bridge::decode(LinkRe
     bool dynamic(buffer.getOctet());
     uint16_t sync = buffer.getShort();
 
-    return links.declare(host, port, durable, src, dest, key,
-                         is_queue, is_local, id, excludes, dynamic, sync).first;
+    if (kind == ENCODED_IDENTIFIER_V1) {
+        /** previous versions did not provide a name for the bridge, so create one
+         */
+        name = createName(link->getName(), src, dest, key);
+    }
+
+    return links.declare(name, *link, durable, src, dest, key, is_queue,
+                         is_local, id, excludes, dynamic, sync).first;
 }
 
 void Bridge::encode(Buffer& buffer) const
 {
-    buffer.putShortString(string("bridge"));
-    buffer.putShortString(link->getHost());
-    buffer.putShort(link->getPort());
+    buffer.putShortString(ENCODED_IDENTIFIER);
+    buffer.putShortString(name);
+    buffer.putShortString(link->getName());
     buffer.putOctet(args.i_durable ? 1 : 0);
     buffer.putShortString(args.i_src);
     buffer.putShortString(args.i_dest);
@@ -233,9 +281,9 @@ void Bridge::encode(Buffer& buffer) cons
 
 uint32_t Bridge::encodedSize() const
 {
-    return link->getHost().size() + 1 // short-string (host)
-        + 7                // short-string ("bridge")
-        + 2                // port
+    return ENCODED_IDENTIFIER.size() + 1  // +1 byte length
+        + name.size() + 1
+        + link->getName().size() + 1
         + 1                // durable
         + args.i_src.size()  + 1
         + args.i_dest.size() + 1
@@ -259,7 +307,8 @@ management::Manageable::status_t Bridge:
 {
     if (methodId == _qmf::Bridge::METHOD_CLOSE) {
         //notify that we are closed
-        destroy();
+        QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'");
+        close();
         return management::Manageable::STATUS_OK;
     } else {
         return management::Manageable::STATUS_UNKNOWN_METHOD;
@@ -306,7 +355,7 @@ void Bridge::sendReorigin()
 }
 bool Bridge::resetProxy()
 {
-    SessionHandler& sessionHandler = conn->getChannel(id);
+    SessionHandler& sessionHandler = conn->getChannel(channel);
     if (!sessionHandler.getSession()) peer.reset();
     else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
     return peer.get();
@@ -318,7 +367,7 @@ void Bridge::ioThreadPropagateBinding(co
         peer->getExchange().bind(queue, exchange, key, args);
     } else {
         QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge");
-        destroy();
+        close();
     }
 }
 
@@ -333,8 +382,38 @@ const string& Bridge::getLocalTag() cons
     return link->getBroker()->getFederationTag();
 }
 
-void Bridge::sessionDetached() {
+// SessionHandler::ErrorListener methods.
+void Bridge::connectionException(
+    framing::connection::CloseCode code, const std::string& msg)
+{
+    if (errorListener) errorListener->connectionException(code, msg);
+}
+
+void Bridge::channelException(
+    framing::session::DetachCode code, const std::string& msg)
+{
+    if (errorListener) errorListener->channelException(code, msg);
+}
+
+void Bridge::executionException(
+    framing::execution::ErrorCode code, const std::string& msg)
+{
+    if (errorListener) errorListener->executionException(code, msg);
+}
+
+void Bridge::detach() {
     detached = true;
+    if (errorListener) errorListener->detach();
+}
+
+std::string Bridge::createName(const std::string& linkName,
+                               const std::string& src,
+                               const std::string& dest,
+                               const std::string& key)
+{
+    std::stringstream keystream;
+    keystream << linkName << "!" << src << "!" << dest << "!" << key;
+    return keystream.str();
 }
 
 }}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.h Fri Aug  3 12:13:32 2012
@@ -29,6 +29,7 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/broker/Exchange.h"
+#include "qpid/broker/SessionHandler.h"
 #include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
 #include "qmf/org/apache/qpid/broker/Bridge.h"
 
@@ -43,29 +44,31 @@ class Connection;
 class ConnectionState;
 class Link;
 class LinkRegistry;
-class SessionHandler;
 
 class Bridge : public PersistableConfig,
                public management::Manageable,
                public Exchange::DynamicBridge,
+               public SessionHandler::ErrorListener,
                public boost::enable_shared_from_this<Bridge>
 {
-public:
+  public:
     typedef boost::shared_ptr<Bridge> shared_ptr;
     typedef boost::function<void(Bridge*)> CancellationListener;
     typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
 
-    Bridge(Link* link, framing::ChannelId id, CancellationListener l,
+    Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l,
            const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
-           InitializeCallback init
+           InitializeCallback init, const std::string& queueName="",
+           const std::string& altExchange=""
     );
     ~Bridge();
 
-    void create(Connection& c);
-    void cancel(Connection& c);
-    void closed();
-    void destroy();
+    QPID_BROKER_EXTERN void close();
     bool isDurable() { return args.i_durable; }
+    Link *getLink() const { return link; }
+    const std::string getSrc() const { return args.i_src; }
+    const std::string getDest() const { return args.i_dest; }
+    const std::string getKey() const { return args.i_key; }
 
     bool isDetached() const { return detached; }
 
@@ -80,7 +83,11 @@ public:
     uint32_t encodedSize() const;
     void     encode(framing::Buffer& buffer) const;
     const std::string& getName() const { return name; }
+
+    static const std::string ENCODED_IDENTIFIER;
+    static const std::string ENCODED_IDENTIFIER_V1;
     static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+    static bool isEncodedBridge(const std::string& key);
 
     // Exchange::DynamicBridge methods
     void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0);
@@ -93,10 +100,20 @@ public:
     std::string getQueueName() const { return queueName; }
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
 
-private:
-    // Callback when the bridge's session is detached.
-    void sessionDetached();
+    /** create a name for a bridge (if none supplied by user config) */
+    static std::string createName(const std::string& linkName,
+                                  const std::string& src,
+                                  const std::string& dest,
+                                  const std::string& key);
+
+    // SessionHandler::ErrorListener methods.
+    void connectionException(framing::connection::CloseCode code, const std::string& msg);
+    void channelException(framing::session::DetachCode, const std::string& msg);
+    void executionException(framing::execution::ErrorCode, const std::string& msg);
+    void detach();
 
+    void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
+  private:
     struct PushHandler : framing::FrameHandler {
         PushHandler(Connection* c) { conn = c; }
         void handle(framing::AMQFrame& frame);
@@ -108,19 +125,30 @@ private:
     std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
     std::auto_ptr<framing::AMQP_ServerProxy>          peer;
 
-    Link* link;
-    framing::ChannelId          id;
+    Link* const link;
+    const framing::ChannelId          channel;
     qmf::org::apache::qpid::broker::ArgsLinkBridge args;
     qmf::org::apache::qpid::broker::Bridge*        mgmtObject;
     CancellationListener        listener;
     std::string name;
     std::string queueName;
+    std::string altEx;
     mutable uint64_t  persistenceId;
     ConnectionState* connState;
     Connection* conn;
     InitializeCallback initialize;
     bool detached;              // Set when session is detached.
     bool resetProxy();
+
+    // connection Management (called by owning Link)
+    void create(Connection& c);
+    void cancel(Connection& c);
+    void closed();
+    friend class Link; // to call create, cancel, closed()
+    boost::shared_ptr<ErrorListener> errorListener;
+
+    const bool useExistingQueue;
+    const std::string sessionName;
 };
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org