You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/05 14:52:10 UTC

svn commit: r821779 [4/11] - in /qpid/branches/java-broker-0-10/qpid: ./ cpp/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/python/qmf/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/examples/messaging/ cpp/include/qmf/ cpp/include/qp...

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.cpp Mon Oct  5 12:51:57 2009
@@ -67,6 +67,11 @@
     }
 }
 
+ValueImpl::ValueImpl(Value* e, Typecode t, Typecode at) :
+    envelope(e), typecode(t), valid(false), arrayTypecode(at)
+{
+}
+
 ValueImpl::ValueImpl(Typecode t) : envelope(new Value(this)), typecode(t)
 {
     ::memset(&value, 0, sizeof(value));
@@ -186,293 +191,64 @@
 // Wrappers
 //==================================================================
 
-Value::Value(Typecode t, Typecode at)
-{
-    impl = new ValueImpl(this, t, at);
-}
-
-Value::Value(ValueImpl* i)
-{
-    impl = i;
-}
-
-Value::~Value()
-{
-    delete impl;
-}
-
-Typecode Value::getType() const
-{
-    return impl->getType();
-}
-
-bool Value::isNull() const
-{
-    return impl->isNull();
-}
-
-void Value::setNull()
-{
-    impl->setNull();
-}
-
-bool Value::isObjectId() const
-{
-    return impl->isObjectId();
-}
-
-const ObjectId& Value::asObjectId() const
-{
-    return impl->asObjectId();
-}
-
-void Value::setObjectId(const ObjectId& oid)
-{
-    impl->setObjectId(oid);
-}
-
-bool Value::isUint() const
-{
-    return impl->isUint();
-}
-
-uint32_t Value::asUint() const
-{
-    return impl->asUint();
-}
-
-void Value::setUint(uint32_t val)
-{
-    impl->setUint(val);
-}
-
-bool Value::isInt() const
-{
-    return impl->isInt();
-}
-
-int32_t Value::asInt() const
-{
-    return impl->asInt();
-}
-
-void Value::setInt(int32_t val)
-{
-    impl->setInt(val);
-}
-
-bool Value::isUint64() const
-{
-    return impl->isUint64();
-}
-
-uint64_t Value::asUint64() const
-{
-    return impl->asUint64();
-}
-
-void Value::setUint64(uint64_t val)
-{
-    impl->setUint64(val);
-}
-
-bool Value::isInt64() const
-{
-    return impl->isInt64();
-}
-
-int64_t Value::asInt64() const
-{
-    return impl->asInt64();
-}
-
-void Value::setInt64(int64_t val)
-{
-    impl->setInt64(val);
-}
-
-bool Value::isString() const
-{
-    return impl->isString();
-}
-
-const char* Value::asString() const
-{
-    return impl->asString();
-}
-
-void Value::setString(const char* val)
-{
-    impl->setString(val);
-}
-
-bool Value::isBool() const
-{
-    return impl->isBool();
-}
-
-bool Value::asBool() const
-{
-    return impl->asBool();
-}
-
-void Value::setBool(bool val)
-{
-    impl->setBool(val);
-}
-
-bool Value::isFloat() const
-{
-    return impl->isFloat();
-}
-
-float Value::asFloat() const
-{
-    return impl->asFloat();
-}
-
-void Value::setFloat(float val)
-{
-    impl->setFloat(val);
-}
-
-bool Value::isDouble() const
-{
-    return impl->isDouble();
-}
-
-double Value::asDouble() const
-{
-    return impl->asDouble();
-}
-
-void Value::setDouble(double val)
-{
-    impl->setDouble(val);
-}
-
-bool Value::isUuid() const
-{
-    return impl->isUuid();
-}
-
-const uint8_t* Value::asUuid() const
-{
-    return impl->asUuid();
-}
-
-void Value::setUuid(const uint8_t* val)
-{
-    impl->setUuid(val);
-}
-
-bool Value::isObject() const
-{
-    return impl->isObject();
-}
-
-Object* Value::asObject() const
-{
-    return impl->asObject();
-}
-
-void Value::setObject(Object* val)
-{
-    impl->setObject(val);
-}
-
-bool Value::isMap() const
-{
-    return impl->isMap();
-}
-
-bool Value::keyInMap(const char* key) const
-{
-    return impl->keyInMap(key);
-}
-
-Value* Value::byKey(const char* key)
-{
-    return impl->byKey(key);
-}
-
-const Value* Value::byKey(const char* key) const
-{
-    return impl->byKey(key);
-}
-
-void Value::deleteKey(const char* key)
-{
-    impl->deleteKey(key);
-}
-
-void Value::insert(const char* key, Value* val)
-{
-    impl->insert(key, val);
-}
-
-uint32_t Value::keyCount() const
-{
-    return impl->keyCount();
-}
-
-const char* Value::key(uint32_t idx) const
-{
-    return impl->key(idx);
-}
-
-bool Value::isList() const
-{
-    return impl->isList();
-}
-
-uint32_t Value::listItemCount() const
-{
-    return impl->listItemCount();
-}
-
-Value* Value::listItem(uint32_t idx)
-{
-    return impl->listItem(idx);
-}
-
-void Value::appendToList(Value* val)
-{
-    impl->appendToList(val);
-}
-
-void Value::deleteListItem(uint32_t idx)
-{
-    impl->deleteListItem(idx);
-}
-
-bool Value::isArray() const
-{
-    return impl->isArray();
-}
-
-Typecode Value::arrayType() const
-{
-    return impl->arrayType();
-}
-
-uint32_t Value::arrayItemCount() const
-{
-    return impl->arrayItemCount();
-}
-
-Value* Value::arrayItem(uint32_t idx)
-{
-    return impl->arrayItem(idx);
-}
-
-void Value::appendToArray(Value* val)
-{
-    impl->appendToArray(val);
-}
-
-void Value::deleteArrayItem(uint32_t idx)
-{
-    impl->deleteArrayItem(idx);
-}
+Value::Value(const Value& from) : impl(new ValueImpl(*(from.impl))) {}
+Value::Value(Typecode t, Typecode at) : impl(new ValueImpl(this, t, at)) {}
+Value::Value(ValueImpl* i) : impl(i) {}
+Value::~Value() { delete impl; }
+
+Typecode Value::getType() const { return impl->getType(); }
+bool Value::isNull() const { return impl->isNull(); }
+void Value::setNull() { impl->setNull(); }
+bool Value::isObjectId() const { return impl->isObjectId(); }
+const ObjectId& Value::asObjectId() const { return impl->asObjectId(); }
+void Value::setObjectId(const ObjectId& oid) { impl->setObjectId(oid); }
+bool Value::isUint() const { return impl->isUint(); }
+uint32_t Value::asUint() const { return impl->asUint(); }
+void Value::setUint(uint32_t val) { impl->setUint(val); }
+bool Value::isInt() const { return impl->isInt(); }
+int32_t Value::asInt() const { return impl->asInt(); }
+void Value::setInt(int32_t val) { impl->setInt(val); }
+bool Value::isUint64() const { return impl->isUint64(); }
+uint64_t Value::asUint64() const { return impl->asUint64(); }
+void Value::setUint64(uint64_t val) { impl->setUint64(val); }
+bool Value::isInt64() const { return impl->isInt64(); }
+int64_t Value::asInt64() const { return impl->asInt64(); }
+void Value::setInt64(int64_t val) { impl->setInt64(val); }
+bool Value::isString() const { return impl->isString(); }
+const char* Value::asString() const { return impl->asString(); }
+void Value::setString(const char* val) { impl->setString(val); }
+bool Value::isBool() const { return impl->isBool(); }
+bool Value::asBool() const { return impl->asBool(); }
+void Value::setBool(bool val) { impl->setBool(val); }
+bool Value::isFloat() const { return impl->isFloat(); }
+float Value::asFloat() const { return impl->asFloat(); }
+void Value::setFloat(float val) { impl->setFloat(val); }
+bool Value::isDouble() const { return impl->isDouble(); }
+double Value::asDouble() const { return impl->asDouble(); }
+void Value::setDouble(double val) { impl->setDouble(val); }
+bool Value::isUuid() const { return impl->isUuid(); }
+const uint8_t* Value::asUuid() const { return impl->asUuid(); }
+void Value::setUuid(const uint8_t* val) { impl->setUuid(val); }
+bool Value::isObject() const { return impl->isObject(); }
+Object* Value::asObject() const { return impl->asObject(); }
+void Value::setObject(Object* val) { impl->setObject(val); }
+bool Value::isMap() const { return impl->isMap(); }
+bool Value::keyInMap(const char* key) const { return impl->keyInMap(key); }
+Value* Value::byKey(const char* key) { return impl->byKey(key); }
+const Value* Value::byKey(const char* key) const { return impl->byKey(key); }
+void Value::deleteKey(const char* key) { impl->deleteKey(key); }
+void Value::insert(const char* key, Value* val) { impl->insert(key, val); }
+uint32_t Value::keyCount() const { return impl->keyCount(); }
+const char* Value::key(uint32_t idx) const { return impl->key(idx); }
+bool Value::isList() const { return impl->isList(); }
+uint32_t Value::listItemCount() const { return impl->listItemCount(); }
+Value* Value::listItem(uint32_t idx) { return impl->listItem(idx); }
+void Value::appendToList(Value* val) { impl->appendToList(val); }
+void Value::deleteListItem(uint32_t idx) { impl->deleteListItem(idx); }
+bool Value::isArray() const { return impl->isArray(); }
+Typecode Value::arrayType() const { return impl->arrayType(); }
+uint32_t Value::arrayItemCount() const { return impl->arrayItemCount(); }
+Value* Value::arrayItem(uint32_t idx) { return impl->arrayItem(idx); }
+void Value::appendToArray(Value* val) { impl->appendToArray(val); }
+void Value::deleteArrayItem(uint32_t idx) { impl->deleteArrayItem(idx); }
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qmf/ValueImpl.h Mon Oct  5 12:51:57 2009
@@ -60,8 +60,7 @@
             uint8_t  uuidVal[16];
         } value;
 
