You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [3/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp/...
Modified: qpid/branches/asyncstore/cpp/src/qpid/RefCounted.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/RefCounted.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/RefCounted.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/RefCounted.h Fri Aug 3 12:13:32 2012
@@ -49,15 +49,11 @@ protected:
};
-} // namespace qpid
-
// intrusive_ptr support.
-namespace boost {
-template <typename T>
-inline void intrusive_ptr_add_ref(const T* p) { p->qpid::RefCounted::addRef(); }
-template <typename T>
-inline void intrusive_ptr_release(const T* p) { p->qpid::RefCounted::release(); }
-}
+inline void intrusive_ptr_add_ref(const RefCounted* p) { p->addRef(); }
+inline void intrusive_ptr_release(const RefCounted* p) { p->release(); }
+
+} // namespace qpid
#endif /*!QPID_REFCOUNTED_H*/
Propchange: qpid/branches/asyncstore/cpp/src/qpid/acl/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/acl:r1333988-1368650
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.cpp Fri Aug 3 12:13:32 2012
@@ -51,7 +51,7 @@ using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::acl;
Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0),
- connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp))
+ connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp, aclValues.aclMaxConnectTotal))
{
agent = broker->getManagementAgent();
@@ -60,11 +60,14 @@ Acl::Acl (AclValues& av, Broker& b): acl
_qmf::Package packageInit(agent);
mgmtObject = new _qmf::Acl (agent, this, broker);
agent->addObject (mgmtObject);
+ mgmtObject->set_maxConnections(aclValues.aclMaxConnectTotal);
+ mgmtObject->set_maxConnectionsPerIp(aclValues.aclMaxConnectPerIp);
+ mgmtObject->set_maxConnectionsPerUser(aclValues.aclMaxConnectPerUser);
}
std::string errorString;
if (!readAclFile(errorString)){
- throw Exception("Could not read ACL file " + errorString);
if (mgmtObject!=0) mgmtObject->set_enforcingAcl(0);
+ throw Exception("Could not read ACL file " + errorString);
}
broker->getConnectionObservers().add(connectionCounter);
QPID_LOG(info, "ACL Plugin loaded");
@@ -121,6 +124,11 @@ bool Acl::authorise(
}
+bool Acl::approveConnection(const qpid::broker::Connection& conn)
+{
+ return connectionCounter->approveConnection(conn);
+}
+
bool Acl::result(
const AclResult& aclreslt,
const std::string& id,
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/Acl.h Fri Aug 3 12:13:32 2012
@@ -38,6 +38,7 @@
namespace qpid {
namespace broker {
class Broker;
+class Connection;
}
namespace acl {
@@ -45,8 +46,9 @@ class ConnectionCounter;
struct AclValues {
std::string aclFile;
- uint32_t aclMaxConnectPerUser;
- uint32_t aclMaxConnectPerIp;
+ uint16_t aclMaxConnectPerUser;
+ uint16_t aclMaxConnectPerIp;
+ uint16_t aclMaxConnectTotal;
};
@@ -66,6 +68,9 @@ private:
public:
Acl (AclValues& av, broker::Broker& b);
+ /** reportConnectLimit
+ * issue management counts and alerts for denied connections
+ */
void reportConnectLimit(const std::string user, const std::string addr);
inline virtual bool doTransferAcl() {
@@ -87,6 +92,8 @@ public:
const std::string& ExchangeName,
const std::string& RoutingKey);
+ virtual bool approveConnection(const broker::Connection& connection);
+
virtual ~Acl();
private:
bool result(
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.cpp Fri Aug 3 12:13:32 2012
@@ -34,37 +34,82 @@ namespace acl {
//
// This module instantiates a broker::ConnectionObserver and limits client
-// connections by counting connections per user name and per client IP address.
+// connections by counting connections per user name, per client IP address
+// and per total connection count.
//
//
//
//
-ConnectionCounter::ConnectionCounter(Acl& a, uint32_t nl, uint32_t hl) :
- acl(a), nameLimit(nl), hostLimit(hl) {}
+ConnectionCounter::ConnectionCounter(Acl& a, uint16_t nl, uint16_t hl, uint16_t tl) :
+ acl(a), nameLimit(nl), hostLimit(hl), totalLimit(tl), totalCurrentConnections(0) {}
ConnectionCounter::~ConnectionCounter() {}
//
-// limitCheckLH
+// limitApproveLH
+//
+// Connection creation approver. Return true only if user is under limit.
+// Called with lock held.
+//
+bool ConnectionCounter::limitApproveLH(
+ connectCountsMap_t& theMap,
+ const std::string& theName,
+ uint16_t theLimit,
+ bool emitLog) {
+
+ bool result(true);
+ if (theLimit > 0) {
+ uint16_t count;
+ connectCountsMap_t::iterator eRef = theMap.find(theName);
+ if (eRef != theMap.end()) {
+ count = (uint16_t)(*eRef).second;
+ result = count <= theLimit;
+ } else {
+ // Not found
+ count = 0;
+ }
+ if (emitLog) {
+ QPID_LOG(trace, "ACL ConnectionApprover IP=" << theName
+ << " limit=" << theLimit
+ << " curValue=" << count
+ << " result=" << (result ? "allow" : "deny"));
+ }
+ }
+ return result;
+}
+
+
+//
+// countConnectionLH
//
// Increment the name's count in map and return a comparison against the limit.
// called with dataLock already taken
//
-bool ConnectionCounter::limitCheckLH(
- connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) {
+bool ConnectionCounter::countConnectionLH(
+ connectCountsMap_t& theMap,
+ const std::string& theName,
+ uint16_t theLimit,
+ bool emitLog) {
bool result(true);
+ uint16_t count(0);
if (theLimit > 0) {
connectCountsMap_t::iterator eRef = theMap.find(theName);
if (eRef != theMap.end()) {
- uint32_t count = (uint32_t)(*eRef).second + 1;
+ count = (uint16_t)(*eRef).second + 1;
(*eRef).second = count;
result = count <= theLimit;
} else {
- theMap[theName] = 1;
+ theMap[theName] = count = 1;
+ }
+ if (emitLog) {
+ QPID_LOG(trace, "ACL ConnectionApprover user=" << theName
+ << " limit=" << theLimit
+ << " curValue=" << count
+ << " result=" << (result ? "allow" : "deny"));
}
}
return result;
@@ -78,12 +123,12 @@ bool ConnectionCounter::limitCheckLH(
// called with dataLock already taken
//
void ConnectionCounter::releaseLH(
- connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) {
+ connectCountsMap_t& theMap, const std::string& theName, uint16_t theLimit) {
if (theLimit > 0) {
connectCountsMap_t::iterator eRef = theMap.find(theName);
if (eRef != theMap.end()) {
- uint32_t count = (uint32_t) (*eRef).second;
+ uint16_t count = (uint16_t) (*eRef).second;
assert (count > 0);
if (1 == count) {
theMap.erase (eRef);
@@ -103,52 +148,20 @@ void ConnectionCounter::releaseLH(
// connection - called during Connection's constructor
//
void ConnectionCounter::connection(broker::Connection& connection) {
- QPID_LOG(trace, "ACL ConnectionCounter connection IP:" << connection.getMgmtId()
- << ", userId:" << connection.getUserId());
+ QPID_LOG(trace, "ACL ConnectionCounter new connection: " << connection.getMgmtId());
- Mutex::ScopedLock locker(dataLock);
-
- connectProgressMap[connection.getMgmtId()] = C_CREATED;
-}
-
-
-//
-// opened - called when first AMQP frame is received over Connection
-//
-void ConnectionCounter::opened(broker::Connection& connection) {
- QPID_LOG(trace, "ACL ConnectionCounter Opened IP:" << connection.getMgmtId()
- << ", userId:" << connection.getUserId());
+ const std::string& hostName(getClientHost(connection.getMgmtId()));
Mutex::ScopedLock locker(dataLock);
- const std::string& userName( connection.getUserId());
- const std::string& hostName(getClientHost(connection.getMgmtId()));
-
- // Bump state from CREATED to OPENED
- (void) limitCheckLH(connectProgressMap, connection.getMgmtId(), C_OPENED);
+ // Total connections goes up
+ totalCurrentConnections += 1;
- bool nameOk = limitCheckLH(connectByNameMap, userName, nameLimit);
- bool hostOk = limitCheckLH(connectByHostMap, hostName, hostLimit);
+ // Record the fact that this connection exists
+ connectProgressMap[connection.getMgmtId()] = C_CREATED;
- if (!nameOk) {
- // User has too many
- acl.reportConnectLimit(userName, hostName);
- QPID_LOG(notice, "ACL ConnectionCounter User '" << userName
- << "' exceeded maximum allowed connections");
- throw Exception(
- QPID_MSG("User '" << userName
- << "' exceeded maximum allowed connections"));
- }
-
- if (!hostOk) {
- // Host has too many
- acl.reportConnectLimit(userName, hostName);
- QPID_LOG(notice, "ACL ConnectionCounter Client host '" << hostName
- << "' exceeded maximum allowed connections");
- throw Exception(
- QPID_MSG("Client host '" << hostName
- << "' exceeded maximum allowed connections"));
- }
+ // Count the connection from this host.
+ (void) countConnectionLH(connectByHostMap, hostName, hostLimit, false);
}
@@ -156,7 +169,7 @@ void ConnectionCounter::opened(broker::C
// closed - called during Connection's destructor
//
void ConnectionCounter::closed(broker::Connection& connection) {
- QPID_LOG(trace, "ACL ConnectionCounter Closed IP:" << connection.getMgmtId()
+ QPID_LOG(trace, "ACL ConnectionCounter closed: " << connection.getMgmtId()
<< ", userId:" << connection.getUserId());
Mutex::ScopedLock locker(dataLock);
@@ -165,32 +178,129 @@ void ConnectionCounter::closed(broker::C
if (eRef != connectProgressMap.end()) {
if ((*eRef).second == C_OPENED){
// Normal case: connection was created and opened.
- // Decrement in-use counts
+ // Decrement user in-use counts
releaseLH(connectByNameMap,
connection.getUserId(),
nameLimit);
-
- releaseLH(connectByHostMap,
- getClientHost(connection.getMgmtId()),
- hostLimit);
} else {
// Connection was created but not opened.
- // Don't decrement any connection counts.
+ // Don't decrement user count.
}
+
+ // Decrement host in-use count.
+ releaseLH(connectByHostMap,
+ getClientHost(connection.getMgmtId()),
+ hostLimit);
+
+ // destroy connection progress indicator
connectProgressMap.erase(eRef);
} else {
// connection not found in progress map
- QPID_LOG(notice, "ACL ConnectionCounter info for '" << connection.getMgmtId()
+ QPID_LOG(notice, "ACL ConnectionCounter closed info for '" << connection.getMgmtId()
<< "' not found in connection state pool");
}
+
+ // total connections
+ totalCurrentConnections -= 1;
}
//
+// approveConnection
+// check total connections, connections from IP, connections by user and
+// disallow if over any limit
+//
+bool ConnectionCounter::approveConnection(const broker::Connection& connection)
+{
+ const std::string& hostName(getClientHost(connection.getMgmtId()));
+ const std::string& userName( connection.getUserId());
+
+ Mutex::ScopedLock locker(dataLock);
+
+ // Bump state from CREATED to OPENED
+ (void) countConnectionLH(connectProgressMap, connection.getMgmtId(),
+ C_OPENED, false);
+
+ // Approve total connections
+ bool okTotal = true;
+ if (totalLimit > 0) {
+ okTotal = totalCurrentConnections <= totalLimit;
+ if (!connection.isShadow()) {
+ QPID_LOG(trace, "ACL ConnectionApprover totalLimit=" << totalLimit
+ << " curValue=" << totalCurrentConnections
+ << " result=" << (okTotal ? "allow" : "deny"));
+ }
+ }
+
+ // Approve by IP host connections
+ bool okByIP = limitApproveLH(connectByHostMap, hostName, hostLimit, !connection.isShadow());
+
+ // Count and Approve the connection by the user
+ bool okByUser = countConnectionLH(connectByNameMap, userName, nameLimit, !connection.isShadow());
+
+ if (!connection.isShadow()) {
+ // Emit separate log for each disapproval
+ if (!okTotal) {
+ QPID_LOG(error, "Client max total connection count limit of " << totalLimit
+ << " exceeded by '"
+ << connection.getMgmtId() << "', user: '"
+ << userName << "'. Connection refused");
+ }
+ if (!okByIP) {
+ QPID_LOG(error, "Client max per-host connection count limit of "
+ << hostLimit << " exceeded by '"
+ << connection.getMgmtId() << "', user: '"
+ << userName << "'. Connection refused.");
+ }
+ if (!okByUser) {
+ QPID_LOG(error, "Client max per-user connection count limit of "
+ << nameLimit << " exceeded by '"
+ << connection.getMgmtId() << "', user: '"
+ << userName << "'. Connection refused.");
+ }
+
+ // Count/Event once for each disapproval
+ bool result = okTotal && okByIP && okByUser;
+ if (!result) {
+ acl.reportConnectLimit(userName, hostName);
+ }
+
+ return result;
+ } else {
+ // Always allow shadow connections
+ if (!okTotal) {
+ QPID_LOG(warning, "Client max total connection count limit of " << totalLimit
+ << " exceeded by '"
+ << connection.getMgmtId() << "', user: '"
+ << userName << "' but still within tolerance. Cluster connection allowed");
+ }
+ if (!okByIP) {
+ QPID_LOG(warning, "Client max per-host connection count limit of "
+ << hostLimit << " exceeded by '"
+ << connection.getMgmtId() << "', user: '"
+ << userName << "' but still within tolerance. Cluster connection allowed");
+ }
+ if (!okByUser) {
+ QPID_LOG(warning, "Client max per-user connection count limit of "
+ << nameLimit << " exceeded by '"
+ << connection.getMgmtId() << "', user: '"
+ << userName << "' but still within tolerance. Cluster connection allowed");
+ }
+ if (okTotal && okByIP && okByUser) {
+ QPID_LOG(debug, "Cluster client connection: '"
+ << connection.getMgmtId() << "', user '"
+ << userName << "' allowed");
+ }
+ return true;
+ }
+}
+
+//
// getClientIp - given a connection's mgmtId return the client host part.
//
// TODO: Ideally this would be a method of the connection itself.
+// TODO: Verify it works with rdma connection names.
//
std::string ConnectionCounter::getClientHost(const std::string mgmtId)
{
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclConnectionCounter.h Fri Aug 3 12:13:32 2012
@@ -48,32 +48,52 @@ private:
enum CONNECTION_PROGRESS { C_CREATED=1, C_OPENED=2 };
Acl& acl;
- uint32_t nameLimit;
- uint32_t hostLimit;
+ uint16_t nameLimit;
+ uint16_t hostLimit;
+ uint16_t totalLimit;
+ uint16_t totalCurrentConnections;
qpid::sys::Mutex dataLock;
+ /** Records per-connection state */
connectCountsMap_t connectProgressMap;
+
+ /** Records per-username counts */
connectCountsMap_t connectByNameMap;
+
+ /** Records per-host counts */
connectCountsMap_t connectByHostMap;
+ /** Given a connection's management ID, return the client host name */
std::string getClientHost(const std::string mgmtId);
- bool limitCheckLH(connectCountsMap_t& theMap,
- const std::string& theName,
- uint32_t theLimit);
+ /** Return approval for proposed connection */
+ bool limitApproveLH(connectCountsMap_t& theMap,
+ const std::string& theName,
+ uint16_t theLimit,
+ bool emitLog);
+
+ /** Record a connection.
+ * @return indication if user/host is over its limit */
+ bool countConnectionLH(connectCountsMap_t& theMap,
+ const std::string& theName,
+ uint16_t theLimit,
+ bool emitLog);
+ /** Release a connection */
void releaseLH(connectCountsMap_t& theMap,
const std::string& theName,
- uint32_t theLimit);
+ uint16_t theLimit);
public:
- ConnectionCounter(Acl& acl, uint32_t nl, uint32_t hl);
+ ConnectionCounter(Acl& acl, uint16_t nl, uint16_t hl, uint16_t tl);
~ConnectionCounter();
+ // ConnectionObserver interface
void connection(broker::Connection& connection);
- void opened(broker::Connection& connection);
void closed(broker::Connection& connection);
+ // Connection counting
+ bool approveConnection(const broker::Connection& conn);
};
}} // namespace qpid::ha
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.cpp Fri Aug 3 12:13:32 2012
@@ -305,7 +305,9 @@ namespace acl {
// lookup
//
// The ACL main business logic function of matching rules and declaring
- // an allow or deny result.
+ // an allow or deny result. This lookup is the fastpath per-message
+ // lookup to verify if a user is allowed to publish to an exchange with
+ // a given key.
//
AclResult AclData::lookup(
const std::string& id,
@@ -331,7 +333,8 @@ namespace acl {
if (itrRule != actionList[action][objType]->end() )
{
- //loop the vector
+ // Found a rule list for this user-action-object set.
+ // Search the rule list for a matching rule.
ruleSetItr rsItr = itrRule->second.end();
for (int cnt = itrRule->second.size(); cnt != 0; cnt--)
{
@@ -339,56 +342,46 @@ namespace acl {
QPID_LOG(debug, "ACL: checking rule " << rsItr->toString());
- // loop the names looking for match
+ // Search on exchange name and routing key only if specfied in rule.
bool match =true;
- for (specPropertyMapItr pMItr = rsItr->props.begin();
- (pMItr != rsItr->props.end()) && match;
- pMItr++)
+ if (rsItr->pubExchNameInRule)
{
- //match name is exists first
- switch (pMItr->first)
+ if (matchProp(rsItr->pubExchName, name))
{
- case acl::SPECPROP_NAME:
- if (matchProp(pMItr->second, name))
- {
- QPID_LOG(debug, "ACL: lookup exchange name '"
- << name << "' matched with rule name '"
- << pMItr->second << "'");
-
- }
- else
- {
- match= false;
- QPID_LOG(debug, "ACL: lookup exchange name '"
- << name << "' did not match with rule name '"
- << pMItr->second << "'");
- }
- break;
-
- case acl::SPECPROP_ROUTINGKEY:
- if (matchProp(pMItr->second, routingKey))
- {
- QPID_LOG(debug, "ACL: lookup key name '"
- << routingKey << "' matched with rule routing key '"
- << pMItr->second << "'");
- }
- else
- {
- match= false;
- QPID_LOG(debug, "ACL: lookup key name '"
- << routingKey << "' did not match with rule routing key '"
- << pMItr->second << "'");
- }
- break;
-
- default:
- // Don't care
- break;
- };
+ QPID_LOG(debug, "ACL: Rule: " << rsItr->rawRuleNum << " lookup exchange name '"
+ << name << "' matched with rule name '"
+ << rsItr->pubExchName << "'");
+
+ }
+ else
+ {
+ match= false;
+ QPID_LOG(debug, "ACL: Rule: " << rsItr->rawRuleNum << " lookup exchange name '"
+ << name << "' did not match with rule name '"
+ << rsItr->pubExchName << "'");
+ }
+ }
+
+ if (match && rsItr->pubRoutingKeyInRule)
+ {
+ if (rsItr->matchRoutingKey(routingKey))
+ {
+ QPID_LOG(debug, "ACL: Rule: " << rsItr->rawRuleNum << " lookup key name '"
+ << routingKey << "' matched with rule routing key '"
+ << rsItr->pubRoutingKey << "'");
+ }
+ else
+ {
+ QPID_LOG(debug, "ACL: Rule: " << rsItr->rawRuleNum << " lookup key name '"
+ << routingKey << "' did not match with rule routing key '"
+ << rsItr->pubRoutingKey << "'");
+ match = false;
+ }
}
+
if (match){
aclresult = rsItr->ruleMode;
- QPID_LOG(debug,"ACL: Successful match, the decision is:"
+ QPID_LOG(debug,"ACL: Rule: " << rsItr->rawRuleNum << " Successful match, the decision is:"
<< AclHelper::getAclResultStr(aclresult));
return aclresult;
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclData.h Fri Aug 3 12:13:32 2012
@@ -21,6 +21,9 @@
*/
#include "qpid/broker/AclModule.h"
+#include "AclTopicMatch.h"
+#include "qpid/log/Statement.h"
+#include "boost/shared_ptr.hpp"
#include <vector>
#include <sstream>
@@ -48,18 +51,29 @@ public:
// A single ACL file entry may create many rule entries in
// many ruleset vectors.
//
- struct rule {
+ struct Rule {
+ typedef broker::TopicExchange::TopicExchangeTester topicTester;
int rawRuleNum; // rule number in ACL file
qpid::acl::AclResult ruleMode; // combined allow/deny log/nolog
specPropertyMap props; //
+ bool pubRoutingKeyInRule;
+ std::string pubRoutingKey;
+ boost::shared_ptr<topicTester> pTTest;
+ bool pubExchNameInRule;
+ std::string pubExchName;
-
- rule (int ruleNum, qpid::acl::AclResult res, specPropertyMap& p) :
+ Rule (int ruleNum, qpid::acl::AclResult res, specPropertyMap& p) :
rawRuleNum(ruleNum),
ruleMode(res),
- props(p)
- {};
+ props(p),
+ pubRoutingKeyInRule(false),
+ pubRoutingKey(),
+ pTTest(boost::shared_ptr<topicTester>(new topicTester())),
+ pubExchNameInRule(false),
+ pubExchName()
+ {}
+
std::string toString () const {
std::ostringstream ruleStr;
@@ -76,9 +90,21 @@ public:
ruleStr << " }]";
return ruleStr.str();
}
+
+ void addTopicTest(const std::string& pattern) {
+ pTTest->addBindingKey(broker::TopicExchange::normalize(pattern));
+ }
+
+ // Topic Exchange tester
+ // return true if any bindings match 'pattern'
+ bool matchRoutingKey(const std::string& pattern) const
+ {
+ topicTester::BindingVec bv;
+ return pTTest->findMatches(pattern, bv);
+ }
};
- typedef std::vector<rule> ruleSet;
+ typedef std::vector<Rule> ruleSet;
typedef ruleSet::const_iterator ruleSetItr;
typedef std::map<std::string, ruleSet > actionObject; // user
typedef actionObject::iterator actObjItr;
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclPlugin.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclPlugin.cpp Fri Aug 3 12:13:32 2012
@@ -39,10 +39,13 @@ struct AclOptions : public Options {
AclValues& values;
AclOptions(AclValues& v) : Options("ACL Options"), values(v) {
+ values.aclMaxConnectTotal = 500;
addOptions()
("acl-file", optValue(values.aclFile, "FILE"), "The policy file to load from, loaded from data dir")
- ("acl-max-connect-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user")
- ("acl-max-connect-per-ip" , optValue(values.aclMaxConnectPerIp, "N"), "The maximum number of connections allowed per host IP address");
+ ("max-connections" , optValue(values.aclMaxConnectTotal, "N"), "The maximum combined number of connections allowed. 0 implies no limit.")
+ ("max-connections-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user. 0 implies no limit.")
+ ("max-connections-per-ip" , optValue(values.aclMaxConnectPerIp, "N"), "The maximum number of connections allowed per host IP address. 0 implies no limit.")
+ ;
}
};
@@ -69,7 +72,6 @@ struct AclPlugin : public Plugin {
oss << b.getDataDir().getPath() << "/" << values.aclFile;
values.aclFile = oss.str();
}
-
acl = new Acl(values, b);
b.setAcl(acl.get());
b.addFinalizer(boost::bind(&AclPlugin::shutdown, this));
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.cpp Fri Aug 3 12:13:32 2012
@@ -101,7 +101,7 @@ namespace acl {
<< AclHelper::getAclResultStr(d->decisionMode));
foundmode = true;
} else {
- AclData::rule rule(cnt, (*i)->res, (*i)->props);
+ AclData::Rule rule(cnt, (*i)->res, (*i)->props);
// Action -> Object -> map<user -> set<Rule> >
std::ostringstream actionstr;
@@ -110,8 +110,27 @@ namespace acl {
(*i)->actionAll ? acnt++ : acnt = acl::ACTIONSIZE) {
if (acnt == acl::ACT_PUBLISH)
+ {
d->transferAcl = true; // we have transfer ACL
-
+ // For Publish the only object should be Exchange
+ // and the only property should be routingkey.
+ // Go through the rule properties and find the name and the key.
+ // If found then place them specially for the lookup engine.
+ for (pmCitr pItr=(*i)->props.begin(); pItr!=(*i)->props.end(); pItr++) {
+ if (acl::SPECPROP_ROUTINGKEY == pItr->first)
+ {
+ rule.pubRoutingKeyInRule = true;
+ rule.pubRoutingKey = (std::string)pItr->second;
+ rule.addTopicTest(rule.pubRoutingKey);
+ break;
+ }
+ if (acl::SPECPROP_NAME == pItr->first)
+ {
+ rule.pubExchNameInRule = true;
+ rule.pubExchName = pItr->second;
+ }
+ }
+ }
actionstr << AclHelper::getActionStr((Action) acnt) << ",";
//find the Action, create if not exist
@@ -285,7 +304,7 @@ namespace acl {
if (ws) {
ret = true;
} else {
- errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+ errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
<< ", Non-continuation line must start with \"group\" or \"acl\".";
ret = false;
}
@@ -314,13 +333,23 @@ namespace acl {
if (contFlag) {
gmCitr citr = groups.find(groupName);
for (unsigned i = 0; i < toksSize; i++) {
- if (!isValidUserName(toks[i])) return false;
+ if (isValidGroupName(toks[i])) {
+ if (toks[i] == groupName) {
+ QPID_LOG(debug, "ACL: Line: " << lineNumber
+ << ", Ignoring recursive sub-group \"" << toks[i] << "\".");
+ continue;
+ } else if (groups.find(toks[i]) == groups.end()) {
+ errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+ << ", Sub-group \"" << toks[i] << "\" not defined yet.";
+ return false;
+ }
+ } else if (!isValidUserName(toks[i])) return false;
addName(toks[i], citr->second);
}
} else {
const unsigned minimumSize = (cont ? 2 : 3);
if (toksSize < minimumSize) {
- errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+ errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
<< ", Insufficient tokens for group definition.";
return false;
}
@@ -332,7 +361,17 @@ namespace acl {
gmCitr citr = addGroup(toks[1]);
if (citr == groups.end()) return false;
for (unsigned i = 2; i < toksSize; i++) {
- if (!isValidUserName(toks[i])) return false;
+ if (isValidGroupName(toks[i])) {
+ if (toks[i] == groupName) {
+ QPID_LOG(debug, "ACL: Line: " << lineNumber
+ << ", Ignoring recursive sub-group \"" << toks[i] << "\".");
+ continue;
+ } else if (groups.find(toks[i]) == groups.end()) {
+ errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+ << ", Sub-group \"" << toks[i] << "\" not defined yet.";
+ return false;
+ }
+ } else if (!isValidUserName(toks[i])) return false;
addName(toks[i], citr->second);
}
}
@@ -356,7 +395,7 @@ namespace acl {
void AclReader::addName(const std::string& name, nameSetPtr groupNameSet) {
gmCitr citr = groups.find(name);
- if (citr != groups.end() && citr->first != name){
+ if (citr != groups.end()) {
// This is a previously defined group: add all the names in that group to this group
groupNameSet->insert(citr->second->begin(), citr->second->end());
} else {
@@ -459,7 +498,7 @@ namespace acl {
nvPair propNvp = splitNameValuePair(toks[i]);
if (propNvp.second.size() == 0) {
errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
- <<", Badly formed property name-value pair \""
+ <<", Badly formed property name-value pair \""
<< propNvp.first << "\". (Must be name=value)";
return false;
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclReader.h Fri Aug 3 12:13:32 2012
@@ -26,6 +26,7 @@
#include <string>
#include <vector>
#include <sstream>
+#include <memory>
#include "qpid/acl/AclData.h"
#include "qpid/broker/AclModule.h"
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.cpp Fri Aug 3 12:13:32 2012
@@ -131,7 +131,7 @@ namespace acl {
boost::bind(&AclValidator::validateRule, this, _1));
}
- void AclValidator::validateRule(qpid::acl::AclData::rule& rule){
+ void AclValidator::validateRule(qpid::acl::AclData::Rule& rule){
std::for_each(rule.props.begin(),
rule.props.end(),
boost::bind(&AclValidator::validateProperty, this, _1));
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/AclValidator.h Fri Aug 3 12:13:32 2012
@@ -71,7 +71,7 @@ class AclValidator {
public:
void validateRuleSet(std::pair<const std::string, qpid::acl::AclData::ruleSet>& rules);
- void validateRule(qpid::acl::AclData::rule& rule);
+ void validateRule(qpid::acl::AclData::Rule& rule);
void validateProperty(std::pair<const qpid::acl::SpecProperty, std::string>& prop);
void validate(boost::shared_ptr<AclData> d);
AclValidator();
Modified: qpid/branches/asyncstore/cpp/src/qpid/acl/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/acl/management-schema.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/acl/management-schema.xml (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/acl/management-schema.xml Fri Aug 3 12:13:32 2012
@@ -17,13 +17,16 @@
-->
<class name="Acl">
- <property name="brokerRef" type="objId" references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/>
- <property name="policyFile" type="lstr" access="RO" desc="Name of the policy file"/>
- <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/>
- <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/>
- <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/>
- <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/>
- <statistic name="connectionDenyCount" type="count64" unit="connection" desc="Number of connections denied"/>
+ <property name="brokerRef" type="objId" references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/>
+ <property name="policyFile" type="lstr" access="RO" desc="Name of the policy file"/>
+ <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/>
+ <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/>
+ <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/>
+ <property name="maxConnections" type="uint16" access="RO" desc="Maximum allowed connections"/>
+ <property name="maxConnectionsPerIp" type="uint16" access="RO" desc="Maximum allowed connections"/>
+ <property name="maxConnectionsPerUser" type="uint16" access="RO" desc="Maximum allowed connections"/>
+ <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/>
+ <statistic name="connectionDenyCount" type="count64" unit="connection" desc="Number of connections denied"/>
<method name="reloadACLFile" desc="Reload the ACL file"/>
Modified: qpid/branches/asyncstore/cpp/src/qpid/amqp_0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/amqp_0_10/Codecs.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/amqp_0_10/Codecs.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/amqp_0_10/Codecs.cpp Fri Aug 3 12:13:32 2012
@@ -52,9 +52,7 @@ template <class T, class U, class F> voi
}
Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in);
-FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in);
Variant toVariant(boost::shared_ptr<FieldValue> in);
-boost::shared_ptr<FieldValue> toFieldValue(const Variant& in);
template <class T, class U, class F> void translate(boost::shared_ptr<FieldValue> in, U& u, F f)
{
@@ -70,20 +68,6 @@ template <class T, class U, class F> T*
return new T(t);
}
-FieldTableValue* toFieldTableValue(const Variant::Map& map)
-{
- FieldTable ft;
- convert(map, ft, &toFieldTableEntry);
- return new FieldTableValue(ft);
-}
-
-ListValue* toListValue(const Variant::List& list)
-{
- List l;
- convert(list, l, &toFieldValue);
- return new ListValue(l);
-}
-
void setEncodingFor(Variant& out, uint8_t code)
{
switch(code){
@@ -151,7 +135,7 @@ Variant toVariant(boost::shared_ptr<Fiel
case 0xf0: break;//void, which is the default value for Variant
case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
-
+
//Variable Width types:
//strings:
case 0x80:
@@ -217,89 +201,254 @@ boost::shared_ptr<FieldValue> convertStr
}
}
-boost::shared_ptr<FieldValue> toFieldValue(const Variant& in)
+Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in)
{
- boost::shared_ptr<FieldValue> out;
- switch (in.getType()) {
- case VAR_VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break;
- case VAR_BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break;
- case VAR_UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break;
- case VAR_UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break;
- case VAR_UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break;
- case VAR_UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break;
- case VAR_INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break;
- case VAR_INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break;
- case VAR_INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break;
- case VAR_INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break;
- case VAR_FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break;
- case VAR_DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break;
- case VAR_STRING: out = convertString(in.asString(), in.getEncoding()); break;
- case VAR_UUID: out = boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); break;
- case VAR_MAP:
- out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap()));
- break;
- case VAR_LIST:
- out = boost::shared_ptr<FieldValue>(toListValue(in.asList()));
- break;
+ return Variant::Map::value_type(in.first, toVariant(in.second));
+}
+
+struct DecodeBuffer
+{
+ Buffer buffer;
+
+ DecodeBuffer(const std::string& s) : buffer(const_cast<char*>(s.data()), s.size()) {}
+
+ template <class T> void decode(T& t) { t.decode(buffer); }
+
+};
+
+template <class T, class U, class F> void _decode(const std::string& data, U& value, F f)
+{
+ T t;
+ DecodeBuffer buffer(data);
+ buffer.decode(t);
+ convert(t, value, f);
+}
+
+uint32_t encodedSize(const Variant::Map& values);
+uint32_t encodedSize(const Variant::List& values);
+uint32_t encodedSize(const std::string& value);
+
+uint32_t encodedSize(const Variant& value)
+{
+ switch (value.getType()) {
+ case VAR_VOID:
+ return 0;
+ case VAR_BOOL:
+ case VAR_UINT8:
+ case VAR_INT8:
+ return 1;
+ case VAR_UINT16:
+ case VAR_INT16:
+ return 2;
+ break;
+ case VAR_UINT32:
+ case VAR_INT32:
+ case VAR_FLOAT:
+ return 4;
+ case VAR_UINT64:
+ case VAR_INT64:
+ case VAR_DOUBLE:
+ return 8;
+ case VAR_UUID:
+ return 16;
+ case VAR_MAP:
+ return encodedSize(value.asMap());
+ case VAR_LIST:
+ return encodedSize(value.asList());
+ case VAR_STRING:
+ return encodedSize(value.getString());
+ default:
+ throw Exception("Couldn't encode Variant: Illegal type code");
}
- return out;
}
-Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in)
+uint32_t encodedSize(const Variant::Map& values)
{
- return Variant::Map::value_type(in.first, toVariant(in.second));
+ uint32_t size = 4/*size field*/ + 4/*count field*/;
+ for(Variant::Map::const_iterator i = values.begin(); i != values.end(); ++i) {
+ size += 1/*size of key*/ + (i->first).size() + 1/*typecode*/ + encodedSize(i->second);
+ }
+ return size;
}
-FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in)
+uint32_t encodedSize(const Variant::Map& values, const std::string& efield, const Variant& evalue)
{
- return FieldTable::value_type(in.first, toFieldValue(in.second));
+ uint32_t size = 4/*size field*/ + 4/*count field*/;
+ for(Variant::Map::const_iterator i = values.begin(); i != values.end(); ++i) {
+ size += 1/*size of key*/ + (i->first).size() + 1/*typecode*/ + encodedSize(i->second);
+ }
+ size += 1/*size of key*/ + efield.size() + 1/*typecode*/ + encodedSize(evalue);
+ return size;
}
-struct EncodeBuffer
+uint32_t encodedSize(const Variant::List& values)
{
- char* data;
- Buffer buffer;
+ uint32_t size = 4/*size field*/ + 4/*count field*/;
+ for(Variant::List::const_iterator i = values.begin(); i != values.end(); ++i) {
+ size += 1/*typecode*/ + encodedSize(*i);
+ }
+ return size;
+}
+
+uint32_t encodedSize(const std::string& value)
+{
+ uint32_t size = value.size();
+ if (size > std::numeric_limits<uint16_t>::max()) {
+ return size + 4; /*Long size*/
+ } else {
+ return size + 2; /*Short size*/
+ }
+}
- EncodeBuffer(size_t size) : data(new char[size]), buffer(data, size) {}
- ~EncodeBuffer() { delete[] data; }
+void encode(const std::string& value, const std::string& encoding, qpid::framing::Buffer& buffer)
+{
+ uint32_t size = value.size();
+ if (size > std::numeric_limits<uint16_t>::max()) {
+ if (encoding == utf8 || encoding == utf16 || encoding == iso885915) {
+ throw Exception(QPID_MSG("Could not encode " << encoding << " character string - too long (" << size << " bytes)"));
+ } else {
+ buffer.putOctet(0xa0);
+ buffer.putLong(size);
+ buffer.putRawData(value);
+ }
+ } else {
+ if (encoding == utf8) {
+ buffer.putOctet(0x95);
+ } else if (encoding == utf16) {
+ buffer.putOctet(0x96);
+ } else if (encoding == iso885915) {
+ buffer.putOctet(0x94);
+ } else {
+ buffer.putOctet(0x90);
+ }
+ buffer.putShort(size);
+ buffer.putRawData(value);
+ }
+}
- template <class T> void encode(T& t) { t.encode(buffer); }
+void encode(const Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer);
+void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer);
- void getData(std::string& s) {
- s.assign(data, buffer.getSize());
+void encode(const Variant& value, qpid::framing::Buffer& buffer)
+{
+ switch (value.getType()) {
+ case VAR_VOID:
+ buffer.putOctet(0xf0);
+ break;
+ case VAR_BOOL:
+ buffer.putOctet(0x08);
+ buffer.putOctet(value.asBool());
+ break;
+ case VAR_INT8:
+ buffer.putOctet(0x01);
+ buffer.putInt8(value.asInt8());
+ break;
+ case VAR_UINT8:
+ buffer.putOctet(0x02);
+ buffer.putOctet(value.asUint8());
+ break;
+ case VAR_INT16:
+ buffer.putOctet(0x11);
+ buffer.putInt16(value.asInt16());
+ break;
+ case VAR_UINT16:
+ buffer.putOctet(0x12);
+ buffer.putShort(value.asUint16());
+ break;
+ case VAR_INT32:
+ buffer.putOctet(0x21);
+ buffer.putInt32(value.asInt32());
+ break;
+ case VAR_UINT32:
+ buffer.putOctet(0x22);
+ buffer.putLong(value.asUint32());
+ break;
+ case VAR_FLOAT:
+ buffer.putOctet(0x23);
+ buffer.putFloat(value.asFloat());
+ break;
+ case VAR_INT64:
+ buffer.putOctet(0x31);
+ buffer.putInt64(value.asInt64());
+ break;
+ case VAR_UINT64:
+ buffer.putOctet(0x32);
+ buffer.putLongLong(value.asUint64());
+ break;
+ case VAR_DOUBLE:
+ buffer.putOctet(0x33);
+ buffer.putDouble(value.asDouble());
+ break;
+ case VAR_UUID:
+ buffer.putOctet(0x48);
+ buffer.putBin128(value.asUuid().data());
+ break;
+ case VAR_MAP:
+ buffer.putOctet(0xa8);
+ encode(value.asMap(), encodedSize(value.asMap()), buffer);
+ break;
+ case VAR_LIST:
+ buffer.putOctet(0xa9);
+ encode(value.asList(), encodedSize(value.asList()), buffer);
+ break;
+ case VAR_STRING:
+ encode(value.getString(), value.getEncoding(), buffer);
+ break;
}
-};
+}
-struct DecodeBuffer
+void encode(const Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer)
{
- Buffer buffer;
+ uint32_t s = buffer.getPosition();
+ buffer.putLong(len - 4);//exclusive of the size field itself
+ buffer.putLong(map.size());
+ for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+ buffer.putShortString(i->first);
+ encode(i->second, buffer);
+ }
+ (void) s; assert(s + len == buffer.getPosition());
+}
- DecodeBuffer(const std::string& s) : buffer(const_cast<char*>(s.data()), s.size()) {}
+void encode(const Variant::Map& map, const std::string& efield, const Variant& evalue, uint32_t len, qpid::framing::Buffer& buffer)
+{
+ uint32_t s = buffer.getPosition();
+ buffer.putLong(len - 4);//exclusive of the size field itself
+ buffer.putLong(map.size() + 1 /* The extra field */ );
+ for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+ buffer.putShortString(i->first);
+ encode(i->second, buffer);
+ }
+ buffer.putShortString(efield);
+ encode(evalue, buffer);
- template <class T> void decode(T& t) { t.decode(buffer); }
-
-};
+ (void) s; assert(s + len == buffer.getPosition());
+}
-template <class T, class U, class F> void _encode(const U& value, std::string& data, F f)
+void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer)
{
- T t;
- convert(value, t, f);
- EncodeBuffer buffer(t.encodedSize());
- buffer.encode(t);
- buffer.getData(data);
+ uint32_t s = buffer.getPosition();
+ buffer.putLong(len - 4);//exclusive of the size field itself
+ buffer.putLong(list.size());
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ encode(*i, buffer);
+ }
+ (void) s; assert(s + len == buffer.getPosition());
}
-template <class T, class U, class F> void _decode(const std::string& data, U& value, F f)
+void decode(qpid::framing::Buffer&, Variant::Map&)
{
- T t;
- DecodeBuffer buffer(data);
- buffer.decode(t);
- convert(t, value, f);
}
+
void MapCodec::encode(const Variant::Map& value, std::string& data)
{
- _encode<FieldTable>(value, data, &toFieldTableEntry);
+ uint32_t len = qpid::amqp_0_10::encodedSize(value);
+ std::vector<char> space(len);
+ qpid::framing::Buffer buff(&space[0], len);
+
+ qpid::amqp_0_10::encode(value, len, buff);
+ assert( len == buff.getPosition() );
+ data.assign(&space[0], len);
}
void MapCodec::decode(const std::string& data, Variant::Map& value)
@@ -309,14 +458,18 @@ void MapCodec::decode(const std::string&
size_t MapCodec::encodedSize(const Variant::Map& value)
{
- std::string encoded;
- encode(value, encoded);
- return encoded.size();
+ return qpid::amqp_0_10::encodedSize(value);
}
void ListCodec::encode(const Variant::List& value, std::string& data)
{
- _encode<List>(value, data, &toFieldValue);
+ uint32_t len = qpid::amqp_0_10::encodedSize(value);
+ std::vector<char> space(len);
+ qpid::framing::Buffer buff(&space[0], len);
+
+ qpid::amqp_0_10::encode(value, len, buff);
+ assert( len == buff.getPosition() );
+ data.assign(&space[0], len);
}
void ListCodec::decode(const std::string& data, Variant::List& value)
@@ -326,14 +479,47 @@ void ListCodec::decode(const std::string
size_t ListCodec::encodedSize(const Variant::List& value)
{
- std::string encoded;
- encode(value, encoded);
- return encoded.size();
+ return qpid::amqp_0_10::encodedSize(value);
}
void translate(const Variant::Map& from, FieldTable& to)
{
- convert(from, to, &toFieldTableEntry);
+ // Create buffer of correct size to encode Variant::Map
+ uint32_t len = encodedSize(from);
+ std::vector<char> space(len);
+ qpid::framing::Buffer buff(&space[0], len);
+
+ // Encode Variant::Map into buffer directly -
+ // We pass the already calculated length in to avoid
+ // recalculating it.
+ encode(from, len, buff);
+
+ // Give buffer to FieldTable
+ // Could speed this up a bit by avoiding copying
+ // the buffer we just created into the FieldTable
+ assert( len == buff.getPosition() );
+ buff.reset();
+ to.decode(buff);
+}
+
+void translate(const Variant::Map& from, const std::string& efield, const Variant& evalue, FieldTable& to)
+{
+ // Create buffer of correct size to encode Variant::Map
+ uint32_t len = encodedSize(from, efield, evalue);
+ std::vector<char> space(len);
+ qpid::framing::Buffer buff(&space[0], len);
+
+ // Encode Variant::Map into buffer directly -
+ // We pass the already calculated length in to avoid
+ // recalculating it.
+ encode(from, efield, evalue, len, buff);
+
+ // Give buffer to FieldTable
+ // Could speed this up a bit by avoiding copying
+ // the buffer we just created into the FieldTable
+ assert( len == buff.getPosition() );
+ buff.reset();
+ to.decode(buff);
}
void translate(const FieldTable& from, Variant::Map& to)
Propchange: qpid/branches/asyncstore/cpp/src/qpid/broker/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1333988-1368650
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/AclModule.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/AclModule.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/AclModule.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/AclModule.h Fri Aug 3 12:13:32 2012
@@ -113,6 +113,7 @@ namespace acl {
namespace broker {
+ class Connection;
class AclModule
{
@@ -139,6 +140,11 @@ namespace broker {
// Add specialized authorise() methods as required.
+ /** Approve connection by counting connections total, per-IP, and
+ * per-user.
+ */
+ virtual bool approveConnection (const Connection& connection)=0;
+
virtual ~AclModule() {};
};
} // namespace broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.cpp Fri Aug 3 12:13:32 2012
@@ -57,22 +57,25 @@ void Bridge::PushHandler::handle(framing
conn->received(frame);
}
-Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args,
- InitializeCallback init) :
- link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
- initialize(init), detached(false)
-{
- std::stringstream title;
- title << id << "_" << name;
- queueName += title.str();
+Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
+ CancellationListener l, const _qmf::ArgsLinkBridge& _args,
+ InitializeCallback init, const std::string& _queueName, const string& ae) :
+ link(_link), channel(_id), args(_args), mgmtObject(0),
+ listener(l), name(_name),
+ queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag()
+ : _queueName),
+ altEx(ae), persistenceId(0),
+ connState(0), conn(0), initialize(init), detached(false),
+ useExistingQueue(!_queueName.empty()),
+ sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
+{
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
- (agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
+ (agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
+ mgmtObject->set_channelId(channel);
agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
@@ -90,23 +93,22 @@ void Bridge::create(Connection& c)
conn = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
- SessionHandler& sessionHandler = c.getChannel(id);
- sessionHandler.setDetachedCallback(
- boost::bind(&Bridge::sessionDetached, shared_from_this()));
+ SessionHandler& sessionHandler = c.getChannel(channel);
+ sessionHandler.setErrorListener(shared_from_this());
if (args.i_srcIsLocal) {
if (args.i_dynamic)
throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
pushHandler.reset(new PushHandler(&c));
- channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
+ channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
- session->attach(name, false);
+ session->attach(sessionName, false);
session->commandPoint(0,0);
} else {
- sessionHandler.attachAs(name);
+ sessionHandler.attachAs(sessionName);
// Point the bridging commands at the remote peer broker
peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
@@ -115,7 +117,7 @@ void Bridge::create(Connection& c)
if (initialize) initialize(*this, sessionHandler);
else if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 0, args.i_sync ? 2 * args.i_sync : 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
} else {
@@ -138,12 +140,13 @@ void Bridge::create(Connection& c)
}
bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
- bool autoDelete = !durable;//auto delete transient queues?
- peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings);
+ bool exclusive = !useExistingQueue; // only exclusive if the queue is owned by the bridge
+ bool autoDelete = exclusive && !durable;//auto delete transient queues?
+ peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
if (!args.i_dynamic)
peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
- peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options);
+ peer->getMessage().flow(args.i_dest, 0, (useExistingQueue && args.i_sync) ? 2 * args.i_sync : 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
if (args.i_dynamic) {
@@ -163,11 +166,12 @@ void Bridge::cancel(Connection&)
{
if (resetProxy()) {
peer->getMessage().cancel(args.i_dest);
- peer->getSession().detach(name);
+ peer->getSession().detach(sessionName);
}
QPID_LOG(debug, "Cancelled bridge " << name);
}
+/** Notify the bridge that the connection has closed */
void Bridge::closed()
{
if (args.i_dynamic) {
@@ -177,9 +181,10 @@ void Bridge::closed()
QPID_LOG(debug, "Closed bridge " << name);
}
-void Bridge::destroy()
+/** Shut down the bridge */
+void Bridge::close()
{
- listener(this);
+ listener(this); // ask the LinkRegistry to destroy us
}
void Bridge::setPersistenceId(uint64_t pId) const
@@ -187,8 +192,21 @@ void Bridge::setPersistenceId(uint64_t p
persistenceId = pId;
}
+
+const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2");
+const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge");
+
+bool Bridge::isEncodedBridge(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
+}
+
+
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
+ string kind;
+ buffer.getShortString(kind);
+
string host;
uint16_t port;
string src;
@@ -196,9 +214,33 @@ Bridge::shared_ptr Bridge::decode(LinkRe
string key;
string id;
string excludes;
+ string name;
+
+ Link::shared_ptr link;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the bridge by host:port, not by name, and
+ * transport wasn't provided. Try to find a link using those paramters.
+ */
+ buffer.getShortString(host);
+ port = buffer.getShort();
+
+ link = links.getLink(host, port);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port);
+ return Bridge::shared_ptr();
+ }
+ } else {
+ string linkName;
+
+ buffer.getShortString(name);
+ buffer.getShortString(linkName);
+ link = links.getLink(linkName);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'");
+ return Bridge::shared_ptr();
+ }
+ }
- buffer.getShortString(host);
- port = buffer.getShort();
bool durable(buffer.getOctet());
buffer.getShortString(src);
buffer.getShortString(dest);
@@ -210,15 +252,21 @@ Bridge::shared_ptr Bridge::decode(LinkRe
bool dynamic(buffer.getOctet());
uint16_t sync = buffer.getShort();
- return links.declare(host, port, durable, src, dest, key,
- is_queue, is_local, id, excludes, dynamic, sync).first;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions did not provide a name for the bridge, so create one
+ */
+ name = createName(link->getName(), src, dest, key);
+ }
+
+ return links.declare(name, *link, durable, src, dest, key, is_queue,
+ is_local, id, excludes, dynamic, sync).first;
}
void Bridge::encode(Buffer& buffer) const
{
- buffer.putShortString(string("bridge"));
- buffer.putShortString(link->getHost());
- buffer.putShort(link->getPort());
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
+ buffer.putShortString(link->getName());
buffer.putOctet(args.i_durable ? 1 : 0);
buffer.putShortString(args.i_src);
buffer.putShortString(args.i_dest);
@@ -233,9 +281,9 @@ void Bridge::encode(Buffer& buffer) cons
uint32_t Bridge::encodedSize() const
{
- return link->getHost().size() + 1 // short-string (host)
- + 7 // short-string ("bridge")
- + 2 // port
+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+ + name.size() + 1
+ + link->getName().size() + 1
+ 1 // durable
+ args.i_src.size() + 1
+ args.i_dest.size() + 1
@@ -259,7 +307,8 @@ management::Manageable::status_t Bridge:
{
if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
- destroy();
+ QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'");
+ close();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;
@@ -306,7 +355,7 @@ void Bridge::sendReorigin()
}
bool Bridge::resetProxy()
{
- SessionHandler& sessionHandler = conn->getChannel(id);
+ SessionHandler& sessionHandler = conn->getChannel(channel);
if (!sessionHandler.getSession()) peer.reset();
else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
return peer.get();
@@ -318,7 +367,7 @@ void Bridge::ioThreadPropagateBinding(co
peer->getExchange().bind(queue, exchange, key, args);
} else {
QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge");
- destroy();
+ close();
}
}
@@ -333,8 +382,38 @@ const string& Bridge::getLocalTag() cons
return link->getBroker()->getFederationTag();
}
-void Bridge::sessionDetached() {
+// SessionHandler::ErrorListener methods.
+void Bridge::connectionException(
+ framing::connection::CloseCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->connectionException(code, msg);
+}
+
+void Bridge::channelException(
+ framing::session::DetachCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->channelException(code, msg);
+}
+
+void Bridge::executionException(
+ framing::execution::ErrorCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->executionException(code, msg);
+}
+
+void Bridge::detach() {
detached = true;
+ if (errorListener) errorListener->detach();
+}
+
+std::string Bridge::createName(const std::string& linkName,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ std::stringstream keystream;
+ keystream << linkName << "!" << src << "!" << dest << "!" << key;
+ return keystream.str();
}
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Bridge.h Fri Aug 3 12:13:32 2012
@@ -29,6 +29,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/SessionHandler.h"
#include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
#include "qmf/org/apache/qpid/broker/Bridge.h"
@@ -43,29 +44,31 @@ class Connection;
class ConnectionState;
class Link;
class LinkRegistry;
-class SessionHandler;
class Bridge : public PersistableConfig,
public management::Manageable,
public Exchange::DynamicBridge,
+ public SessionHandler::ErrorListener,
public boost::enable_shared_from_this<Bridge>
{
-public:
+ public:
typedef boost::shared_ptr<Bridge> shared_ptr;
typedef boost::function<void(Bridge*)> CancellationListener;
typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
- Bridge(Link* link, framing::ChannelId id, CancellationListener l,
+ Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l,
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
- InitializeCallback init
+ InitializeCallback init, const std::string& queueName="",
+ const std::string& altExchange=""
);
~Bridge();
- void create(Connection& c);
- void cancel(Connection& c);
- void closed();
- void destroy();
+ QPID_BROKER_EXTERN void close();
bool isDurable() { return args.i_durable; }
+ Link *getLink() const { return link; }
+ const std::string getSrc() const { return args.i_src; }
+ const std::string getDest() const { return args.i_dest; }
+ const std::string getKey() const { return args.i_key; }
bool isDetached() const { return detached; }
@@ -80,7 +83,11 @@ public:
uint32_t encodedSize() const;
void encode(framing::Buffer& buffer) const;
const std::string& getName() const { return name; }
+
+ static const std::string ENCODED_IDENTIFIER;
+ static const std::string ENCODED_IDENTIFIER_V1;
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ static bool isEncodedBridge(const std::string& key);
// Exchange::DynamicBridge methods
void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0);
@@ -93,10 +100,20 @@ public:
std::string getQueueName() const { return queueName; }
const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
-private:
- // Callback when the bridge's session is detached.
- void sessionDetached();
+ /** create a name for a bridge (if none supplied by user config) */
+ static std::string createName(const std::string& linkName,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key);
+
+ // SessionHandler::ErrorListener methods.
+ void connectionException(framing::connection::CloseCode code, const std::string& msg);
+ void channelException(framing::session::DetachCode, const std::string& msg);
+ void executionException(framing::execution::ErrorCode, const std::string& msg);
+ void detach();
+ void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
+ private:
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
void handle(framing::AMQFrame& frame);
@@ -108,19 +125,30 @@ private:
std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
std::auto_ptr<framing::AMQP_ServerProxy> peer;
- Link* link;
- framing::ChannelId id;
+ Link* const link;
+ const framing::ChannelId channel;
qmf::org::apache::qpid::broker::ArgsLinkBridge args;
qmf::org::apache::qpid::broker::Bridge* mgmtObject;
CancellationListener listener;
std::string name;
std::string queueName;
+ std::string altEx;
mutable uint64_t persistenceId;
ConnectionState* connState;
Connection* conn;
InitializeCallback initialize;
bool detached; // Set when session is detached.
bool resetProxy();
+
+ // connection Management (called by owning Link)
+ void create(Connection& c);
+ void cancel(Connection& c);
+ void closed();
+ friend class Link; // to call create, cancel, closed()
+ boost::shared_ptr<ErrorListener> errorListener;
+
+ const bool useExistingQueue;
+ const std::string sessionName;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org