You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/05 14:52:10 UTC
svn commit: r821779 [4/11] - in /qpid/branches/java-broker-0-10/qpid: ./
cpp/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/python/qmf/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/
cpp/examples/messaging/ cpp/include/qmf/ cpp/include/qp...
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.cpp Mon Oct 5 12:51:57 2009
@@ -67,6 +67,11 @@
}
}
+ValueImpl::ValueImpl(Value* e, Typecode t, Typecode at) :
+ envelope(e), typecode(t), valid(false), arrayTypecode(at)
+{
+}
+
ValueImpl::ValueImpl(Typecode t) : envelope(new Value(this)), typecode(t)
{
::memset(&value, 0, sizeof(value));
@@ -186,293 +191,64 @@
// Wrappers
//==================================================================
-Value::Value(Typecode t, Typecode at)
-{
- impl = new ValueImpl(this, t, at);
-}
-
-Value::Value(ValueImpl* i)
-{
- impl = i;
-}
-
-Value::~Value()
-{
- delete impl;
-}
-
-Typecode Value::getType() const
-{
- return impl->getType();
-}
-
-bool Value::isNull() const
-{
- return impl->isNull();
-}
-
-void Value::setNull()
-{
- impl->setNull();
-}
-
-bool Value::isObjectId() const
-{
- return impl->isObjectId();
-}
-
-const ObjectId& Value::asObjectId() const
-{
- return impl->asObjectId();
-}
-
-void Value::setObjectId(const ObjectId& oid)
-{
- impl->setObjectId(oid);
-}
-
-bool Value::isUint() const
-{
- return impl->isUint();
-}
-
-uint32_t Value::asUint() const
-{
- return impl->asUint();
-}
-
-void Value::setUint(uint32_t val)
-{
- impl->setUint(val);
-}
-
-bool Value::isInt() const
-{
- return impl->isInt();
-}
-
-int32_t Value::asInt() const
-{
- return impl->asInt();
-}
-
-void Value::setInt(int32_t val)
-{
- impl->setInt(val);
-}
-
-bool Value::isUint64() const
-{
- return impl->isUint64();
-}
-
-uint64_t Value::asUint64() const
-{
- return impl->asUint64();
-}
-
-void Value::setUint64(uint64_t val)
-{
- impl->setUint64(val);
-}
-
-bool Value::isInt64() const
-{
- return impl->isInt64();
-}
-
-int64_t Value::asInt64() const
-{
- return impl->asInt64();
-}
-
-void Value::setInt64(int64_t val)
-{
- impl->setInt64(val);
-}
-
-bool Value::isString() const
-{
- return impl->isString();
-}
-
-const char* Value::asString() const
-{
- return impl->asString();
-}
-
-void Value::setString(const char* val)
-{
- impl->setString(val);
-}
-
-bool Value::isBool() const
-{
- return impl->isBool();
-}
-
-bool Value::asBool() const
-{
- return impl->asBool();
-}
-
-void Value::setBool(bool val)
-{
- impl->setBool(val);
-}
-
-bool Value::isFloat() const
-{
- return impl->isFloat();
-}
-
-float Value::asFloat() const
-{
- return impl->asFloat();
-}
-
-void Value::setFloat(float val)
-{
- impl->setFloat(val);
-}
-
-bool Value::isDouble() const
-{
- return impl->isDouble();
-}
-
-double Value::asDouble() const
-{
- return impl->asDouble();
-}
-
-void Value::setDouble(double val)
-{
- impl->setDouble(val);
-}
-
-bool Value::isUuid() const
-{
- return impl->isUuid();
-}
-
-const uint8_t* Value::asUuid() const
-{
- return impl->asUuid();
-}
-
-void Value::setUuid(const uint8_t* val)
-{
- impl->setUuid(val);
-}
-
-bool Value::isObject() const
-{
- return impl->isObject();
-}
-
-Object* Value::asObject() const
-{
- return impl->asObject();
-}
-
-void Value::setObject(Object* val)
-{
- impl->setObject(val);
-}
-
-bool Value::isMap() const
-{
- return impl->isMap();
-}
-
-bool Value::keyInMap(const char* key) const
-{
- return impl->keyInMap(key);
-}
-
-Value* Value::byKey(const char* key)
-{
- return impl->byKey(key);
-}
-
-const Value* Value::byKey(const char* key) const
-{
- return impl->byKey(key);
-}
-
-void Value::deleteKey(const char* key)
-{
- impl->deleteKey(key);
-}
-
-void Value::insert(const char* key, Value* val)
-{
- impl->insert(key, val);
-}
-
-uint32_t Value::keyCount() const
-{
- return impl->keyCount();
-}
-
-const char* Value::key(uint32_t idx) const
-{
- return impl->key(idx);
-}
-
-bool Value::isList() const
-{
- return impl->isList();
-}
-
-uint32_t Value::listItemCount() const
-{
- return impl->listItemCount();
-}
-
-Value* Value::listItem(uint32_t idx)
-{
- return impl->listItem(idx);
-}
-
-void Value::appendToList(Value* val)
-{
- impl->appendToList(val);
-}
-
-void Value::deleteListItem(uint32_t idx)
-{
- impl->deleteListItem(idx);
-}
-
-bool Value::isArray() const
-{
- return impl->isArray();
-}
-
-Typecode Value::arrayType() const
-{
- return impl->arrayType();
-}
-
-uint32_t Value::arrayItemCount() const
-{
- return impl->arrayItemCount();
-}
-
-Value* Value::arrayItem(uint32_t idx)
-{
- return impl->arrayItem(idx);
-}
-
-void Value::appendToArray(Value* val)
-{
- impl->appendToArray(val);
-}
-
-void Value::deleteArrayItem(uint32_t idx)
-{
- impl->deleteArrayItem(idx);
-}
+Value::Value(const Value& from) : impl(new ValueImpl(*(from.impl))) {}
+Value::Value(Typecode t, Typecode at) : impl(new ValueImpl(this, t, at)) {}
+Value::Value(ValueImpl* i) : impl(i) {}
+Value::~Value() { delete impl; }
+
+Typecode Value::getType() const { return impl->getType(); }
+bool Value::isNull() const { return impl->isNull(); }
+void Value::setNull() { impl->setNull(); }
+bool Value::isObjectId() const { return impl->isObjectId(); }
+const ObjectId& Value::asObjectId() const { return impl->asObjectId(); }
+void Value::setObjectId(const ObjectId& oid) { impl->setObjectId(oid); }
+bool Value::isUint() const { return impl->isUint(); }
+uint32_t Value::asUint() const { return impl->asUint(); }
+void Value::setUint(uint32_t val) { impl->setUint(val); }
+bool Value::isInt() const { return impl->isInt(); }
+int32_t Value::asInt() const { return impl->asInt(); }
+void Value::setInt(int32_t val) { impl->setInt(val); }
+bool Value::isUint64() const { return impl->isUint64(); }
+uint64_t Value::asUint64() const { return impl->asUint64(); }
+void Value::setUint64(uint64_t val) { impl->setUint64(val); }
+bool Value::isInt64() const { return impl->isInt64(); }
+int64_t Value::asInt64() const { return impl->asInt64(); }
+void Value::setInt64(int64_t val) { impl->setInt64(val); }
+bool Value::isString() const { return impl->isString(); }
+const char* Value::asString() const { return impl->asString(); }
+void Value::setString(const char* val) { impl->setString(val); }
+bool Value::isBool() const { return impl->isBool(); }
+bool Value::asBool() const { return impl->asBool(); }
+void Value::setBool(bool val) { impl->setBool(val); }
+bool Value::isFloat() const { return impl->isFloat(); }
+float Value::asFloat() const { return impl->asFloat(); }
+void Value::setFloat(float val) { impl->setFloat(val); }
+bool Value::isDouble() const { return impl->isDouble(); }
+double Value::asDouble() const { return impl->asDouble(); }
+void Value::setDouble(double val) { impl->setDouble(val); }
+bool Value::isUuid() const { return impl->isUuid(); }
+const uint8_t* Value::asUuid() const { return impl->asUuid(); }
+void Value::setUuid(const uint8_t* val) { impl->setUuid(val); }
+bool Value::isObject() const { return impl->isObject(); }
+Object* Value::asObject() const { return impl->asObject(); }
+void Value::setObject(Object* val) { impl->setObject(val); }
+bool Value::isMap() const { return impl->isMap(); }
+bool Value::keyInMap(const char* key) const { return impl->keyInMap(key); }
+Value* Value::byKey(const char* key) { return impl->byKey(key); }
+const Value* Value::byKey(const char* key) const { return impl->byKey(key); }
+void Value::deleteKey(const char* key) { impl->deleteKey(key); }
+void Value::insert(const char* key, Value* val) { impl->insert(key, val); }
+uint32_t Value::keyCount() const { return impl->keyCount(); }
+const char* Value::key(uint32_t idx) const { return impl->key(idx); }
+bool Value::isList() const { return impl->isList(); }
+uint32_t Value::listItemCount() const { return impl->listItemCount(); }
+Value* Value::listItem(uint32_t idx) { return impl->listItem(idx); }
+void Value::appendToList(Value* val) { impl->appendToList(val); }
+void Value::deleteListItem(uint32_t idx) { impl->deleteListItem(idx); }
+bool Value::isArray() const { return impl->isArray(); }
+Typecode Value::arrayType() const { return impl->arrayType(); }
+uint32_t Value::arrayItemCount() const { return impl->arrayItemCount(); }
+Value* Value::arrayItem(uint32_t idx) { return impl->arrayItem(idx); }
+void Value::appendToArray(Value* val) { impl->appendToArray(val); }
+void Value::deleteArrayItem(uint32_t idx) { impl->deleteArrayItem(idx); }
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.h Mon Oct 5 12:51:57 2009
@@ -60,8 +60,7 @@
uint8_t uuidVal[16];
} value;
- ValueImpl(Value* e, Typecode t, Typecode at) :
- envelope(e), typecode(t), valid(false), arrayTypecode(at) {}
+ ValueImpl(Value* e, Typecode t, Typecode at);
ValueImpl(Typecode t, qpid::framing::Buffer& b);
ValueImpl(Typecode t);
~ValueImpl();
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.cpp Mon Oct 5 12:51:57 2009
@@ -53,42 +53,65 @@
}
}
-AclResult AclData::lookup(const std::string& id, const Action& action, const ObjectType& objType, const std::string& name, std::map<Property, std::string>* params)
-{
- AclResult aclresult = decisionMode;
-
- if (actionList[action] && actionList[action][objType]){
- AclData::actObjItr itrRule = actionList[action][objType]->find(id);
- if (itrRule == actionList[action][objType]->end())
- itrRule = actionList[action][objType]->find("*");
- if (itrRule != actionList[action][objType]->end() ) {
-
- //loop the vector
- for (ruleSetItr i=itrRule->second.begin(); i<itrRule->second.end(); i++) {
-
- // loop the names looking for match
- bool match =true;
- for (propertyMapItr pMItr = i->props.begin(); (pMItr != i->props.end()) && match; pMItr++)
- {
- //match name is exists first
- if (pMItr->first == acl::PROP_NAME){
- if (!matchProp(pMItr->second, name)){
- match= false;
- }
- }else if (params){ //match pMItr against params
- propertyMapItr paramItr = params->find (pMItr->first);
- if (paramItr == params->end()){
- match = false;
- }else if (!matchProp(paramItr->second, pMItr->second)){
- match = false;
- }
+AclResult AclData::lookup(const std::string& id, const Action& action, const ObjectType& objType,
+ const std::string& name, std::map<Property, std::string>* params) {
+
+ QPID_LOG(debug, "ACL: Lookup for id:" << id << " action:" << AclHelper::getActionStr((Action) action)
+ << " objectType:" << AclHelper::getObjectTypeStr((ObjectType) objType) << " name:" << name
+ << " with params " << AclHelper::propertyMapToString(params));
+
+ AclResult aclresult = decisionMode;
+ if (actionList[action] && actionList[action][objType]) {
+ AclData::actObjItr itrRule = actionList[action][objType]->find(id);
+ if (itrRule == actionList[action][objType]->end())
+ itrRule = actionList[action][objType]->find("*");
+ if (itrRule != actionList[action][objType]->end()) {
+
+ QPID_LOG(debug, "ACL: checking the following rules for : " << itrRule->first );
+
+ //loop the vector
+ for (ruleSetItr i = itrRule->second.begin(); i < itrRule->second.end(); i++) {
+ QPID_LOG(debug, "ACL: checking rule " << i->toString());
+ // loop the names looking for match
+ bool match = true;
+ for (propertyMapItr pMItr = i->props.begin(); (pMItr != i->props.end()) && match; pMItr++) {
+ //match name is exists first
+ if (pMItr->first == acl::PROP_NAME) {
+ if (matchProp(pMItr->second, name)){
+ QPID_LOG(debug, "ACL: name '" << name << "' matched with name '"
+ << pMItr->second << "' given in the rule");
+ }else{
+ match = false;
+ QPID_LOG(debug, "ACL: name '" << name << "' didn't match with name '"
+ << pMItr->second << "' given in the rule");
+ }
+ } else if (params) { //match pMItr against params
+ propertyMapItr paramItr = params->find(pMItr->first);
+ if (paramItr == params->end()) {
+ match = false;
+ QPID_LOG(debug, "ACL: the given parameter map in lookup doesn't contain the property '"
+ << AclHelper::getPropertyStr(pMItr->first) << "'");
+ } else if (!matchProp(pMItr->second, paramItr->second)) {
+ QPID_LOG(debug, "ACL: the pair("
+ << AclHelper::getPropertyStr(paramItr->first) << "," << paramItr->second
+ << ") given in lookup doesn't match the pair("
+ << AclHelper::getPropertyStr(pMItr->first) << "," << pMItr->second << ") given in the rule");
+ match = false;
}
}
- if (match) return getACLResult(i->logOnly, i->log);
- }
- }
- }
- return aclresult;
+ }
+ if (match)
+ {
+ aclresult = getACLResult(i->logOnly, i->log);
+ QPID_LOG(debug,"Successful match, the decision is:" << AclHelper::getAclResultStr(aclresult));
+ return aclresult;
+ }
+ }
+ }
+ }
+
+ QPID_LOG(debug,"No successful match, defaulting to the decision mode " << AclHelper::getAclResultStr(aclresult));
+ return aclresult;
}
AclResult AclData::lookup(const std::string& id, const Action& action, const ObjectType& objType, const std::string& /*Exchange*/ name, const std::string& RoutingKey)
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.h Mon Oct 5 12:51:57 2009
@@ -22,7 +22,7 @@
#include "qpid/broker/AclModule.h"
#include <vector>
-
+#include <sstream>
namespace qpid {
namespace acl {
@@ -45,6 +45,16 @@
rule (propertyMap& p):log(false),logOnly(false),props(p) {};
+
+ std::string toString () const {
+ std::ostringstream ruleStr;
+ ruleStr << "[log=" << log << ", logOnly=" << logOnly << " props{";
+ for (propertyMapItr pMItr = props.begin(); pMItr != props.end(); pMItr++) {
+ ruleStr << " " << AclHelper::getPropertyStr((Property) pMItr-> first) << "=" << pMItr->second;
+ }
+ ruleStr << " }]";
+ return ruleStr.str();
+ }
};
typedef std::vector<rule> ruleSet;
typedef ruleSet::const_iterator ruleSetItr;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclReader.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclReader.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclReader.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclReader.cpp Mon Oct 5 12:51:57 2009
@@ -83,115 +83,142 @@
return oss.str();
}
-void AclReader::loadDecisionData( boost::shared_ptr<AclData> d )
-{
- d->clear();
- QPID_LOG(debug, "ACL Load Rules");
- int cnt = rules.size();
+void AclReader::loadDecisionData(boost::shared_ptr<AclData> d) {
+ d->clear();
+ QPID_LOG(debug, "ACL Load Rules");
+ int cnt = rules.size();
bool foundmode = false;
- for (rlCitr i=rules.end()-1; cnt; i--,cnt--) {
- QPID_LOG(debug, "ACL Processing " << std::setfill(' ') << std::setw(2) << cnt << " " << (*i)->toString());
-
- if (!foundmode && (*i)->actionAll && (*i)->names.size()==1 && (*((*i)->names.begin())).compare("*")==0 ){
- d->decisionMode = (*i)->res;
- QPID_LOG(debug, "ACL FoundMode " << AclHelper::getAclResultStr(d->decisionMode));
- foundmode=true;
- }else{
- AclData::rule rule((*i)->props);
- bool addrule= true;
-
- switch ((*i)->res)
- {
- case qpid::acl::ALLOWLOG:
- rule.log = true;
- if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode == qpid::acl::ALLOWLOG)
- rule.logOnly = true;
+
+ for (rlCitr i = rules.end() - 1; cnt; i--, cnt--) {
+ QPID_LOG(debug, "ACL Processing " << std::setfill(' ') << std::setw(2)
+ << cnt << " " << (*i)->toString());
+
+ if (!foundmode && (*i)->actionAll && (*i)->names.size() == 1
+ && (*((*i)->names.begin())).compare("*") == 0) {
+ d->decisionMode = (*i)->res;
+ QPID_LOG(debug, "ACL FoundMode " << AclHelper::getAclResultStr(
+ d->decisionMode));
+ foundmode = true;
+ } else {
+ AclData::rule rule((*i)->props);
+ bool addrule = true;
+
+ switch ((*i)->res) {
+ case qpid::acl::ALLOWLOG:
+ rule.log = true;
+ if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode
+ == qpid::acl::ALLOWLOG)
+ rule.logOnly = true;
+ break;
+ case qpid::acl::ALLOW:
+ if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode
+ == qpid::acl::ALLOWLOG)
+ addrule = false;
+ break;
+ case qpid::acl::DENYLOG:
+ rule.log = true;
+ if (d->decisionMode == qpid::acl::DENY || d->decisionMode
+ == qpid::acl::DENYLOG)
+ rule.logOnly = true;
+ break;
+ case qpid::acl::DENY:
+ if (d->decisionMode == qpid::acl::DENY || d->decisionMode
+ == qpid::acl::DENYLOG)
+ addrule = false;
break;
- case qpid::acl::ALLOW:
- if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode == qpid::acl::ALLOWLOG)
- addrule = false;
- break;
- case qpid::acl::DENYLOG:
- rule.log = true;
- if (d->decisionMode == qpid::acl::DENY || d->decisionMode == qpid::acl::DENYLOG)
- rule.logOnly = true;
- break;
- case qpid::acl::DENY:
- if (d->decisionMode == qpid::acl::DENY || d->decisionMode == qpid::acl::DENYLOG)
- addrule = false;
- break;
- default:
- throw Exception("Invalid ACL Result loading rules.");
- }
-
-
- // Action -> Object -> map<user -> set<Rule> >
- if (addrule){
- for (int acnt= ((*i)->actionAll?0:(*i)->action);
- acnt< acl::ACTIONSIZE; (*i)->actionAll?acnt++:acnt=acl::ACTIONSIZE ) {
-
- if (acnt == acl::ACT_PUBLISH) d->transferAcl = true; // we have transfer ACL
-
- QPID_LOG(debug, "ACL Adding action:" << AclHelper::getActionStr((Action)acnt) );
-
- //find the Action, create if not exist
- if (d->actionList[acnt]==NULL) {
- d->actionList[acnt] = new AclData::aclAction[qpid::acl::OBJECTSIZE];
- for (int j=0;j<qpid::acl::OBJECTSIZE; j++)
- d->actionList[acnt][j] = NULL;
- }
+ default:
+ throw Exception("Invalid ACL Result loading rules.");
+ }
+
+ // Action -> Object -> map<user -> set<Rule> >
+ if (addrule) {
+ std::ostringstream actionstr;
+ for (int acnt = ((*i)->actionAll ? 0 : (*i)->action); acnt
+ < acl::ACTIONSIZE; (*i)->actionAll ? acnt++ : acnt
+ = acl::ACTIONSIZE) {
+
+ if (acnt == acl::ACT_PUBLISH)
+ d->transferAcl = true; // we have transfer ACL
+
+ actionstr << AclHelper::getActionStr((Action) acnt) << ",";
+
+ //find the Action, create if not exist
+ if (d->actionList[acnt] == NULL) {
+ d->actionList[acnt]
+ = new AclData::aclAction[qpid::acl::OBJECTSIZE];
+ for (int j = 0; j < qpid::acl::OBJECTSIZE; j++)
+ d->actionList[acnt][j] = NULL;
+ }
// optimize this loop to limit to valid options only!!
- for (int ocnt= ((*i)->objStatus!=aclRule::VALUE ?0:(*i)->object);
- ocnt< acl::OBJECTSIZE;
- (*i)->objStatus!=aclRule::VALUE?ocnt++:ocnt=acl::OBJECTSIZE ) {
-
- QPID_LOG(debug, "ACL Adding object:" << AclHelper::getObjectTypeStr((ObjectType)ocnt) );
-
- //find the Object, create if not exist
- if (d->actionList[acnt][ocnt] == NULL)
- d->actionList[acnt][ocnt] = new AclData::actionObject;
-
- // add users and Rule to object set
- bool allNames=false;
- // check to see if names.begin is '*'
- if ( (*(*i)->names.begin()).compare("*")==0 ) allNames = true;
-
- for (nsCitr itr = (allNames?names.begin():(*i)->names.begin());
- itr != (allNames?names.end():(*i)->names.end()); itr++) {
- AclData::actObjItr itrRule = d->actionList[acnt][ocnt]->find(*itr);
- if (itrRule == d->actionList[acnt][ocnt]->end()) {
- QPID_LOG(debug, "ACL Adding rule & user:" << *itr);
- AclData::ruleSet rSet;
- rSet.push_back(rule);
- d->actionList[acnt][ocnt]->insert(make_pair( std::string(*itr) , rSet) );
- }else{
-
- // TODO add code to check for dead rules
- // allow peter create queue name=tmp <-- dead rule!!
- // allow peter create queue
-
- itrRule->second.push_back(rule);
- QPID_LOG(debug, "ACL Adding rule to user:" << *itr);
- }
- }
-
- }
-
- }
- }else{
- QPID_LOG(debug, "ACL Skipping based on Mode:" << AclHelper::getAclResultStr(d->decisionMode) );
- }
- }
-
- }
+ for (int ocnt = ((*i)->objStatus != aclRule::VALUE ? 0
+ : (*i)->object); ocnt < acl::OBJECTSIZE; (*i)->objStatus
+ != aclRule::VALUE ? ocnt++ : ocnt = acl::OBJECTSIZE) {
+
+ //find the Object, create if not exist
+ if (d->actionList[acnt][ocnt] == NULL)
+ d->actionList[acnt][ocnt]
+ = new AclData::actionObject;
+
+ // add users and Rule to object set
+ bool allNames = false;
+ // check to see if names.begin is '*'
+ if ((*(*i)->names.begin()).compare("*") == 0)
+ allNames = true;
+
+ for (nsCitr itr = (allNames ? names.begin()
+ : (*i)->names.begin()); itr
+ != (allNames ? names.end() : (*i)->names.end()); itr++) {
+
+ AclData::actObjItr itrRule =
+ d->actionList[acnt][ocnt]->find(*itr);
+
+ if (itrRule == d->actionList[acnt][ocnt]->end()) {
+ AclData::ruleSet rSet;
+ rSet.push_back(rule);
+ d->actionList[acnt][ocnt]->insert(make_pair(
+ std::string(*itr), rSet));
+ } else {
+
+ // TODO add code to check for dead rules
+ // allow peter create queue name=tmp <-- dead rule!!
+ // allow peter create queue
+
+ itrRule->second.push_back(rule);
+ }
+ }
+
+ }
+ }
+
+ std::ostringstream objstr;
+ for (int ocnt = ((*i)->objStatus != aclRule::VALUE ? 0 : (*i)->object); ocnt < acl::OBJECTSIZE;
+ (*i)->objStatus != aclRule::VALUE ? ocnt++ : ocnt = acl::OBJECTSIZE) {
+ objstr << AclHelper::getObjectTypeStr((ObjectType) ocnt) << ",";
+ }
+
+ bool allNames = ((*(*i)->names.begin()).compare("*") == 0);
+ std::ostringstream userstr;
+ for (nsCitr itr = (allNames ? names.begin() : (*i)->names.begin());
+ itr != (allNames ? names.end() : (*i)->names.end()); itr++) {
+ userstr << *itr << ",";
+ }
+
+ QPID_LOG(debug,"ACL: Adding actions {" << actionstr.str().substr(0,actionstr.str().length()-1)
+ << "} to objects {" << objstr.str().substr(0,objstr.str().length()-1)
+ << "} with props " << AclHelper::propertyMapToString(&rule.props)
+ << " for users {" << userstr.str().substr(0,userstr.str().length()-1) << "}" );
+ } else {
+ QPID_LOG(debug, "ACL Skipping based on Mode:"
+ << AclHelper::getAclResultStr(d->decisionMode));
+ }
+ }
+ }
}
-
-
void AclReader::aclRule::processName(const std::string& name, const groupMap& groups) {
if (name.compare("all") == 0) {
names.insert("*");
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Mon Oct 5 12:51:57 2009
@@ -435,8 +435,8 @@
} else {
if ((iter->second->getPackageName() != packageName) ||
(iter->second->getClassName() != className)) {
- outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER);
- outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
+ outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID);
+ outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
}
else
try {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/AclModule.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/AclModule.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/AclModule.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/AclModule.h Mon Oct 5 12:51:57 2009
@@ -26,7 +26,7 @@
#include <map>
#include <set>
#include <string>
-
+#include <sstream>
namespace qpid {
@@ -179,6 +179,8 @@
typedef std::map<ObjectType, actionMapPtr> objectMap;
typedef objectMap::const_iterator omCitr;
typedef boost::shared_ptr<objectMap> objectMapPtr;
+ typedef std::map<Property, std::string> propMap;
+ typedef propMap::const_iterator propMapItr;
// This map contains the legal combinations of object/action/properties found in an ACL file
static void loadValidationMap(objectMapPtr& map) {
@@ -248,6 +250,19 @@
map->insert(objectPair(OBJ_METHOD, a4));
}
+
+ static std::string propertyMapToString(const std::map<Property, std::string>* params) {
+ std::ostringstream ss;
+ ss << "{";
+ if (params)
+ {
+ for (propMapItr pMItr = params->begin(); pMItr != params->end(); pMItr++) {
+ ss << " " << getPropertyStr((Property) pMItr-> first) << "=" << pMItr->second;
+ }
+ }
+ ss << " }";
+ return ss.str();
+ }
};
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct 5 12:51:57 2009
@@ -386,7 +386,7 @@
if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
status = Manageable::STATUS_OK;
else
- return Manageable::STATUS_INVALID_PARAMETER;
+ return Manageable::STATUS_PARAMETER_INVALID;
break;
}
default:
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp Mon Oct 5 12:51:57 2009
@@ -269,11 +269,13 @@
cb(); // Lend the IO thread for management processing
}
}
- if (mgmtClosing)
+ if (mgmtClosing) {
+ closed();
close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
- else
+ } else {
//then do other output as needed:
return outputTasks.doOutput();
+ }
}catch(ConnectionException& e){
close(e.code, e.getMessage());
}catch(std::exception& e){
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidMsgSequence("qpid.msg_sequence");
const std::string qpidSequenceCounter("qpid.sequence_counter");
@@ -51,17 +51,19 @@
const std::string fedOpUnbind("U");
const std::string fedOpReorigin("R");
const std::string fedOpHello("H");
+
+const std::string QPID_MANAGEMENT("qpid.management");
}
Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
if (parent){
if (parent->sequence || parent->ive) parent->sequenceLock.lock();
-
+
if (parent->sequence){
parent->sequenceNo++;
- msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
- }
+ msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ }
if (parent->ive) {
parent->lastMsg = &( msg.getMessage());
}
@@ -99,11 +101,9 @@
}
}
-static const std::string QPID_MANAGEMENT("qpid.management");
-
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent, Broker* b)
- : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
+ : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
{
if (parent != 0 && broker != 0)
@@ -169,7 +169,7 @@
string name;
string type;
FieldTable args;
-
+
buffer.getShortString(name);
bool durable(buffer.getOctet());
buffer.getShortString(type);
@@ -185,7 +185,7 @@
}
}
-void Exchange::encode(Buffer& buffer) const
+void Exchange::encode(Buffer& buffer) const
{
buffer.putShortString(name);
buffer.putOctet(durable);
@@ -195,8 +195,8 @@
buffer.put(args);
}
-uint32_t Exchange::encodedSize() const
-{
+uint32_t Exchange::encodedSize() const
+{
return name.size() + 1/*short string size*/
+ 1 /*durable*/
+ getType().size() + 1/*short string size*/
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Link.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Link.cpp Mon Oct 5 12:51:57 2009
@@ -200,8 +200,10 @@
// Move the bridges to be deleted into a local vector so there is no
// corruption of the iterator caused by bridge deletion.
- for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->closed();
toDelete.push_back(*i);
+ }
active.clear();
for (Bridges::iterator i = created.begin(); i != created.end(); i++)
@@ -264,7 +266,6 @@
}
if (!cancellations.empty()) {
for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) {
- active.push_back(*i);
(*i)->cancel(*connection);
}
cancellations.clear();
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.cpp Mon Oct 5 12:51:57 2009
@@ -362,7 +362,7 @@
Replacement::iterator i = replacement.find(qfor);
if (i != replacement.end()){
return i->second;
- }
+ }
return empty;
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.h Mon Oct 5 12:51:57 2009
@@ -34,12 +34,12 @@
#include <vector>
namespace qpid {
-
+
namespace framing {
class FieldTable;
class SequenceNumber;
}
-
+
namespace broker {
class ConnectionToken;
class Exchange;
@@ -145,9 +145,9 @@
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
-
- void forcePersistent();
- bool isForcedPersistent();
+
+ void forcePersistent();
+ bool isForcedPersistent();
boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Mon Oct 5 12:51:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,14 +30,14 @@
using namespace qpid::broker;
using namespace qpid::framing;
-namespace
+namespace
{
std::string type_str(uint8_t type);
+ const std::string QPID_MANAGEMENT("qpid.management");
}
-MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
- state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
-static const std::string QPID_MANAGEMENT("qpid.management");
+MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
+ state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
void MessageBuilder::handle(AMQFrame& frame)
{
@@ -54,10 +54,10 @@
AMQFrame header((AMQHeaderBody()));
header.setBof(false);
header.setEof(false);
- message->getFrames().append(header);
+ message->getFrames().append(header);
} else if (type != HEADER_BODY) {
throw CommandInvalidException(
- QPID_MSG("Invalid frame sequence for message, expected header or content got "
+ QPID_MSG("Invalid frame sequence for message, expected header or content got "
<< type_str(type) << ")"));
}
state = CONTENT;
@@ -74,13 +74,13 @@
} else {
message->getFrames().append(frame);
//have we reached the staging limit? if so stage message and release content
- if (state == CONTENT
- && stagingThreshold
+ if (state == CONTENT
+ && stagingThreshold
&& message->getFrames().getContentSize() >= stagingThreshold
&& !NullMessageStore::isNullStore(store)
- && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
+ && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
{
- message->releaseContent(store);
+ message->releaseContent(store);
staging = true;
}
}
@@ -108,7 +108,7 @@
const std::string HEARTBEAT_BODY_S = "HEARTBEAT";
const std::string UNKNOWN = "unknown";
-std::string type_str(uint8_t type)
+std::string type_str(uint8_t type)
{
switch(type) {
case METHOD_BODY: return METHOD_BODY_S;
@@ -124,7 +124,7 @@
void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
{
if (expected != actual) {
- throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
+ throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
<< type_str(expected) << " got " << type_str(actual) << ")"));
}
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Mon Oct 5 12:51:57 2009
@@ -62,7 +62,7 @@
void PersistableMessage::setContentReleased() {contentReleased = true; }
bool PersistableMessage::isContentReleased()const { return contentReleased; }
-
+
bool PersistableMessage::isEnqueueComplete() {
sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
return asyncEnqueueCounter == 0;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.h Mon Oct 5 12:51:57 2009
@@ -46,7 +46,7 @@
sys::Mutex asyncEnqueueLock;
sys::Mutex asyncDequeueLock;
sys::Mutex storeLock;
-
+
/**
* Tracks the number of outstanding asynchronous enqueue
* operations. When the message is enqueued asynchronously the
@@ -97,7 +97,7 @@
void flush();
bool isContentReleased() const;
-
+
QPID_BROKER_EXTERN bool isEnqueueComplete();
QPID_BROKER_EXTERN void enqueueComplete();
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 5 12:51:57 2009
@@ -597,7 +597,7 @@
Mutex::ScopedUnlock u(messageLock);
dequeue(0, QueuedMessage(qm.queue, old, qm.position));
}
- }
+ }
}else {
messages.push_back(qm);
listeners.populate(copy);
@@ -702,7 +702,7 @@
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
-
+
if (traceId.size()) {
msg->addTraceId(traceId);
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueRegistry.h Mon Oct 5 12:51:57 2009
@@ -111,7 +111,7 @@
/** Call f for each queue in the registry. */
template <class F> void eachQueue(F f) const {
- qpid::sys::RWlock::ScopedWlock l(lock);
+ qpid::sys::RWlock::ScopedRlock l(lock);
for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i)
f(i->second);
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Mon Oct 5 12:51:57 2009
@@ -195,7 +195,7 @@
{
queue->setPersistenceId(id);
}
-
+
uint64_t RecoverableQueueImpl::getPersistenceId() const
{
return queue->getPersistenceId();
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Oct 5 12:51:57 2009
@@ -72,7 +72,7 @@
params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
- throw NotAllowedException(QPID_MSG("ACL denied exhange declare request from " << getConnection().getUserId()));
+ throw NotAllowedException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
}
//TODO: implement autoDelete
@@ -121,9 +121,11 @@
void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
{
- if (alternate && alternate != exchange->getAlternate())
+ if (alternate && ((exchange->getAlternate() && alternate != exchange->getAlternate())
+ || !exchange->getAlternate()))
throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange "
- << exchange->getAlternate()->getName() << ", requested "
+ << (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>")
+ << ", requested "
<< alternate->getName()));
}
@@ -132,7 +134,7 @@
AclModule* acl = getBroker().getAcl();
if (acl) {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
- throw NotAllowedException(QPID_MSG("ACL denied exhange delete request from " << getConnection().getUserId()));
+ throw NotAllowedException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
}
//TODO: implement unused
@@ -152,7 +154,7 @@
AclModule* acl = getBroker().getAcl();
if (acl) {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL) )
- throw NotAllowedException(QPID_MSG("ACL denied exhange query request from " << getConnection().getUserId()));
+ throw NotAllowedException(QPID_MSG("ACL denied exchange query request from " << getConnection().getUserId()));
}
try {
@@ -169,8 +171,12 @@
{
AclModule* acl = getBroker().getAcl();
if (acl) {
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,routingKey) )
- throw NotAllowedException(QPID_MSG("ACL denied exhange bind request from " << getConnection().getUserId()));
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
+
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
+ throw NotAllowedException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
}
Queue::shared_ptr queue = getQueue(queueName);
@@ -232,8 +238,8 @@
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchangeName,&params) )
- throw NotAllowedException(QPID_MSG("ACL denied exhange bound request from " << getConnection().getUserId()));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,&params) )
+ throw NotAllowedException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId()));
}
Exchange::shared_ptr exchange;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Dispatcher.cpp Mon Oct 5 12:51:57 2009
@@ -29,7 +29,14 @@
#include "qpid/client/Message.h"
#include "qpid/client/MessageImpl.h"
-#include <boost/state_saver.hpp>
+#include <boost/version.hpp>
+#if (BOOST_VERSION >= 104000)
+# include <boost/serialization/state_saver.hpp>
+ using boost::serialization::state_saver;
+#else
+# include <boost/state_saver.hpp>
+ using boost::state_saver;
+#endif /* BOOST_VERSION */
using qpid::framing::FrameSet;
using qpid::framing::MessageTransferBody;
@@ -65,7 +72,7 @@
Mutex::ScopedLock l(lock);
if (running)
throw Exception("Dispatcher is already running.");
- boost::state_saver<bool> reset(running); // Reset to false on exit.
+ state_saver<bool> reset(running); // Reset to false on exit.
running = true;
try {
while (!queue->isClosed()) {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Mon Oct 5 12:51:57 2009
@@ -22,6 +22,7 @@
#include "qpid/client/amqp0_10/Codecs.h"
#include "qpid/client/amqp0_10/MessageSource.h"
#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/amqp0_10/OutgoingMessage.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
@@ -122,7 +123,7 @@
bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false,
const FieldTable& options = EMPTY_FIELD_TABLE);
void declare(qpid::client::AsyncSession& session, const std::string& name);
- void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+ void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
const std::string name;
@@ -139,7 +140,7 @@
QueueSink(const std::string& name, bool passive=true, bool exclusive=false,
bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
void declare(qpid::client::AsyncSession& session, const std::string& name);
- void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+ void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
const std::string name;
@@ -328,14 +329,12 @@
}
}
-void Exchange::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
- qpid::client::Message message;
- convert(m, message);
- if (message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
- message.getDeliveryProperties().setRoutingKey(defaultSubject);
+ if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
+ m.message.getDeliveryProperties().setRoutingKey(defaultSubject);
}
- session.messageTransfer(arg::destination=name, arg::content=message);
+ m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
}
void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {}
@@ -355,12 +354,10 @@
arg::autoDelete=autoDelete, arg::arguments=options);
}
}
-void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
- qpid::client::Message message;
- convert(m, message);
- message.getDeliveryProperties().setRoutingKey(name);
- session.messageTransfer(arg::content=message);
+ m.message.getDeliveryProperties().setRoutingKey(name);
+ m.status = session.messageTransfer(arg::content=m.message);
}
void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h Mon Oct 5 12:51:57 2009
@@ -31,8 +31,8 @@
}
namespace messaging {
-class Address;
-class Filter;
+struct Address;
+struct Filter;
}
namespace client {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp Mon Oct 5 12:51:57 2009
@@ -183,25 +183,25 @@
{
boost::shared_ptr<FieldValue> out;
switch (in.getType()) {
- case VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break;
- case BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break;
- case UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break;
- case UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break;
- case UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break;
- case UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break;
- case INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break;
- case INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break;
- case INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break;
- case INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break;
- case FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break;
- case DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break;
+ case VAR_VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break;
+ case VAR_BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break;
+ case VAR_UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break;
+ case VAR_UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break;
+ case VAR_UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break;
+ case VAR_UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break;
+ case VAR_INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break;
+ case VAR_INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break;
+ case VAR_INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break;
+ case VAR_INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break;
+ case VAR_FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break;
+ case VAR_DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break;
//TODO: check encoding (and length?) when deciding what AMQP type to treat string as
- case STRING: out = boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); break;
- case MAP:
+ case VAR_STRING: out = boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); break;
+ case VAR_MAP:
//out = boost::shared_ptr<FieldValue>(toFieldValueCollection<FieldTableValue>(in.asMap(), &toFieldTableEntry));
out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap()));
break;
- case LIST:
+ case VAR_LIST:
//out = boost::shared_ptr<FieldValue>(toFieldValueCollection<ListValue>(in.asList(), &toFieldValue));
out = boost::shared_ptr<FieldValue>(toListValue(in.asList()));
break;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Mon Oct 5 12:51:57 2009
@@ -21,14 +21,16 @@
#include "ConnectionImpl.h"
#include "SessionImpl.h"
#include "qpid/messaging/Session.h"
-#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/PrivateImplRef.h"
#include "qpid/log/Statement.h"
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace client {
namespace amqp0_10 {
using qpid::messaging::Variant;
+using namespace qpid::sys;
template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
{
@@ -56,24 +58,124 @@
setIfFound(from, "bounds", to.bounds);
}
-ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options)
+ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options) :
+ url(u), reconnectionEnabled(true), timeout(-1),
+ minRetryInterval(1), maxRetryInterval(30)
{
QPID_LOG(debug, "Opening connection to " << url << " with " << options);
- Url u(url);
- ConnectionSettings settings;
convert(options, settings);
- connection.open(u, settings);
+ setIfFound(options, "reconnection-enabled", reconnectionEnabled);
+ setIfFound(options, "reconnection-timeout", timeout);
+ setIfFound(options, "min-retry-interval", minRetryInterval);
+ setIfFound(options, "max-retry-interval", maxRetryInterval);
+ connection.open(url, settings);
}
void ConnectionImpl::close()
{
+ qpid::sys::Mutex::ScopedLock l(lock);
connection.close();
}
+// Returns the amqp0_10 SessionImpl behind a messaging::Session handle by
+// fetching the handle's private implementation pointer and down-casting it.
+// NOTE(review): presumably yields a null pointer when the handle wraps some
+// other implementation type (boost::dynamic_pointer_cast semantics) - confirm.
+boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session)
+{
+    typedef qpid::client::PrivateImplRef<qpid::messaging::Session> HandleAccess;
+    return boost::dynamic_pointer_cast<SessionImpl>(HandleAccess::get(session));
+}
+
+// Drops the connection's record of session 's'. The first entry in
+// 'sessions' whose implementation object is 's' is erased; if no entry
+// matches, nothing changes. The scan and erase run with 'lock' held.
+void ConnectionImpl::closed(SessionImpl& s)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    Sessions::iterator pos = sessions.begin();
+    while (pos != sessions.end() && getImplPtr(*pos).get() != &s) {
+        ++pos;
+    }
+    if (pos != sessions.end()) sessions.erase(pos);
+}
+
qpid::messaging::Session ConnectionImpl::newSession()
{
- qpid::messaging::Session impl(new SessionImpl(connection.newSession()));
+ qpid::messaging::Session impl(new SessionImpl(*this));
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ sessions.push_back(impl);
+ }
+ try {
+ getImplPtr(impl)->setSession(connection.newSession());
+ } catch (const TransportFailure&) {
+ reconnect();
+ }
return impl;
}
+// Re-establishes the connection if it is not open. The start time is taken
+// before acquiring 'semaphore', so any time spent waiting for it counts
+// towards the timeout that connect() enforces. Once the semaphore is held,
+// nothing is done if the connection is already open.
+void ConnectionImpl::reconnect()
+{
+    const AbsTime start = now();
+    ScopedLock<Semaphore> l(semaphore);
+    if (connection.isOpen()) return;
+    connect(start);
+}
+
+// Reports whether an attempt that began at 'start' has used up its time
+// budget of 'timeout' seconds.
+//   timeout == 0 : always expired (no retry time allowed)
+//   timeout <  0 : never expires
+//   timeout >  0 : expired once more than 'timeout' seconds have elapsed
+bool expired(const AbsTime& start, int timeout)
+{
+    if (timeout == 0) return true;
+    if (timeout < 0) return false;
+    Duration used(start, now());
+    Duration allowed = timeout * TIME_SEC;
+    // Expired only when the elapsed time exceeds the allowance.
+    return used > allowed;
+}
+
+// Keeps calling tryConnect() until it succeeds. After each failed attempt it
+// either throws TransportFailure, if expired(started, timeout) reports true,
+// or sleeps before retrying. The sleep interval starts at minRetryInterval
+// and doubles after every attempt, capped at maxRetryInterval.
+void ConnectionImpl::connect(const AbsTime& started)
+{
+    int delay = minRetryInterval;
+    while (!tryConnect()) {
+        if (expired(started, timeout)) throw TransportFailure();
+        qpid::sys::sleep(delay);
+        delay = std::min(delay * 2, maxRetryInterval);
+    }
+}
+
+// One reconnection attempt: tries the configured 'url' first and, only if
+// that fails, the brokers returned by connection.getKnownBrokers(). Returns
+// true only if a connection was opened and resetSessions() also succeeded.
+bool ConnectionImpl::tryConnect()
+{
+    const bool connected = tryConnect(url) || tryConnect(connection.getKnownBrokers());
+    return connected && resetSessions();
+}
+
+// Attempts to open the connection to broker 'u' using the stored settings.
+// Returns true on success. Any qpid Exception raised by open() is logged at
+// info level and reported as false instead of being propagated.
+bool ConnectionImpl::tryConnect(const Url& u)
+{
+    try {
+        // Log the address actually being tried ('u'); it differs from the
+        // configured 'url' when working through the known-broker list.
+        QPID_LOG(info, "Trying to connect to " << u << "...");
+        connection.open(u, settings);
+        return true;
+    } catch (const Exception& e) {
+        //TODO: need to fix timeout on open so that it throws TransportFailure
+        QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
+    }
+    return false;
+}
+
+// Tries each address in 'urls' in order, stopping at the first one that
+// connects. Returns false if the list is empty or every attempt fails.
+bool ConnectionImpl::tryConnect(const std::vector<Url>& urls)
+{
+    std::vector<Url>::const_iterator candidate = urls.begin();
+    while (candidate != urls.end()) {
+        if (tryConnect(*candidate)) return true;
+        ++candidate;
+    }
+    return false;
+}
+
+// Hands every tracked session a new underlying session obtained from
+// 'connection'. The loop runs with 'lock' held. Returns false, after logging
+// at debug level, if a TransportFailure is thrown part-way through; returns
+// true otherwise.
+bool ConnectionImpl::resetSessions()
+{
+    try {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        Sessions::iterator s = sessions.begin();
+        while (s != sessions.end()) {
+            getImplPtr(*s)->setSession(connection.newSession());
+            ++s;
+        }
+    } catch (const TransportFailure&) {
+        QPID_LOG(debug, "Connection failed while re-inialising sessions");
+        return false;
+    }
+    return true;
+}
+
}}} // namespace qpid::client::amqp0_10
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Mon Oct 5 12:51:57 2009
@@ -23,20 +23,46 @@
*/
#include "qpid/messaging/ConnectionImpl.h"
#include "qpid/messaging/Variant.h"
+#include "qpid/Url.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Semaphore.h"
+#include <vector>
namespace qpid {
namespace client {
namespace amqp0_10 {
+class SessionImpl;
+
class ConnectionImpl : public qpid::messaging::ConnectionImpl
{
public:
ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options);
void close();
qpid::messaging::Session newSession();
+ void closed(SessionImpl&);
+ void reconnect();
private:
+ typedef std::vector<qpid::messaging::Session> Sessions;
+
+ qpid::sys::Mutex lock;//used to protect data structures
+ qpid::sys::Semaphore semaphore;//used to coordinate reconnection
qpid::client::Connection connection;
+ qpid::Url url;
+ qpid::client::ConnectionSettings settings;
+ Sessions sessions;
+ bool reconnectionEnabled;
+ int timeout;
+ int minRetryInterval;
+ int maxRetryInterval;
+
+ void connect(const qpid::sys::AbsTime& started);
+ bool tryConnect();
+ bool tryConnect(const std::vector<Url>& urls);
+ bool tryConnect(const Url&);
+ bool resetSessions();
};
}}} // namespace qpid::client::amqp0_10
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Mon Oct 5 12:51:57 2009
@@ -21,11 +21,13 @@
#include "qpid/client/amqp0_10/IncomingMessages.h"
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/CodecsInternal.h"
#include "qpid/client/SessionImpl.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
#include "qpid/messaging/Variant.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/FrameSet.h"
@@ -41,6 +43,7 @@
using namespace qpid::framing::message;
using qpid::sys::AbsTime;
using qpid::sys::Duration;
+using qpid::messaging::MessageImplAccess;
using qpid::messaging::Variant;
namespace {
@@ -78,11 +81,32 @@
}
}
};
+
+// Functor over received message transfers: matches commands whose
+// MessageTransferBody destination equals 'destination' and keeps a running
+// count of the matches in 'matched'.
+struct Match
+{
+    const std::string destination;
+    uint32_t matched;
+
+    Match(const std::string& d) : destination(d), matched(0) {}
+
+    // Returns true, and increments 'matched', when 'command' is addressed
+    // to 'destination'; returns false otherwise.
+    bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+    {
+        const bool hit = command->as<MessageTransferBody>()->getDestination() == destination;
+        if (hit) ++matched;
+        return hit;
+    }
+};
}
-IncomingMessages::IncomingMessages(qpid::client::AsyncSession s) :
- session(s),
- incoming(SessionBase_0_10Access(session).get()->getDemux().getDefault()) {}
+// Rebinds this object to session 's': stores the session, switches
+// 'incoming' to the default queue of that session's demux, and resets the
+// accept tracker.
+void IncomingMessages::setSession(qpid::client::AsyncSession s)
+{
+    session = s;
+    SessionBase_0_10Access access(session);
+    incoming = access.get()->getDemux().getDefault();
+    acceptTracker.reset();
+}
bool IncomingMessages::get(Handler& handler, Duration timeout)
{
@@ -101,8 +125,7 @@
void IncomingMessages::accept()
{
- session.messageAccept(unaccepted);
- unaccepted.clear();
+ acceptTracker.accept(session);
}
void IncomingMessages::releaseAll()
@@ -116,8 +139,7 @@
GetAny handler;
while (process(&handler, 0)) ;
//now release all messages
- session.messageRelease(unaccepted);
- unaccepted.clear();
+ acceptTracker.release(session);
}
void IncomingMessages::releasePending(const std::string& destination)
@@ -161,6 +183,32 @@
return false;
}
+// Returns the accept tracker's count of pending accepts across all
+// destinations.
+uint32_t IncomingMessages::pendingAccept()
+{
+    const uint32_t outstanding = acceptTracker.acceptsPending();
+    return outstanding;
+}
+// Returns the accept tracker's count of pending accepts for 'destination'.
+uint32_t IncomingMessages::pendingAccept(const std::string& destination)
+{
+    const uint32_t outstanding = acceptTracker.acceptsPending(destination);
+    return outstanding;
+}
+
+// Returns how many messages are currently held in 'received', after first
+// draining everything already available from 'incoming' by calling
+// process() with no handler and a zero timeout until it returns false.
+uint32_t IncomingMessages::available()
+{
+    bool more = true;
+    while (more) {
+        more = process(0, 0);
+    }
+    return static_cast<uint32_t>(received.size());
+}
+
+// Returns how many messages held in 'received' are addressed to
+// 'destination', after first draining everything already available from
+// 'incoming' (process() with no handler and a zero timeout).
+uint32_t IncomingMessages::available(const std::string& destination)
+{
+    bool more = true;
+    while (more) {
+        more = process(0, 0);
+    }
+
+    // Run the Match functor over every received command and read its tally.
+    Match counter(destination);
+    for (FrameSetQueue::iterator i = received.begin(); i != received.end(); ++i) {
+        counter(*i);
+    }
+    return counter.matched;
+}
+
void populate(qpid::messaging::Message& message, FrameSet& command);
/**
@@ -175,7 +223,7 @@
}
const MessageTransferBody* transfer = command->as<MessageTransferBody>();
if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
- unaccepted.add(command->getId());
+ acceptTracker.delivered(transfer->getDestination(), command->getId());
}
session.markCompleted(command->getId(), false, false);
}
@@ -191,8 +239,6 @@
parent.retrieve(content, message);
}
-void translate(const FieldTable& from, Variant::Map& to);//implemented in Codecs.cpp
-
void populateHeaders(qpid::messaging::Message& message,
const DeliveryProperties* deliveryProperties,
const MessageProperties* messageProperties)
@@ -206,6 +252,7 @@
if (messageProperties->hasReplyTo()) {
message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
}
+ message.getHeaders().clear();
translate(messageProperties->getApplicationHeaders(), message.getHeaders());
//TODO: convert other message properties
}
@@ -219,9 +266,8 @@
void populate(qpid::messaging::Message& message, FrameSet& command)
{
//need to be able to link the message back to the transfer it was delivered by
- //e.g. for rejecting. TODO: hide this from API
- uint32_t commandId = command.getId();
- message.setInternalId(reinterpret_cast<void*>(commandId));
+ //e.g. for rejecting.
+ MessageImplAccess::get(message).setInternalId(command.getId());
command.getContent(message.getBytes());
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Mon Oct 5 12:51:57 2009
@@ -27,6 +27,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/BlockingQueue.h"
#include "qpid/sys/Time.h"
+#include "qpid/client/amqp0_10/AcceptTracker.h"
namespace qpid {
@@ -67,20 +68,26 @@
virtual bool accept(MessageTransfer& transfer) = 0;
};
- IncomingMessages(qpid::client::AsyncSession session);
+ void setSession(qpid::client::AsyncSession session);
bool get(Handler& handler, qpid::sys::Duration timeout);
//bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
//bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout);
void accept();
void releaseAll();
void releasePending(const std::string& destination);
+
+ uint32_t pendingAccept();
+ uint32_t pendingAccept(const std::string& destination);
+
+ uint32_t available();
+ uint32_t available(const std::string& destination);
private:
typedef std::deque<FrameSetPtr> FrameSetQueue;
qpid::client::AsyncSession session;
- qpid::framing::SequenceSet unaccepted;
boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
FrameSetQueue received;
+ AcceptTracker acceptTracker;
bool process(Handler*, qpid::sys::Duration);
void retrieve(FrameSetPtr, qpid::messaging::Message*);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h Mon Oct 5 12:51:57 2009
@@ -33,6 +33,8 @@
namespace client {
namespace amqp0_10 {
+class OutgoingMessage;
+
/**
*
*/
@@ -41,7 +43,7 @@
public:
virtual ~MessageSink() {}
virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0;
- virtual void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message) = 0;
+ virtual void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message) = 0;
virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0;
private:
};
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Mon Oct 5 12:51:57 2009
@@ -19,6 +19,7 @@
*
*/
#include "ReceiverImpl.h"
+#include "AddressResolution.h"
#include "MessageSource.h"
#include "SessionImpl.h"
#include "qpid/messaging/MessageListener.h"
@@ -38,11 +39,6 @@
window = capacity;
}
}
-
-bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
-{
- return parent.get(*this, message, timeout);
-}
qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout)
{
@@ -50,24 +46,6 @@
if (!get(result, timeout)) throw Receiver::NoMessageAvailable();
return result;
}
-
-bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
-{
- if (capacity == 0 && !cancelled) {
- session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
- if (!started) session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
- }
-
- if (get(message, timeout)) {
- return true;
- } else {
- if (!cancelled) {
- sync(session).messageFlush(destination);
- start();//reallocate credit
- }
- return get(message, 0);
- }
-}
qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout)
{
@@ -76,71 +54,152 @@
return result;
}
+bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ Get f(*this, message, timeout);
+ while (!parent.execute(f)) {}
+ return f.result;
+}
+
+bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ Fetch f(*this, message, timeout);
+ while (!parent.execute(f)) {}
+ return f.result;
+}
+
void ReceiverImpl::cancel()
{
- if (!cancelled) {
- //TODO: should syncronicity be an optional argument to this call?
- source->cancel(session, destination);
- //need to be sure cancel is complete and all incoming
- //framesets are processed before removing the receiver
- parent.receiverCancelled(destination);
- cancelled = true;
- }
+ execute<Cancel>();
}
void ReceiverImpl::start()
{
- if (!cancelled) {
- started = true;
- session.messageSetFlowMode(destination, capacity > 0);
+ execute<Start>();
+}
+
+void ReceiverImpl::stop()
+{
+ execute<Stop>();
+}
+
+void ReceiverImpl::setCapacity(uint32_t c)
+{
+ execute1<SetCapacity>(c);
+}
+
+void ReceiverImpl::startFlow()
+{
+ if (capacity > 0) {
+ session.messageSetFlowMode(destination, FLOW_MODE_WINDOW);
session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity);
session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
window = capacity;
}
}
-void ReceiverImpl::stop()
+void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
- session.messageStop(destination);
- started = false;
+
+ session = s;
+ if (state == UNRESOLVED) {
+ source = resolver.resolveSource(session, address, filter, options);
+ state = STOPPED;//TODO: if session is started, go straight to started
+ }
+ if (state == CANCELLED) {
+ source->cancel(session, destination);
+ parent.receiverCancelled(destination);
+ } else {
+ source->subscribe(session, destination);
+ if (state == STARTED) start();
+ }
}
-void ReceiverImpl::subscribe()
+void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
+qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
+
+const std::string& ReceiverImpl::getName() const { return destination; }
+
+uint32_t ReceiverImpl::getCapacity()
+{
+ return capacity;
+}
+
+uint32_t ReceiverImpl::available()
+{
+ return parent.available(destination);
+}
+
+uint32_t ReceiverImpl::pendingAck()
{
- source->subscribe(session, destination);
+ return parent.pendingAck(destination);
}
-void ReceiverImpl::setSession(qpid::client::AsyncSession s)
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
+ const qpid::messaging::Address& a,
+ const qpid::messaging::Filter* f,
+ const qpid::messaging::Variant::Map& o) :
+
+ parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF),
+ state(UNRESOLVED), capacity(0), listener(0), window(0) {}
+
+bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ return parent.get(*this, message, timeout);
+}
+
+bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ if (state == CANCELLED) return false;//TODO: or should this be an error?
+
+ if (capacity == 0 || state != STARTED) {
+ session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+ session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+ }
+
+ if (getImpl(message, timeout)) {
+ return true;
+ } else {
+ sync(session).messageFlush(destination);
+ startFlow();//reallocate credit
+ return getImpl(message, 0);
+ }
+}
+
+void ReceiverImpl::cancelImpl()
{
- session = s;
- if (!cancelled) {
- subscribe();
- //if we were in started state before the session was changed,
- //start again on this new session
- //TODO: locking if receiver is to be threadsafe...
- if (started) start();
+ if (state != CANCELLED) {
+ state = CANCELLED;
+ source->cancel(session, destination);
+ parent.receiverCancelled(destination);
}
}
-void ReceiverImpl::setCapacity(uint32_t c)
+void ReceiverImpl::startImpl()
+{
+ if (state == STOPPED) {
+ state = STARTED;
+ startFlow();
+ }
+}
+
+void ReceiverImpl::stopImpl()
+{
+ state = STOPPED;
+ session.messageStop(destination);
+}
+
+void ReceiverImpl::setCapacityImpl(uint32_t c)
{
if (c != capacity) {
capacity = c;
- if (!cancelled && started) {
- stop();
- start();
+ if (state == STARTED) {
+ session.messageStop(destination);
+ startFlow();
}
}
}
-void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
-qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
-
-const std::string& ReceiverImpl::getName() const { return destination; }
-
-ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s) :
- parent(p), source(s), destination(name), byteCredit(0xFFFFFFFF),
- capacity(0), started(false), cancelled(false), listener(0), window(0) {}
-
}}} // namespace qpid::client::amqp0_10
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Mon Oct 5 12:51:57 2009
@@ -21,9 +21,13 @@
* under the License.
*
*/
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/ReceiverImpl.h"
+#include "qpid/messaging/Variant.h"
#include "qpid/client/AsyncSession.h"
+#include "qpid/client/amqp0_10/SessionImpl.h"
#include "qpid/sys/Time.h"
#include <memory>
@@ -31,8 +35,8 @@
namespace client {
namespace amqp0_10 {
+class AddressResolution;
class MessageSource;
-class SessionImpl;
/**
* A receiver implementation based on an AMQP 0-10 subscription.
@@ -41,8 +45,14 @@
{
public:
- ReceiverImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSource> source);
+ enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED};
+ ReceiverImpl(SessionImpl& parent, const std::string& name,
+ const qpid::messaging::Address& address,
+ const qpid::messaging::Filter* filter,
+ const qpid::messaging::Variant::Map& options);
+
+ void init(qpid::client::AsyncSession session, AddressResolution& resolver);
bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
qpid::messaging::Message get(qpid::sys::Duration timeout);
bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout);
@@ -50,25 +60,107 @@
void cancel();
void start();
void stop();
- void subscribe();
- void setSession(qpid::client::AsyncSession s);
const std::string& getName() const;
void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t available();
+ uint32_t pendingAck();
void setListener(qpid::messaging::MessageListener* listener);
qpid::messaging::MessageListener* getListener();
void received(qpid::messaging::Message& message);
private:
SessionImpl& parent;
- const std::auto_ptr<MessageSource> source;
const std::string destination;
+ const qpid::messaging::Address address;
+ const qpid::messaging::Filter* filter;
+ const qpid::messaging::Variant::Map options;
const uint32_t byteCredit;
-
+ State state;
+
+ std::auto_ptr<MessageSource> source;
uint32_t capacity;
qpid::client::AsyncSession session;
- bool started;
- bool cancelled;
qpid::messaging::MessageListener* listener;
uint32_t window;
+
+ void startFlow();
+ //implementation of public facing methods
+ bool fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ bool getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ void startImpl();
+ void stopImpl();
+ void cancelImpl();
+ void setCapacityImpl(uint32_t);
+
+ //functors for public facing methods (allows locking and retry
+ //logic to be centralised)
+ struct Command
+ {
+ ReceiverImpl& impl;
+
+ Command(ReceiverImpl& i) : impl(i) {}
+ };
+
+ struct Get : Command
+ {
+ qpid::messaging::Message& message;
+ qpid::sys::Duration timeout;
+ bool result;
+
+ Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) :
+ Command(i), message(m), timeout(t), result(false) {}
+ void operator()() { result = impl.getImpl(message, timeout); }
+ };
+
+ struct Fetch : Command
+ {
+ qpid::messaging::Message& message;
+ qpid::sys::Duration timeout;
+ bool result;
+
+ Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) :
+ Command(i), message(m), timeout(t), result(false) {}
+ void operator()() { result = impl.fetchImpl(message, timeout); }
+ };
+
+ struct Stop : Command
+ {
+ Stop(ReceiverImpl& i) : Command(i) {}
+ void operator()() { impl.stopImpl(); }
+ };
+
+ struct Start : Command
+ {
+ Start(ReceiverImpl& i) : Command(i) {}
+ void operator()() { impl.startImpl(); }
+ };
+
+ struct Cancel : Command
+ {
+ Cancel(ReceiverImpl& i) : Command(i) {}
+ void operator()() { impl.cancelImpl(); }
+ };
+
+ struct SetCapacity : Command
+ {
+ uint32_t capacity;
+
+ SetCapacity(ReceiverImpl& i, uint32_t c) : Command(i), capacity(c) {}
+ void operator()() { impl.setCapacityImpl(capacity); }
+ };
+
+ //helper templates for some common patterns
+ template <class F> void execute()
+ {
+ F f(*this);
+ parent.execute(f);
+ }
+
+ template <class F, class P> void execute1(P p)
+ {
+ F f(*this, p);
+ parent.execute(f);
+ }
};
}}} // namespace qpid::client::amqp0_10
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Mon Oct 5 12:51:57 2009
@@ -21,29 +21,113 @@
#include "SenderImpl.h"
#include "MessageSink.h"
#include "SessionImpl.h"
+#include "AddressResolution.h"
+#include "OutgoingMessage.h"
namespace qpid {
namespace client {
namespace amqp0_10 {
-SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, std::auto_ptr<MessageSink> _sink) :
- parent(_parent), name(_name), sink(_sink) {}
+SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
+ const qpid::messaging::Address& _address,
+ const qpid::messaging::Variant::Map& _options) :
+ parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
+ capacity(50), window(0), flushed(false) {}
-void SenderImpl::send(qpid::messaging::Message& m)
+void SenderImpl::send(const qpid::messaging::Message& message)
{
- sink->send(session, name, m);
+ Send f(*this, &message);
+ while (f.repeat) parent.execute(f);
}
void SenderImpl::cancel()
{
- sink->cancel(session, name);
- parent.senderCancelled(name);
+ execute<Cancel>();
+}
+
+void SenderImpl::setCapacity(uint32_t c)
+{
+ bool flush = c < capacity;
+ capacity = c;
+ execute1<CheckPendingSends>(flush);
}
+uint32_t SenderImpl::getCapacity() { return capacity; }
+uint32_t SenderImpl::pending()
+{
+ CheckPendingSends f(*this, false);
+ parent.execute(f);
+ return f.pending;
+}
-void SenderImpl::setSession(qpid::client::AsyncSession s)
+void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
session = s;
- sink->declare(session, name);
+ if (state == UNRESOLVED) {
+ sink = resolver.resolveSink(session, address, options);
+ state = ACTIVE;
+ }
+ if (state == CANCELLED) {
+ sink->cancel(session, name);
+ parent.senderCancelled(name);
+ } else {
+ sink->declare(session, name);
+ replay();
+ }
+}
+
+void SenderImpl::waitForCapacity()
+{
+ //TODO: add option to throw exception rather than blocking?
+ if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
+ //Initial implementation is very basic. As outgoing is
+ //currently only reduced on receiving completions and we are
+ //blocking anyway we may as well sync(). If successful that
+ //should clear all outstanding sends.
+ session.sync();
+ checkPendingSends(false);
+ }
+ //flush periodically and check for completed sends
+ if (++window > (capacity / 4)) {//TODO: make this configurable?
+ checkPendingSends(true);
+ window = 0;
+ }
+}
+
+void SenderImpl::sendImpl(const qpid::messaging::Message& m)
+{
+ //TODO: make recording for replay optional (would still want to track completion however)
+ std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
+ msg->convert(m);
+ outgoing.push_back(msg.release());
+ sink->send(session, name, outgoing.back());
+}
+
+void SenderImpl::replay()
+{
+ for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
+ sink->send(session, name, *i);
+ }
+}
+
+uint32_t SenderImpl::checkPendingSends(bool flush)
+{
+ if (flush) {
+ session.flush();
+ flushed = true;
+ } else {
+ flushed = false;
+ }
+ while (!outgoing.empty() && outgoing.front().status.isComplete()) {
+ outgoing.pop_front();
+ }
+ return outgoing.size();
+}
+
+void SenderImpl::cancelImpl()
+{
+ state = CANCELLED;
+ sink->cancel(session, name);
+ parent.senderCancelled(name);
}
}}} // namespace qpid::client::amqp0_10
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org