-        ValueImpl(Value* e, Typecode t, Typecode at) :
-            envelope(e), typecode(t), valid(false), arrayTypecode(at) {}
+        ValueImpl(Value* e, Typecode t, Typecode at);
         ValueImpl(Typecode t, qpid::framing::Buffer& b);
         ValueImpl(Typecode t);
         ~ValueImpl();

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.cpp Mon Oct  5 12:51:57 2009
@@ -53,42 +53,65 @@
 	}
 }
  
-AclResult AclData::lookup(const std::string& id, const Action& action, const ObjectType& objType, const std::string& name, std::map<Property, std::string>* params)
-{
-     AclResult aclresult = decisionMode;
-	
-	 if (actionList[action] && actionList[action][objType]){
-	      AclData::actObjItr itrRule = actionList[action][objType]->find(id);
-		  if (itrRule == actionList[action][objType]->end())
-		       itrRule = actionList[action][objType]->find("*");
-		  if (itrRule != actionList[action][objType]->end() ) {
-			   
-			   //loop the vector
-    		   for (ruleSetItr i=itrRule->second.begin(); i<itrRule->second.end(); i++) {
-                    
-					// loop the names looking for match
-					bool match =true;
-					for (propertyMapItr pMItr = i->props.begin(); (pMItr != i->props.end()) && match; pMItr++)
-					{
-                        //match name is exists first
-						if (pMItr->first == acl::PROP_NAME){
-						     if (!matchProp(pMItr->second, name)){  
-							     match= false;
-							 }
-						}else if (params){ //match pMItr against params
-							propertyMapItr paramItr = params->find (pMItr->first);
-							if (paramItr == params->end()){
-						    	match = false;
-							}else if (!matchProp(paramItr->second, pMItr->second)){  
-							    	match = false;
-							}
+AclResult AclData::lookup(const std::string& id, const Action& action, const ObjectType& objType,
+		const std::string& name, std::map<Property, std::string>* params) {
+
+	QPID_LOG(debug, "ACL: Lookup for id:" << id << " action:" << AclHelper::getActionStr((Action) action)
+			<< " objectType:" << AclHelper::getObjectTypeStr((ObjectType) objType) << " name:" << name
+			<< " with params " << AclHelper::propertyMapToString(params));
+
+	AclResult aclresult = decisionMode;
+	if (actionList[action] && actionList[action][objType]) {
+		AclData::actObjItr itrRule = actionList[action][objType]->find(id);
+		if (itrRule == actionList[action][objType]->end())
+			itrRule = actionList[action][objType]->find("*");
+		if (itrRule != actionList[action][objType]->end()) {
+
+			QPID_LOG(debug, "ACL: checking the following rules for : " << itrRule->first );
+
+			//loop the vector
+			for (ruleSetItr i = itrRule->second.begin(); i < itrRule->second.end(); i++) {
+				QPID_LOG(debug, "ACL: checking rule " <<  i->toString());
+				// loop the names looking for match
+				bool match = true;
+				for (propertyMapItr pMItr = i->props.begin(); (pMItr != i->props.end()) && match; pMItr++) {
+					//match the name first, if the rule specifies one
+					if (pMItr->first == acl::PROP_NAME) {
+						if (matchProp(pMItr->second, name)){
+							QPID_LOG(debug, "ACL: name '" << name << "' matched with name '"
+																        << pMItr->second << "' given in the rule");
+					    }else{
+							match = false;
+							QPID_LOG(debug, "ACL: name '" << name << "' didn't match with name '"
+									        << pMItr->second << "' given in the rule");
+						}
+					} else if (params) { //match pMItr against params
+						propertyMapItr paramItr = params->find(pMItr->first);
+						if (paramItr == params->end()) {
+							match = false;
+							QPID_LOG(debug, "ACL: the given parameter map in lookup doesn't contain the property '"
+									        << AclHelper::getPropertyStr(pMItr->first) << "'");
+						} else if (!matchProp(pMItr->second, paramItr->second)) {
+							QPID_LOG(debug, "ACL: the pair("
+									        << AclHelper::getPropertyStr(paramItr->first) << "," << paramItr->second
+									        << ") given in lookup doesn't match the pair("
+									        << AclHelper::getPropertyStr(pMItr->first) << "," << pMItr->second << ") given in the rule");
+							match = false;
 						}
 					}
-					if (match) return getACLResult(i->logOnly, i->log);
-    		   }
-		  }
-	 }
-     return aclresult;
+				}
+				if (match)
+				{
+					aclresult = getACLResult(i->logOnly, i->log);
+					QPID_LOG(debug,"Successful match, the decision is:" << AclHelper::getAclResultStr(aclresult));
+					return aclresult;
+				}
+			}
+		}
+	}
+
+	QPID_LOG(debug,"No successful match, defaulting to the decision mode " << AclHelper::getAclResultStr(aclresult));
+	return aclresult;
 }
 
 AclResult AclData::lookup(const std::string& id, const Action& action, const ObjectType& objType, const std::string& /*Exchange*/ name, const std::string& RoutingKey)

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclData.h Mon Oct  5 12:51:57 2009
@@ -22,7 +22,7 @@
 
 #include "qpid/broker/AclModule.h"
 #include <vector>
-
+#include <sstream>
 
 namespace qpid {
 namespace acl {
@@ -45,6 +45,16 @@
 	  
 	  
 	  rule (propertyMap& p):log(false),logOnly(false),props(p) {};
+
+	  std::string toString () const {
+	  	std::ostringstream ruleStr;
+	  	ruleStr << "[log=" << log << ", logOnly=" << logOnly << " props{";
+	  	for (propertyMapItr pMItr = props.begin(); pMItr != props.end(); pMItr++) {
+	  		ruleStr << " " << AclHelper::getPropertyStr((Property) pMItr-> first) << "=" << pMItr->second;
+	  	}
+	  	ruleStr << " }]";
+	  	return ruleStr.str();
+	  }
    };
    typedef  std::vector<rule> ruleSet;
    typedef  ruleSet::const_iterator ruleSetItr;

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclReader.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclReader.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclReader.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/acl/AclReader.cpp Mon Oct  5 12:51:57 2009
@@ -83,115 +83,142 @@
     return oss.str();
 }
 
-void AclReader::loadDecisionData( boost::shared_ptr<AclData> d )
-{
-    d->clear();
-    QPID_LOG(debug, "ACL Load Rules");
-    int cnt = rules.size();
+void AclReader::loadDecisionData(boost::shared_ptr<AclData> d) {
+	d->clear();
+	QPID_LOG(debug, "ACL Load Rules");
+	int cnt = rules.size();
 	bool foundmode = false;
-    for (rlCitr i=rules.end()-1; cnt; i--,cnt--) {
-        QPID_LOG(debug, "ACL Processing " << std::setfill(' ') << std::setw(2) << cnt << " " << (*i)->toString());
-		
-		if (!foundmode && (*i)->actionAll && (*i)->names.size()==1 && (*((*i)->names.begin())).compare("*")==0 ){
-  	       d->decisionMode = (*i)->res;
-           QPID_LOG(debug, "ACL FoundMode " << AclHelper::getAclResultStr(d->decisionMode));
-		   foundmode=true;
-		}else{
-		    AclData::rule rule((*i)->props);
-		    bool addrule= true;
-		
-		    switch ((*i)->res)
-		    {
-		    case qpid::acl::ALLOWLOG:
-		        rule.log = true;
-                if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode == qpid::acl::ALLOWLOG) 
-				    rule.logOnly = true;
+
+	for (rlCitr i = rules.end() - 1; cnt; i--, cnt--) {
+		QPID_LOG(debug, "ACL Processing " << std::setfill(' ') << std::setw(2)
+				<< cnt << " " << (*i)->toString());
+
+		if (!foundmode && (*i)->actionAll && (*i)->names.size() == 1
+				&& (*((*i)->names.begin())).compare("*") == 0) {
+			d->decisionMode = (*i)->res;
+			QPID_LOG(debug, "ACL FoundMode " << AclHelper::getAclResultStr(
+					d->decisionMode));
+			foundmode = true;
+		} else {
+			AclData::rule rule((*i)->props);
+			bool addrule = true;
+
+			switch ((*i)->res) {
+			case qpid::acl::ALLOWLOG:
+				rule.log = true;
+				if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode
+						== qpid::acl::ALLOWLOG)
+					rule.logOnly = true;
+				break;
+			case qpid::acl::ALLOW:
+				if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode
+						== qpid::acl::ALLOWLOG)
+					addrule = false;
+				break;
+			case qpid::acl::DENYLOG:
+				rule.log = true;
+				if (d->decisionMode == qpid::acl::DENY || d->decisionMode
+						== qpid::acl::DENYLOG)
+					rule.logOnly = true;
+				break;
+			case qpid::acl::DENY:
+				if (d->decisionMode == qpid::acl::DENY || d->decisionMode
+						== qpid::acl::DENYLOG)
+					addrule = false;
 				break;
-	    	case qpid::acl::ALLOW:
-                if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode == qpid::acl::ALLOWLOG)
-				    addrule = false; 
-                break;
-		    case qpid::acl::DENYLOG:
-			    rule.log = true;
-                if (d->decisionMode == qpid::acl::DENY || d->decisionMode == qpid::acl::DENYLOG) 
-				    rule.logOnly = true;
-                break;
-		    case qpid::acl::DENY:
-                if (d->decisionMode == qpid::acl::DENY || d->decisionMode == qpid::acl::DENYLOG)
-				    addrule = false; 
-			    break;
-		    default:
-		        throw Exception("Invalid ACL Result loading rules.");
-		    }
-		    
-			
-            // Action -> Object -> map<user -> set<Rule> >
-            if (addrule){
-				for (int acnt= ((*i)->actionAll?0:(*i)->action); 
-        				 acnt< acl::ACTIONSIZE; (*i)->actionAll?acnt++:acnt=acl::ACTIONSIZE )  {
-
-            		if (acnt == acl::ACT_PUBLISH) d->transferAcl = true; // we have transfer ACL
-					
-					QPID_LOG(debug, "ACL Adding action:" << AclHelper::getActionStr((Action)acnt) );
-
-	        		//find the Action, create if not exist
-					if (d->actionList[acnt]==NULL) {
-			    		d->actionList[acnt] = new AclData::aclAction[qpid::acl::OBJECTSIZE];
-                		for (int j=0;j<qpid::acl::OBJECTSIZE; j++)
-                    		d->actionList[acnt][j] = NULL;
-            		}
+			default:
+				throw Exception("Invalid ACL Result loading rules.");
+			}
+
+			// Action -> Object -> map<user -> set<Rule> >
+			if (addrule) {
+				std::ostringstream actionstr;
+				for (int acnt = ((*i)->actionAll ? 0 : (*i)->action); acnt
+						< acl::ACTIONSIZE; (*i)->actionAll ? acnt++ : acnt
+						= acl::ACTIONSIZE) {
+
+					if (acnt == acl::ACT_PUBLISH)
+						d->transferAcl = true; // we have transfer ACL
+
+					actionstr << AclHelper::getActionStr((Action) acnt) << ",";
+
+					//find the Action, create if not exist
+					if (d->actionList[acnt] == NULL) {
+						d->actionList[acnt]
+								= new AclData::aclAction[qpid::acl::OBJECTSIZE];
+						for (int j = 0; j < qpid::acl::OBJECTSIZE; j++)
+							d->actionList[acnt][j] = NULL;
+					}
 
 					// optimize this loop to limit to valid options only!!
-					for (int ocnt= ((*i)->objStatus!=aclRule::VALUE ?0:(*i)->object); 
-        					 ocnt< acl::OBJECTSIZE; 
-							 (*i)->objStatus!=aclRule::VALUE?ocnt++:ocnt=acl::OBJECTSIZE )  {
-
-            				QPID_LOG(debug, "ACL Adding object:" << AclHelper::getObjectTypeStr((ObjectType)ocnt) );
-
-							//find the Object, create if not exist
-							if (d->actionList[acnt][ocnt] == NULL) 
-	            				d->actionList[acnt][ocnt] = new AclData::actionObject;
-
-            				// add users and Rule to object set
-							bool allNames=false;
-							// check to see if names.begin is '*'
-							if ( (*(*i)->names.begin()).compare("*")==0 ) allNames = true;
-
-							for (nsCitr itr = (allNames?names.begin():(*i)->names.begin());
-						        	  itr != (allNames?names.end():(*i)->names.end()); itr++) {
-							   AclData::actObjItr itrRule = d->actionList[acnt][ocnt]->find(*itr);
-							   if (itrRule == d->actionList[acnt][ocnt]->end()) {
-                				   QPID_LOG(debug, "ACL Adding rule & user:" << *itr);
-			    				   AclData::ruleSet rSet;
-			    				   rSet.push_back(rule);
-                				   d->actionList[acnt][ocnt]->insert(make_pair( std::string(*itr) , rSet) );
-            				   }else{
-							   
-							   // TODO add code to check for dead rules
-							   // allow peter create queue name=tmp <-- dead rule!!
-							   // allow peter create queue
-							   
-			   					   itrRule->second.push_back(rule);
-                				   QPID_LOG(debug, "ACL Adding rule to user:" << *itr);
-							   }
-            				}
-
-	        		}
-
-        	   }
-		   }else{
-            	QPID_LOG(debug, "ACL Skipping based on Mode:" << AclHelper::getAclResultStr(d->decisionMode) );
-		   }
-      }		
-		
-    }
+					for (int ocnt = ((*i)->objStatus != aclRule::VALUE ? 0
+							: (*i)->object); ocnt < acl::OBJECTSIZE; (*i)->objStatus
+							!= aclRule::VALUE ? ocnt++ : ocnt = acl::OBJECTSIZE) {
+
+						//find the Object, create if not exist
+						if (d->actionList[acnt][ocnt] == NULL)
+							d->actionList[acnt][ocnt]
+									= new AclData::actionObject;
+
+						// add users and Rule to object set
+						bool allNames = false;
+						// check to see if names.begin is '*'
+						if ((*(*i)->names.begin()).compare("*") == 0)
+							allNames = true;
+
+						for (nsCitr itr = (allNames ? names.begin()
+								: (*i)->names.begin()); itr
+								!= (allNames ? names.end() : (*i)->names.end()); itr++) {
+
+							AclData::actObjItr itrRule =
+									d->actionList[acnt][ocnt]->find(*itr);
+
+							if (itrRule == d->actionList[acnt][ocnt]->end()) {
+								AclData::ruleSet rSet;
+								rSet.push_back(rule);
+								d->actionList[acnt][ocnt]->insert(make_pair(
+										std::string(*itr), rSet));
+							} else {
+
+								// TODO add code to check for dead rules
+								// allow peter create queue name=tmp <-- dead rule!!
+								// allow peter create queue
+
+								itrRule->second.push_back(rule);
+							}
+						}
+
+					}
+				}
+
+				std::ostringstream objstr;
+				for (int ocnt = ((*i)->objStatus != aclRule::VALUE ? 0 : (*i)->object); ocnt < acl::OBJECTSIZE;
+					 (*i)->objStatus != aclRule::VALUE ? ocnt++ : ocnt = acl::OBJECTSIZE) {
+										objstr << AclHelper::getObjectTypeStr((ObjectType) ocnt) << ",";
+				}
+
+				bool allNames = ((*(*i)->names.begin()).compare("*") == 0);
+				std::ostringstream userstr;
+				for (nsCitr itr = (allNames ? names.begin() : (*i)->names.begin());
+				     itr != (allNames ? names.end() : (*i)->names.end()); itr++) {
+											userstr << *itr << ",";
+				}
+
+				QPID_LOG(debug,"ACL: Adding actions {" << actionstr.str().substr(0,actionstr.str().length()-1)
+						       << "} to objects {" << objstr.str().substr(0,objstr.str().length()-1)
+						       << "} with props " << AclHelper::propertyMapToString(&rule.props)
+						       << " for users {" << userstr.str().substr(0,userstr.str().length()-1) << "}" );
+			} else {
+				QPID_LOG(debug, "ACL Skipping based on Mode:"
+						<< AclHelper::getAclResultStr(d->decisionMode));
+			}
+		}
 
+	}
 
 }
 
 
-
-
 void AclReader::aclRule::processName(const std::string& name, const groupMap& groups) {
     if (name.compare("all") == 0) {
         names.insert("*");

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Mon Oct  5 12:51:57 2009
@@ -435,8 +435,8 @@
     } else {
         if ((iter->second->getPackageName() != packageName) ||
             (iter->second->getClassName()   != className)) {
-            outBuffer.putLong        (Manageable::STATUS_INVALID_PARAMETER);
-            outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
+            outBuffer.putLong        (Manageable::STATUS_PARAMETER_INVALID);
+            outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
         }
         else
             try {

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/AclModule.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/AclModule.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/AclModule.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/AclModule.h Mon Oct  5 12:51:57 2009
@@ -26,7 +26,7 @@
 #include <map>
 #include <set>
 #include <string>
-
+#include <sstream>
 
 namespace qpid {
 
@@ -179,6 +179,8 @@
     typedef std::map<ObjectType, actionMapPtr> objectMap;
     typedef objectMap::const_iterator omCitr;
     typedef boost::shared_ptr<objectMap> objectMapPtr;
+    typedef std::map<Property, std::string> propMap;
+    typedef propMap::const_iterator propMapItr;
 
     // This map contains the legal combinations of object/action/properties found in an ACL file
     static void loadValidationMap(objectMapPtr& map) {
@@ -248,6 +250,19 @@
 
         map->insert(objectPair(OBJ_METHOD, a4));
     }
+
+    static std::string propertyMapToString(const std::map<Property, std::string>* params) {
+    	std::ostringstream ss;
+	ss << "{";
+	if (params)
+	{
+		for (propMapItr pMItr = params->begin(); pMItr != params->end(); pMItr++) {
+			ss << " " << getPropertyStr((Property) pMItr-> first) << "=" << pMItr->second;
+		}
+	}
+	ss << " }";
+	return ss.str();
+    }
 };
 
     

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct  5 12:51:57 2009
@@ -386,7 +386,7 @@
 	if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
             status = Manageable::STATUS_OK;
 	else
-            return Manageable::STATUS_INVALID_PARAMETER;
+            return Manageable::STATUS_PARAMETER_INVALID;
         break;
       }
    default:

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp Mon Oct  5 12:51:57 2009
@@ -269,11 +269,13 @@
                 cb(); // Lend the IO thread for management processing
             }
         }
-        if (mgmtClosing)
+        if (mgmtClosing) {
+            closed();
             close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
-        else
+        } else {
             //then do other output as needed:
             return outputTasks.doOutput();
+	}
     }catch(ConnectionException& e){
         close(e.code, e.getMessage());
     }catch(std::exception& e){

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Oct  5 12:51:57 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@
 using qpid::management::Args;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
-namespace 
+namespace
 {
 const std::string qpidMsgSequence("qpid.msg_sequence");
 const std::string qpidSequenceCounter("qpid.sequence_counter");
@@ -51,17 +51,19 @@
 const std::string fedOpUnbind("U");
 const std::string fedOpReorigin("R");
 const std::string fedOpHello("H");
+
+const std::string QPID_MANAGEMENT("qpid.management");
 }
 
 
 Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
     if (parent){
         if (parent->sequence || parent->ive) parent->sequenceLock.lock();
-        
+
         if (parent->sequence){
             parent->sequenceNo++;
-            msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); 
-        } 
+            msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+        }
         if (parent->ive) {
             parent->lastMsg =  &( msg.getMessage());
         }
@@ -99,11 +101,9 @@
     }
 }
 
-static const std::string QPID_MANAGEMENT("qpid.management");
-
 Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
                    Manageable* parent, Broker* b)
-    : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), 
+    : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
       args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
 {
     if (parent != 0 && broker != 0)
@@ -169,7 +169,7 @@
     string name;
     string type;
     FieldTable args;
-    
+
     buffer.getShortString(name);
     bool durable(buffer.getOctet());
     buffer.getShortString(type);
@@ -185,7 +185,7 @@
     }
 }
 
-void Exchange::encode(Buffer& buffer) const 
+void Exchange::encode(Buffer& buffer) const
 {
     buffer.putShortString(name);
     buffer.putOctet(durable);
@@ -195,8 +195,8 @@
     buffer.put(args);
 }
 
-uint32_t Exchange::encodedSize() const 
-{ 
+uint32_t Exchange::encodedSize() const
+{
     return name.size() + 1/*short string size*/
         + 1 /*durable*/
         + getType().size() + 1/*short string size*/

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Link.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Link.cpp Mon Oct  5 12:51:57 2009
@@ -200,8 +200,10 @@
 
         // Move the bridges to be deleted into a local vector so there is no
         // corruption of the iterator caused by bridge deletion.
-        for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+        for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+            (*i)->closed();
             toDelete.push_back(*i);
+        }
         active.clear();
 
         for (Bridges::iterator i = created.begin(); i != created.end(); i++)
@@ -264,7 +266,6 @@
     }
     if (!cancellations.empty()) {
         for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) {
-            active.push_back(*i);
             (*i)->cancel(*connection);
         }
         cancellations.clear();

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.cpp Mon Oct  5 12:51:57 2009
@@ -362,7 +362,7 @@
     Replacement::iterator i = replacement.find(qfor);
     if (i != replacement.end()){
         return i->second;
-    }		 
+    }           
     return empty;
 }
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Message.h Mon Oct  5 12:51:57 2009
@@ -34,12 +34,12 @@
 #include <vector>
 
 namespace qpid {
-	
+       
 namespace framing {
 class FieldTable;
 class SequenceNumber;
 }
-	
+       
 namespace broker {
 class ConnectionToken;
 class Exchange;
@@ -145,9 +145,9 @@
 
     bool isExcluded(const std::vector<std::string>& excludes) const;
     void addTraceId(const std::string& id);
-	
-	void forcePersistent();
-	bool isForcedPersistent();
+       
+       void forcePersistent();
+       bool isForcedPersistent();
     
     boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
     void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Mon Oct  5 12:51:57 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,14 +30,14 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-namespace 
+namespace
 {
     std::string type_str(uint8_t type);
+    const std::string QPID_MANAGEMENT("qpid.management");
 }
-MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) : 
-    state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
 
-static const std::string QPID_MANAGEMENT("qpid.management");
+MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
+    state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
 
 void MessageBuilder::handle(AMQFrame& frame)
 {
@@ -54,10 +54,10 @@
             AMQFrame header((AMQHeaderBody()));
             header.setBof(false);
             header.setEof(false);
-            message->getFrames().append(header);            
+            message->getFrames().append(header);
         } else if (type != HEADER_BODY) {
             throw CommandInvalidException(
-                QPID_MSG("Invalid frame sequence for message, expected header or content got " 
+                QPID_MSG("Invalid frame sequence for message, expected header or content got "
                          << type_str(type) << ")"));
         }
         state = CONTENT;
@@ -74,13 +74,13 @@
     } else {
         message->getFrames().append(frame);
         //have we reached the staging limit? if so stage message and release content
-        if (state == CONTENT 
-            && stagingThreshold 
+        if (state == CONTENT
+            && stagingThreshold
             && message->getFrames().getContentSize() >= stagingThreshold
             && !NullMessageStore::isNullStore(store)
-            && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)   
+            && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
         {
-            message->releaseContent(store); 
+            message->releaseContent(store);
             staging = true;
         }
     }
@@ -108,7 +108,7 @@
 const std::string HEARTBEAT_BODY_S = "HEARTBEAT";
 const std::string UNKNOWN = "unknown";
 
-std::string type_str(uint8_t type) 
+std::string type_str(uint8_t type)
 {
     switch(type) {
     case METHOD_BODY: return METHOD_BODY_S;
@@ -124,7 +124,7 @@
 void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
 {
     if (expected != actual) {
-        throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected " 
+        throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
                                                << type_str(expected) << " got " << type_str(actual) << ")"));
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Mon Oct  5 12:51:57 2009
@@ -62,7 +62,7 @@
 void PersistableMessage::setContentReleased() {contentReleased = true; }
 
 bool PersistableMessage::isContentReleased()const { return contentReleased; }
-	
+       
 bool PersistableMessage::isEnqueueComplete() {
     sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
     return asyncEnqueueCounter == 0;

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/PersistableMessage.h Mon Oct  5 12:51:57 2009
@@ -46,7 +46,7 @@
     sys::Mutex asyncEnqueueLock;
     sys::Mutex asyncDequeueLock;
     sys::Mutex storeLock;
-	
+       
     /**
      * Tracks the number of outstanding asynchronous enqueue
      * operations. When the message is enqueued asynchronously the
@@ -97,7 +97,7 @@
     void flush();
     
     bool isContentReleased() const;
-	
+       
     QPID_BROKER_EXTERN bool isEnqueueComplete();
 
     QPID_BROKER_EXTERN void enqueueComplete();

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct  5 12:51:57 2009
@@ -597,7 +597,7 @@
                     Mutex::ScopedUnlock u(messageLock);   
                     dequeue(0, QueuedMessage(qm.queue, old, qm.position));
                 }
-            }		 
+            }           
         }else {
             messages.push_back(qm);
             listeners.populate(copy);
@@ -702,7 +702,7 @@
     if (inLastNodeFailure && persistLastNode){
         msg->forcePersistent();
     }
-	
+       
     if (traceId.size()) {
         msg->addTraceId(traceId);
     }

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueRegistry.h Mon Oct  5 12:51:57 2009
@@ -111,7 +111,7 @@
 
     /** Call f for each queue in the registry. */
     template <class F> void eachQueue(F f) const {
-        qpid::sys::RWlock::ScopedWlock l(lock);
+        qpid::sys::RWlock::ScopedRlock l(lock);
         for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i)
             f(i->second);
     }

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Mon Oct  5 12:51:57 2009
@@ -195,7 +195,7 @@
 {
     queue->setPersistenceId(id);
 }
-	
+       
 uint64_t RecoverableQueueImpl::getPersistenceId() const
 {
 	return queue->getPersistenceId();

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Oct  5 12:51:57 2009
@@ -72,7 +72,7 @@
         params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
         params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
         if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
-            throw NotAllowedException(QPID_MSG("ACL denied exhange declare request from " << getConnection().getUserId()));
+            throw NotAllowedException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
     }
     
     //TODO: implement autoDelete
@@ -121,9 +121,11 @@
 
 void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
 {
-    if (alternate && alternate != exchange->getAlternate()) 
+    if (alternate && ((exchange->getAlternate() && alternate != exchange->getAlternate())
+                      || !exchange->getAlternate()))
         throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange "
-                                           << exchange->getAlternate()->getName() << ", requested " 
+                                           << (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>")
+                                           << ", requested " 
                                            << alternate->getName()));
 }
                 
@@ -132,7 +134,7 @@
     AclModule* acl = getBroker().getAcl();
     if (acl) {
         if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
-            throw NotAllowedException(QPID_MSG("ACL denied exhange delete request from " << getConnection().getUserId()));
+            throw NotAllowedException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
     }
 
     //TODO: implement unused
@@ -152,7 +154,7 @@
     AclModule* acl = getBroker().getAcl();
     if (acl) {
         if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL) )
-            throw NotAllowedException(QPID_MSG("ACL denied exhange query request from " << getConnection().getUserId()));
+            throw NotAllowedException(QPID_MSG("ACL denied exchange query request from " << getConnection().getUserId()));
     }
 
     try {
@@ -169,8 +171,12 @@
 {
     AclModule* acl = getBroker().getAcl();
     if (acl) {
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,routingKey) )
-            throw NotAllowedException(QPID_MSG("ACL denied exhange bind request from " << getConnection().getUserId()));
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+        params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
+
+        if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
+            throw NotAllowedException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
     }
 
     Queue::shared_ptr queue = getQueue(queueName);
@@ -232,8 +238,8 @@
         std::map<acl::Property, std::string> params;
         params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
         params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchangeName,&params) )
-            throw NotAllowedException(QPID_MSG("ACL denied exhange bound request from " << getConnection().getUserId()));
+        if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,&params) )
+            throw NotAllowedException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId()));
     }
     
     Exchange::shared_ptr exchange;

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Dispatcher.cpp Mon Oct  5 12:51:57 2009
@@ -29,7 +29,14 @@
 #include "qpid/client/Message.h"
 #include "qpid/client/MessageImpl.h"
 
-#include <boost/state_saver.hpp>
+#include <boost/version.hpp>
+#if (BOOST_VERSION >= 104000)
+#  include <boost/serialization/state_saver.hpp>
+  using boost::serialization::state_saver;
+#else
+#  include <boost/state_saver.hpp>
+  using boost::state_saver;
+#endif /* BOOST_VERSION */
 
 using qpid::framing::FrameSet;
 using qpid::framing::MessageTransferBody;
@@ -65,7 +72,7 @@
     Mutex::ScopedLock l(lock);
     if (running) 
         throw Exception("Dispatcher is already running.");
-    boost::state_saver<bool>  reset(running); // Reset to false on exit.
+    state_saver<bool>  reset(running); // Reset to false on exit.
     running = true;
     try {
         while (!queue->isClosed()) {

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Mon Oct  5 12:51:57 2009
@@ -22,6 +22,7 @@
 #include "qpid/client/amqp0_10/Codecs.h"
 #include "qpid/client/amqp0_10/MessageSource.h"
 #include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/amqp0_10/OutgoingMessage.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Filter.h"
 #include "qpid/messaging/Message.h"
@@ -122,7 +123,7 @@
              bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false, 
              const FieldTable& options = EMPTY_FIELD_TABLE);
     void declare(qpid::client::AsyncSession& session, const std::string& name);
-    void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+    void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
     void cancel(qpid::client::AsyncSession& session, const std::string& name);
   private:
     const std::string name;
@@ -139,7 +140,7 @@
     QueueSink(const std::string& name, bool passive=true, bool exclusive=false, 
               bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
     void declare(qpid::client::AsyncSession& session, const std::string& name);
-    void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+    void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
     void cancel(qpid::client::AsyncSession& session, const std::string& name);
   private:
     const std::string name;
@@ -328,14 +329,12 @@
     }
 }
 
-void Exchange::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
 {
-    qpid::client::Message message;
-    convert(m, message);
-    if (message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
-        message.getDeliveryProperties().setRoutingKey(defaultSubject);
+    if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
+        m.message.getDeliveryProperties().setRoutingKey(defaultSubject);
     }
-    session.messageTransfer(arg::destination=name, arg::content=message);
+    m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
 }
 
 void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {}
@@ -355,12 +354,10 @@
                                    arg::autoDelete=autoDelete, arg::arguments=options);
     }
 }
-void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
 {
-    qpid::client::Message message;
-    convert(m, message);
-    message.getDeliveryProperties().setRoutingKey(name);
-    session.messageTransfer(arg::content=message);
+    m.message.getDeliveryProperties().setRoutingKey(name);
+    m.status = session.messageTransfer(arg::content=m.message);
 }
 
 void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h Mon Oct  5 12:51:57 2009
@@ -31,8 +31,8 @@
 }
 
 namespace messaging {
-class Address;
-class Filter;
+struct Address;
+struct Filter;
 }
 
 namespace client {

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp Mon Oct  5 12:51:57 2009
@@ -183,25 +183,25 @@
 {
     boost::shared_ptr<FieldValue> out;
     switch (in.getType()) {
-      case VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break;
-      case BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break;
-      case UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break;
-      case UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break;
-      case UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break;
-      case UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break;
-      case INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break;
-      case INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break;
-      case INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break;
-      case INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break;
-      case FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break;
-      case DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break;
+      case VAR_VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break;
+      case VAR_BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break;
+      case VAR_UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break;
+      case VAR_UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break;
+      case VAR_UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break;
+      case VAR_UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break;
+      case VAR_INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break;
+      case VAR_INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break;
+      case VAR_INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break;
+      case VAR_INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break;
+      case VAR_FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break;
+      case VAR_DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break;
         //TODO: check encoding (and length?) when deciding what AMQP type to treat string as
-      case STRING: out = boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); break;
-      case MAP: 
+      case VAR_STRING: out = boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); break;
+      case VAR_MAP: 
         //out = boost::shared_ptr<FieldValue>(toFieldValueCollection<FieldTableValue>(in.asMap(), &toFieldTableEntry));
         out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap()));
         break;
-      case LIST: 
+      case VAR_LIST: 
         //out = boost::shared_ptr<FieldValue>(toFieldValueCollection<ListValue>(in.asList(), &toFieldValue));
         out = boost::shared_ptr<FieldValue>(toListValue(in.asList()));
         break;

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Mon Oct  5 12:51:57 2009
@@ -21,14 +21,16 @@
 #include "ConnectionImpl.h"
 #include "SessionImpl.h"
 #include "qpid/messaging/Session.h"
-#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/PrivateImplRef.h"
 #include "qpid/log/Statement.h"
+#include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
 namespace client {
 namespace amqp0_10 {
 
 using qpid::messaging::Variant;
+using namespace qpid::sys;
 
 template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
 {
@@ -56,24 +58,124 @@
     setIfFound(from, "bounds", to.bounds);
 }
 
-ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options)
+ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options) : 
+    url(u), reconnectionEnabled(true), timeout(-1),
+    minRetryInterval(1), maxRetryInterval(30)
 {
     QPID_LOG(debug, "Opening connection to " << url << " with " << options);
-    Url u(url);
-    ConnectionSettings settings;
     convert(options, settings);
-    connection.open(u, settings);
+    setIfFound(options, "reconnection-enabled", reconnectionEnabled);
+    setIfFound(options, "reconnection-timeout", timeout);
+    setIfFound(options, "min-retry-interval", minRetryInterval);
+    setIfFound(options, "max-retry-interval", maxRetryInterval);
+    connection.open(url, settings);
 }
 
 void ConnectionImpl::close()
 {
+    qpid::sys::Mutex::ScopedLock l(lock);
     connection.close();
 }
 
+boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session)
+{
+    return boost::dynamic_pointer_cast<SessionImpl>(
+        qpid::client::PrivateImplRef<qpid::messaging::Session>::get(session)
+    );
+}
+
+void ConnectionImpl::closed(SessionImpl& s)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+        if (getImplPtr(*i).get() == &s) {
+            sessions.erase(i);
+            break;
+        }
+    }
+}
+
 qpid::messaging::Session ConnectionImpl::newSession()
 {
-    qpid::messaging::Session impl(new SessionImpl(connection.newSession()));
+    qpid::messaging::Session impl(new SessionImpl(*this));
+    {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        sessions.push_back(impl);
+    }
+    try {
+        getImplPtr(impl)->setSession(connection.newSession());
+    } catch (const TransportFailure&) {
+        reconnect();
+    }
     return impl;
 }
 
+void ConnectionImpl::reconnect()
+{
+    AbsTime start = now();
+    ScopedLock<Semaphore> l(semaphore);
+    if (!connection.isOpen()) connect(start);
+}
+
+bool expired(const AbsTime& start, int timeout) // has the time budget (in seconds) counted from 'start' run out?
+{
+    if (timeout == 0) return true;  // zero budget: always expired
+    if (timeout < 0) return false;  // negative budget: never expires
+    Duration used(start, now());
+    Duration allowed = timeout * TIME_SEC;
+    return used > allowed; // expired only once the elapsed time exceeds the allowance
+}
+
+void ConnectionImpl::connect(const AbsTime& started)
+{
+    for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) {
+        if (expired(started, timeout)) throw TransportFailure();
+        else qpid::sys::sleep(i);
+    }
+}
+
+bool ConnectionImpl::tryConnect()
+{
+    if (tryConnect(url) || tryConnect(connection.getKnownBrokers())) {
+        return resetSessions();
+    } else {
+        return false;
+    }
+}
+
+bool ConnectionImpl::tryConnect(const Url& u) // one attempt to open 'u'; returns true on success, false on any Exception
+{
+    try {
+        QPID_LOG(info, "Trying to connect to " << u << "..."); // log the candidate being tried, not the configured url
+        connection.open(u, settings);
+        return true;
+    } catch (const Exception& e) {
+        //TODO: need to fix timeout on open so that it throws TransportFailure
+        QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
+    }
+    return false;
+}
+
+bool ConnectionImpl::tryConnect(const std::vector<Url>& urls)
+{
+    for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+        if (tryConnect(*i)) return true;
+    }
+    return false;
+}
+
+bool ConnectionImpl::resetSessions() // gives every tracked session a fresh underlying session; false on TransportFailure
+{
+    try {
+        qpid::sys::Mutex::ScopedLock l(lock); // 'sessions' is guarded by 'lock'
+        for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+            getImplPtr(*i)->setSession(connection.newSession());
+        }
+        return true;
+    } catch (const TransportFailure&) {
+        QPID_LOG(debug, "Connection failed while re-initialising sessions");
+        return false;
+    }
+}
+
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Mon Oct  5 12:51:57 2009
@@ -23,20 +23,46 @@
  */
 #include "qpid/messaging/ConnectionImpl.h"
 #include "qpid/messaging/Variant.h"
+#include "qpid/Url.h"
 #include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Semaphore.h"
+#include <vector>
 
 namespace qpid {
 namespace client {
 namespace amqp0_10 {
 
+class SessionImpl;
+
 class ConnectionImpl : public qpid::messaging::ConnectionImpl
 {
   public:
     ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options);
     void close();
     qpid::messaging::Session newSession();
+    void closed(SessionImpl&);
+    void reconnect();
   private:
+    typedef std::vector<qpid::messaging::Session> Sessions;
+
+    qpid::sys::Mutex lock;//used to protect data structures
+    qpid::sys::Semaphore semaphore;//used to coordinate reconnection
     qpid::client::Connection connection;
+    qpid::Url url;
+    qpid::client::ConnectionSettings settings;
+    Sessions sessions;
+    bool reconnectionEnabled;
+    int timeout;
+    int minRetryInterval;
+    int maxRetryInterval;
+
+    void connect(const qpid::sys::AbsTime& started);
+    bool tryConnect();
+    bool tryConnect(const std::vector<Url>& urls);
+    bool tryConnect(const Url&);
+    bool resetSessions();
 };
 }}} // namespace qpid::client::amqp0_10
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Mon Oct  5 12:51:57 2009
@@ -21,11 +21,13 @@
 #include "qpid/client/amqp0_10/IncomingMessages.h"
 #include "qpid/client/amqp0_10/AddressResolution.h"
 #include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/CodecsInternal.h"
 #include "qpid/client/SessionImpl.h"
 #include "qpid/client/SessionBase_0_10Access.h"
 #include "qpid/log/Statement.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
 #include "qpid/messaging/Variant.h"
 #include "qpid/framing/DeliveryProperties.h"
 #include "qpid/framing/FrameSet.h"
@@ -41,6 +43,7 @@
 using namespace qpid::framing::message;
 using qpid::sys::AbsTime;
 using qpid::sys::Duration;
+using qpid::messaging::MessageImplAccess;
 using qpid::messaging::Variant;
 
 namespace {
@@ -78,11 +81,32 @@
         }
     }
 };
+
+struct Match
+{
+    const std::string destination;
+    uint32_t matched;
+
+    Match(const std::string& d) : destination(d), matched(0) {}
+
+    bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+    {
+        if (command->as<MessageTransferBody>()->getDestination() == destination) {
+            ++matched;
+            return true;
+        } else {
+            return false;
+        }
+    }
+};
 }
 
-IncomingMessages::IncomingMessages(qpid::client::AsyncSession s) : 
-    session(s), 
-    incoming(SessionBase_0_10Access(session).get()->getDemux().getDefault()) {}
+void IncomingMessages::setSession(qpid::client::AsyncSession s)
+{
+    session = s;
+    incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
+    acceptTracker.reset();
+}
 
 bool IncomingMessages::get(Handler& handler, Duration timeout)
 {
@@ -101,8 +125,7 @@
 
 void IncomingMessages::accept()
 {
-    session.messageAccept(unaccepted);
-    unaccepted.clear();
+    acceptTracker.accept(session);
 }
 
 void IncomingMessages::releaseAll()
@@ -116,8 +139,7 @@
     GetAny handler;
     while (process(&handler, 0)) ;
     //now release all messages
-    session.messageRelease(unaccepted);
-    unaccepted.clear();
+    acceptTracker.release(session);
 }
 
 void IncomingMessages::releasePending(const std::string& destination)
@@ -161,6 +183,32 @@
     return false;
 }
 
+uint32_t IncomingMessages::pendingAccept()
+{
+    return acceptTracker.acceptsPending();
+}
+uint32_t IncomingMessages::pendingAccept(const std::string& destination)
+{
+    return acceptTracker.acceptsPending(destination);
+}
+
+uint32_t IncomingMessages::available()
+{
+    //first pump all available messages from incoming to received...
+    while (process(0, 0)) {}
+    //return the count of received messages
+    return received.size();
+}
+
+uint32_t IncomingMessages::available(const std::string& destination)
+{
+    //first pump all available messages from incoming to received...
+    while (process(0, 0)) {}
+
+    //count all messages for this destination from received list
+    return std::for_each(received.begin(), received.end(), Match(destination)).matched;
+}
+
 void populate(qpid::messaging::Message& message, FrameSet& command);
 
 /**
@@ -175,7 +223,7 @@
     }
     const MessageTransferBody* transfer = command->as<MessageTransferBody>(); 
     if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
-        unaccepted.add(command->getId());
+        acceptTracker.delivered(transfer->getDestination(), command->getId());
     }
     session.markCompleted(command->getId(), false, false);
 }
@@ -191,8 +239,6 @@
     parent.retrieve(content, message);
 }
 
-void translate(const FieldTable& from, Variant::Map& to);//implemented in Codecs.cpp
-
 void populateHeaders(qpid::messaging::Message& message, 
                      const DeliveryProperties* deliveryProperties, 
                      const MessageProperties* messageProperties)
@@ -206,6 +252,7 @@
         if (messageProperties->hasReplyTo()) {
             message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
         }
+        message.getHeaders().clear();
         translate(messageProperties->getApplicationHeaders(), message.getHeaders());
         //TODO: convert other message properties
     }
@@ -219,9 +266,8 @@
 void populate(qpid::messaging::Message& message, FrameSet& command)
 {
     //need to be able to link the message back to the transfer it was delivered by
-    //e.g. for rejecting. TODO: hide this from API
-    uint32_t commandId = command.getId();
-    message.setInternalId(reinterpret_cast<void*>(commandId));
+    //e.g. for rejecting.
+    MessageImplAccess::get(message).setInternalId(command.getId());
         
     command.getContent(message.getBytes());
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Mon Oct  5 12:51:57 2009
@@ -27,6 +27,7 @@
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/sys/BlockingQueue.h"
 #include "qpid/sys/Time.h"
+#include "qpid/client/amqp0_10/AcceptTracker.h"
 
 namespace qpid {
 
@@ -67,20 +68,26 @@
         virtual bool accept(MessageTransfer& transfer) = 0;
     };
 
-    IncomingMessages(qpid::client::AsyncSession session);
+    void setSession(qpid::client::AsyncSession session);
     bool get(Handler& handler, qpid::sys::Duration timeout);
     //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
     //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout);
     void accept();
     void releaseAll();
     void releasePending(const std::string& destination);
+
+    uint32_t pendingAccept();
+    uint32_t pendingAccept(const std::string& destination);
+
+    uint32_t available();
+    uint32_t available(const std::string& destination);
   private:
     typedef std::deque<FrameSetPtr> FrameSetQueue;
 
     qpid::client::AsyncSession session;
-    qpid::framing::SequenceSet unaccepted;
     boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
     FrameSetQueue received;
+    AcceptTracker acceptTracker;
 
     bool process(Handler*, qpid::sys::Duration);
     void retrieve(FrameSetPtr, qpid::messaging::Message*);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h Mon Oct  5 12:51:57 2009
@@ -33,6 +33,8 @@
 namespace client {
 namespace amqp0_10 {
 
+class OutgoingMessage;
+
 /**
  *
  */
@@ -41,7 +43,7 @@
   public:
     virtual ~MessageSink() {}
     virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0;
-    virtual void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message) = 0;
+    virtual void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message) = 0;
     virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0;
   private:
 };

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Mon Oct  5 12:51:57 2009
@@ -19,6 +19,7 @@
  *
  */
 #include "ReceiverImpl.h"
+#include "AddressResolution.h"
 #include "MessageSource.h"
 #include "SessionImpl.h"
 #include "qpid/messaging/MessageListener.h"
@@ -38,11 +39,6 @@
         window = capacity;
     }
 }
-
-bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
-{
-    return parent.get(*this, message, timeout);
-}
     
 qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout) 
 {
@@ -50,24 +46,6 @@
     if (!get(result, timeout)) throw Receiver::NoMessageAvailable();
     return result;
 }
-
-bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
-{
-    if (capacity == 0 && !cancelled) {
-        session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
-        if (!started) session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
-    }
-    
-    if (get(message, timeout)) {
-        return true;
-    } else {
-        if (!cancelled) {
-            sync(session).messageFlush(destination);
-            start();//reallocate credit
-        }
-        return get(message, 0);
-    }
-}
     
 qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout) 
 {
@@ -76,71 +54,152 @@
     return result;
 }
 
+bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+    Get f(*this, message, timeout);
+    while (!parent.execute(f)) {}
+    return f.result;
+}
+
+bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+    Fetch f(*this, message, timeout);
+    while (!parent.execute(f)) {}
+    return f.result;
+}
+
 void ReceiverImpl::cancel() 
 { 
-    if (!cancelled) {
-        //TODO: should syncronicity be an optional argument to this call?
-        source->cancel(session, destination);
-        //need to be sure cancel is complete and all incoming
-        //framesets are processed before removing the receiver
-        parent.receiverCancelled(destination);
-        cancelled = true;
-    }
+    execute<Cancel>();
 }
 
 void ReceiverImpl::start()
 {
-    if (!cancelled) {
-        started = true;
-        session.messageSetFlowMode(destination, capacity > 0);
+    execute<Start>();
+}
+
+void ReceiverImpl::stop()
+{
+    execute<Stop>();
+}
+
+void ReceiverImpl::setCapacity(uint32_t c)
+{
+    execute1<SetCapacity>(c);
+}
+
+void ReceiverImpl::startFlow()
+{
+    if (capacity > 0) {
+        session.messageSetFlowMode(destination, FLOW_MODE_WINDOW);
         session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity);
         session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
         window = capacity;
     }
 }
 
-void ReceiverImpl::stop()
+void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
 {
-    session.messageStop(destination);
-    started = false;
+
+    session = s;
+    if (state == UNRESOLVED) {
+        source = resolver.resolveSource(session, address, filter, options);
+        state = STOPPED;//TODO: if session is started, go straight to started
+    }
+    if (state == CANCELLED) {
+        source->cancel(session, destination);
+        parent.receiverCancelled(destination);        
+    } else {
+        source->subscribe(session, destination);
+        if (state == STARTED) start();
+    }
 }
 
-void ReceiverImpl::subscribe()
+void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
+qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
+
+const std::string& ReceiverImpl::getName() const { return destination; }
+
+uint32_t ReceiverImpl::getCapacity()
+{
+    return capacity;
+}
+
+uint32_t ReceiverImpl::available()
+{
+    return parent.available(destination);
+}
+
+uint32_t ReceiverImpl::pendingAck()
 {
-    source->subscribe(session, destination);
+    return parent.pendingAck(destination);
 }
 
-void ReceiverImpl::setSession(qpid::client::AsyncSession s) 
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, 
+                           const qpid::messaging::Address& a,
+                           const qpid::messaging::Filter* f, 
+                           const qpid::messaging::Variant::Map& o) : 
+
+    parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF), 
+    state(UNRESOLVED), capacity(0), listener(0), window(0) {}
+
+bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+    return parent.get(*this, message, timeout);
+}
+
+bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+    if (state == CANCELLED) return false;//TODO: or should this be an error?
+
+    if (capacity == 0 || state != STARTED) {
+        session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
+        session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+        session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+    }
+    
+    if (getImpl(message, timeout)) {
+        return true;
+    } else {
+        sync(session).messageFlush(destination);
+        startFlow();//reallocate credit
+        return getImpl(message, 0);
+    }
+}
+
+void ReceiverImpl::cancelImpl() 
 { 
-    session = s;
-    if (!cancelled) {
-        subscribe();
-        //if we were in started state before the session was changed,
-        //start again on this new session
-        //TODO: locking if receiver is to be threadsafe...
-        if (started) start();
+    if (state != CANCELLED) {
+        state = CANCELLED;
+        source->cancel(session, destination);
+        parent.receiverCancelled(destination);
     }
 }
 
-void ReceiverImpl::setCapacity(uint32_t c)
+void ReceiverImpl::startImpl()
+{
+    if (state == STOPPED) {
+        state = STARTED;
+        startFlow();
+    }
+}
+
+void ReceiverImpl::stopImpl()
+{
+    state = STOPPED;
+    session.messageStop(destination);
+}
+
+void ReceiverImpl::setCapacityImpl(uint32_t c)
 {
     if (c != capacity) {
         capacity = c;
-        if (!cancelled && started) {
-            stop();
-            start();
+        if (state == STARTED) {
+            session.messageStop(destination);
+            startFlow();
         }
     }
 }
 
-void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
-qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
-
-const std::string& ReceiverImpl::getName() const { return destination; }
-
-ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s) : 
-    parent(p), source(s), destination(name), byteCredit(0xFFFFFFFF), 
-    capacity(0), started(false), cancelled(false), listener(0), window(0) {}
-
 
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Mon Oct  5 12:51:57 2009
@@ -21,9 +21,13 @@
  * under the License.
  *
  */
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Filter.h"
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/ReceiverImpl.h"
+#include "qpid/messaging/Variant.h"
 #include "qpid/client/AsyncSession.h"
+#include "qpid/client/amqp0_10/SessionImpl.h"
 #include "qpid/sys/Time.h"
 #include <memory>
 
@@ -31,8 +35,8 @@
 namespace client {
 namespace amqp0_10 {
 
+class AddressResolution;
 class MessageSource;
-class SessionImpl;
 
 /**
  * A receiver implementation based on an AMQP 0-10 subscription.
@@ -41,8 +45,14 @@
 {
   public:
 
-    ReceiverImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSource> source);
+    enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED};
 
+    ReceiverImpl(SessionImpl& parent, const std::string& name,
+                 const qpid::messaging::Address& address,
+                 const qpid::messaging::Filter* filter,
+                 const qpid::messaging::Variant::Map& options);
+
+    void init(qpid::client::AsyncSession session, AddressResolution& resolver);
     bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
     qpid::messaging::Message get(qpid::sys::Duration timeout);
     bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout);
@@ -50,25 +60,107 @@
     void cancel();
     void start();
     void stop();
-    void subscribe();
-    void setSession(qpid::client::AsyncSession s);
     const std::string& getName() const;
     void setCapacity(uint32_t);
+    uint32_t getCapacity();
+    uint32_t available();
+    uint32_t pendingAck();
     void setListener(qpid::messaging::MessageListener* listener);
     qpid::messaging::MessageListener* getListener();
     void received(qpid::messaging::Message& message);
   private:
     SessionImpl& parent;
-    const std::auto_ptr<MessageSource> source;
     const std::string destination;
+    const qpid::messaging::Address address;
+    const qpid::messaging::Filter* filter;
+    const qpid::messaging::Variant::Map options;
     const uint32_t byteCredit;
-    
+    State state;
+
+    std::auto_ptr<MessageSource> source;
     uint32_t capacity;
     qpid::client::AsyncSession session;
-    bool started;
-    bool cancelled;
     qpid::messaging::MessageListener* listener;
     uint32_t window;
+
+    void startFlow();
+    //implementation of public facing methods
+    bool fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+    bool getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+    void startImpl();
+    void stopImpl();
+    void cancelImpl();
+    void setCapacityImpl(uint32_t);
+
+    //functors for public facing methods (allows locking and retry
+    //logic to be centralised)
+    struct Command
+    {
+        ReceiverImpl& impl;
+
+        Command(ReceiverImpl& i) : impl(i) {}
+    };
+
+    struct Get : Command
+    {
+        qpid::messaging::Message& message;
+        qpid::sys::Duration timeout;
+        bool result;
+
+        Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) : 
+            Command(i), message(m), timeout(t), result(false) {}
+        void operator()() { result = impl.getImpl(message, timeout); }
+    };
+
+    struct Fetch : Command
+    {
+        qpid::messaging::Message& message;
+        qpid::sys::Duration timeout;
+        bool result;
+
+        Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) : 
+            Command(i), message(m), timeout(t), result(false) {}
+        void operator()() { result = impl.fetchImpl(message, timeout); }
+    };
+
+    struct Stop : Command
+    {
+        Stop(ReceiverImpl& i) : Command(i) {}
+        void operator()() { impl.stopImpl(); }
+    };
+
+    struct Start : Command
+    {
+        Start(ReceiverImpl& i) : Command(i) {}
+        void operator()() { impl.startImpl(); }
+    };
+
+    struct Cancel : Command
+    {
+        Cancel(ReceiverImpl& i) : Command(i) {}
+        void operator()() { impl.cancelImpl(); }
+    };
+
+    struct SetCapacity : Command
+    {
+        uint32_t capacity;
+
+        SetCapacity(ReceiverImpl& i, uint32_t c) : Command(i), capacity(c) {}
+        void operator()() { impl.setCapacityImpl(capacity); }
+    };
+
+    //helper templates for some common patterns
+    template <class F> void execute()
+    {
+        F f(*this);
+        parent.execute(f);
+    }
+    
+    template <class F, class P> void execute1(P p)
+    {
+        F f(*this, p);
+        parent.execute(f);
+    }
 };
 
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Mon Oct  5 12:51:57 2009
@@ -21,29 +21,113 @@
 #include "SenderImpl.h"
 #include "MessageSink.h"
 #include "SessionImpl.h"
+#include "AddressResolution.h"
+#include "OutgoingMessage.h"
 
 namespace qpid {
 namespace client {
 namespace amqp0_10 {
 
-SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, std::auto_ptr<MessageSink> _sink) : 
-    parent(_parent), name(_name), sink(_sink) {}
+SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, 
+                       const qpid::messaging::Address& _address, 
+                       const qpid::messaging::Variant::Map& _options) : 
+    parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
+    capacity(50), window(0), flushed(false) {}
 
-void SenderImpl::send(qpid::messaging::Message& m) 
+void SenderImpl::send(const qpid::messaging::Message& message) 
 {
-    sink->send(session, name, m);
+    Send f(*this, &message);
+    while (f.repeat) parent.execute(f);
 }
 
 void SenderImpl::cancel()
 {
-    sink->cancel(session, name);
-    parent.senderCancelled(name);
+    execute<Cancel>();
+}
+
+void SenderImpl::setCapacity(uint32_t c)
+{
+    bool flush = c < capacity;
+    capacity = c;
+    execute1<CheckPendingSends>(flush);
 }
+uint32_t SenderImpl::getCapacity() { return capacity; }
+uint32_t SenderImpl::pending()
+{
+    CheckPendingSends f(*this, false);
+    parent.execute(f);
+    return f.pending;
+} 
 
-void SenderImpl::setSession(qpid::client::AsyncSession s)
+void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
 {
     session = s;
-    sink->declare(session, name);
+    if (state == UNRESOLVED) {
+        sink = resolver.resolveSink(session, address, options);
+        state = ACTIVE;
+    }
+    if (state == CANCELLED) {
+        sink->cancel(session, name);
+        parent.senderCancelled(name);
+    } else {
+        sink->declare(session, name);
+        replay();
+    }
+}
+
+void SenderImpl::waitForCapacity() 
+{
+    //TODO: add option to throw exception rather than blocking?
+    if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
+        //Initial implementation is very basic. As outgoing is
+        //currently only reduced on receiving completions and we are
+        //blocking anyway we may as well sync(). If successful that
+        //should clear all outstanding sends.
+        session.sync();
+        checkPendingSends(false);
+    }
+    //flush periodically and check for completed sends
+    if (++window > (capacity / 4)) {//TODO: make this configurable?
+        checkPendingSends(true);
+        window = 0;
+    }
+}
+
+void SenderImpl::sendImpl(const qpid::messaging::Message& m) 
+{
+    //TODO: make recording for replay optional (would still want to track completion however)
+    std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
+    msg->convert(m);
+    outgoing.push_back(msg.release());
+    sink->send(session, name, outgoing.back());
+}
+
+void SenderImpl::replay()
+{
+    for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
+        sink->send(session, name, *i);
+    }
+}
+
+uint32_t SenderImpl::checkPendingSends(bool flush)
+{
+    if (flush) {
+        session.flush(); 
+        flushed = true;
+    } else {
+        flushed = false;
+    }
+    while (!outgoing.empty() && outgoing.front().status.isComplete()) {
+        outgoing.pop_front();
+    }
+    return outgoing.size();
+}
+
+void SenderImpl::cancelImpl()
+{
+    state = CANCELLED;
+    sink->cancel(session, name);
+    parent.senderCancelled(name);
 }
 
 }}} // namespace qpid::client::amqp0_10



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org