You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/06/25 15:28:16 UTC
svn commit: r1496466 [2/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/amqp/
qpid/broker/ qpid/broker/amqp/ qpid/broker/amqp_0_10/ qpid/ha/
qpid/management/ qpid/messaging/amqp/
Copied: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.cpp (from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h&r1=1496401&r2=1496466&rev=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.cpp Tue Jun 25 13:28:15 2013
@@ -1,6 +1,3 @@
-#ifndef QPID_BROKER_AMQP_DATAREADER_H
-#define QPID_BROKER_AMQP_DATAREADER_H
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,33 +18,13 @@
* under the License.
*
*/
-#include "qpid/amqp/Reader.h"
-
-struct pn_data_t;
+#include "Exception.h"
namespace qpid {
-namespace amqp {
-struct Descriptor;
-}
namespace broker {
namespace amqp {
-
-/**
- * Allows use of Reader interface to read pn_data_t* data.
- */
-class DataReader
-{
- public:
- DataReader(qpid::amqp::Reader& reader);
- void read(pn_data_t*);
- private:
- qpid::amqp::Reader& reader;
-
- void readOne(pn_data_t*);
- void readMap(pn_data_t*, const qpid::amqp::Descriptor*);
- void readList(pn_data_t*, const qpid::amqp::Descriptor*);
- void readArray(pn_data_t*, const qpid::amqp::Descriptor*);
-};
+Exception::Exception(const std::string& n, const std::string& d) : name(n), description(d) {}
+Exception::~Exception() throw() {}
+const char* Exception::what() const throw() { return description.c_str(); }
+const char* Exception::symbol() const throw() { return name.c_str(); }
}}} // namespace qpid::broker::amqp
-
-#endif /*!QPID_BROKER_AMQP_DATAREADER_H*/
Copied: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.h (from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionIdentity.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.h?p2=qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionIdentity.h&r1=1496401&r2=1496466&rev=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionIdentity.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.h Tue Jun 25 13:28:15 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_CONNECTIONIDENTITY_H
-#define QPID_BROKER_CONNECTIONIDENTITY_H
+#ifndef QPID_BROKER_AMQP_EXCEPTION_H
+#define QPID_BROKER_AMQP_EXCEPTION_H
/*
*
@@ -21,31 +21,25 @@
* under the License.
*
*/
-
#include <string>
namespace qpid {
-
-namespace management {
-class ObjectId;
-}
-
namespace broker {
-
-class OwnershipToken;
-
-// Interface used to hold Connection authentication and object details for use when authenticating
-// publihed management requests.
-class ConnectionIdentity {
-protected:
- virtual ~ConnectionIdentity() {}
-
-public:
- virtual const OwnershipToken* getOwnership() const = 0;
- virtual const management::ObjectId getObjectId() const = 0;
- virtual const std::string& getUserId() const = 0;
- virtual const std::string& getUrl() const = 0;
+namespace amqp {
+/**
+ * Exception to signal various AMQP 1.0 defined conditions
+ */
+class Exception : public std::exception
+{
+ public:
+ Exception(const std::string& name, const std::string& description);
+ virtual ~Exception() throw();
+ const char* what() const throw();
+ const char* symbol() const throw();
+ private:
+ std::string name;
+ std::string description;
};
+}}} // namespace qpid::broker::amqp
-}}
-#endif // QPID_BROKER_CONNECTIONIDENTITY_H
+#endif /*!QPID_BROKER_AMQP_EXCEPTION_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp Tue Jun 25 13:28:15 2013
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/amqp/Filter.h"
+#include "qpid/broker/amqp/Authorise.h"
#include "qpid/broker/amqp/DataReader.h"
#include "qpid/broker/amqp/Outgoing.h"
#include "qpid/broker/DirectExchange.h"
@@ -235,6 +236,15 @@ void Filter::configure(QueueSettings& se
}
}
+std::string Filter::getBindingKey(boost::shared_ptr<Exchange> exchange) const
+{
+ if (subjectFilter.value.empty() && exchange->getType() == TopicExchange::typeName) {
+ return WILDCARD;
+ } else {
+ return subjectFilter.value;
+ }
+}
+
void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
{
qpid::framing::FieldTable bindingArgs;
@@ -379,5 +389,12 @@ void Filter::MapFilter::writeValue(pn_da
pn_data_exit(data);
}
+void Filter::write(std::map<std::string, qpid::types::Variant> source, pn_data_t* target)
+{
+ MapFilter dummy;
+ dummy.value = source;
+ dummy.writeValue(target);
+}
+
}}} // namespace qpid::broker::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h Tue Jun 25 13:28:15 2013
@@ -43,6 +43,7 @@ class Filter : qpid::amqp::MapReader
Filter();
void read(pn_data_t*);
void write(pn_data_t*);
+ std::string getBindingKey(boost::shared_ptr<Exchange> exchange) const;
/**
* Apply filters where source is a queue
@@ -57,6 +58,11 @@ class Filter : qpid::amqp::MapReader
* Bind subscription queue for case where source is an exchange
*/
void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
+
+ /**
+ * Not really the ideal place for this, but the logic is already implemented here...
+ */
+ static void write(std::map<std::string, qpid::types::Variant> source, pn_data_t* target);
private:
struct FilterBase
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Tue Jun 25 13:28:15 2013
@@ -19,8 +19,10 @@
*
*/
#include "Incoming.h"
+#include "Exception.h"
#include "Message.h"
#include "Session.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/Message.h"
@@ -60,6 +62,30 @@ void Incoming::wakeup()
{
session.wakeup();
}
+
+void Incoming::verify(const std::string& u, const std::string& r)
+{
+ userid.init(u, r);
+}
+
+Incoming::UserId::UserId() : inDefaultRealm(false) {}
+void Incoming::UserId::init(const std::string& u, const std::string& defaultRealm)
+{
+ userid = u;
+ size_t at = userid.find('@');
+ if (at != std::string::npos) {
+ unqualified = userid.substr(0, at);
+ inDefaultRealm = defaultRealm == userid.substr(at+1);
+ }
+}
+void Incoming::UserId::verify(const std::string& claimed)
+{
+ if(!userid.empty() && !claimed.empty() && userid != claimed && !(inDefaultRealm && claimed == unqualified)) {
+ throw Exception(qpid::amqp::error_conditions::NOT_ALLOWED, QPID_MSG("Authenticated user id is " << userid << " but user id in message declared as " << claimed));
+ }
+}
+
+
namespace {
class Transfer : public qpid::broker::AsyncCompletion::Callback
{
@@ -89,7 +115,7 @@ void DecodingIncoming::readable(pn_deliv
pn_link_advance(link);
qpid::broker::Message message(received, received);
-
+ userid.verify(message.getUserId());
handle(message);
--window;
received->begin();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h Tue Jun 25 13:28:15 2013
@@ -43,12 +43,28 @@ class Incoming : public ManagedIncomingL
virtual bool haveWork();//called when handling input to see whether any output work is needed
virtual void detached();
virtual void readable(pn_delivery_t* delivery) = 0;
+ void verify(const std::string& userid, const std::string& defaultRealm);
void wakeup();
protected:
+ class UserId
+ {
+ public:
+ UserId();
+ void init(const std::string& userid, const std::string& defaultRealm);
+ void verify(const std::string& claimed);
+ private:
+ std::string userid;
+ bool inDefaultRealm;
+ std::string unqualified;
+ };
+
const uint32_t credit;
uint32_t window;
+
+
pn_link_t* link;
Session& session;
+ UserId userid;
virtual uint32_t getCredit();
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp Tue Jun 25 13:28:15 2013
@@ -82,9 +82,7 @@ void Interconnect::process()
} else {
if ((pn_connection_state(connection) & UNINIT) == UNINIT) {
QPID_LOG_CAT(debug, model, id << " interconnect opened");
- pn_connection_set_container(connection, broker.getFederationTag().c_str());
- pn_connection_open(connection);
- out.connectionEstablished();
+ open();
pn_session_t* s = pn_session(connection);
pn_session_open(s);
@@ -116,4 +114,9 @@ void Interconnect::transportDeleted()
registry.remove(name);
}
+bool Interconnect::isLink() const
+{
+ return true;
+}
+
}}} // namespace qpid::broker::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h Tue Jun 25 13:28:15 2013
@@ -44,6 +44,7 @@ class Interconnect : public Connection
size_t encode(char* buffer, size_t size);
void deletedFromRegistry();
void transportDeleted();
+ bool isLink() const;
private:
bool incoming;
std::string name;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp Tue Jun 25 13:28:15 2013
@@ -54,11 +54,17 @@ ManagedConnection::~ManagedConnection()
QPID_LOG_CAT(debug, model, "Delete connection. user:" << userid << " rhost:" << id);
}
-void ManagedConnection::setUserid(const std::string& uid)
+void ManagedConnection::setUserId(const std::string& uid)
{
userid = uid;
- if (agent && connection) {
+ if (connection) {
connection->set_authIdentity(userid);
+ }
+}
+
+void ManagedConnection::opened()
+{
+ if (agent) {
agent->raiseEvent(_qmf::EventClientConnect(id, userid, connection->get_remoteProperties()));
}
QPID_LOG_CAT(debug, model, "Create connection. user:" << userid << " rhost:" << id );
@@ -78,13 +84,20 @@ void ManagedConnection::setSaslSsf(int s
}
}
+void ManagedConnection::setPeerProperties(std::map<std::string, types::Variant>& p)
+{
+ peerProperties = p;
+ if (connection) {
+ connection->set_remoteProperties(peerProperties);
+ }
+}
+
void ManagedConnection::setContainerId(const std::string& container)
{
containerid = container;
+ peerProperties["container-id"] = containerid;
if (connection) {
- qpid::types::Variant::Map props;
- props["container-id"] = containerid;
- connection->set_remoteProperties(props);
+ connection->set_remoteProperties(peerProperties);
}
}
const std::string& ManagedConnection::getContainerId() const
@@ -98,7 +111,31 @@ qpid::management::ManagementObject::shar
}
std::string ManagedConnection::getId() const { return id; }
-std::string ManagedConnection::getUserid() const { return userid; }
+
+const OwnershipToken* ManagedConnection::getOwnership() const
+{
+ return this;
+}
+const management::ObjectId ManagedConnection::getObjectId() const
+{
+ return GetManagementObject()->getObjectId();
+}
+const std::string& ManagedConnection::getUserId() const
+{
+ return userid;
+}
+const std::string& ManagedConnection::getMgmtId() const
+{
+ return id;
+}
+const std::map<std::string, types::Variant>& ManagedConnection::getClientProperties() const
+{
+ return connection->get_remoteProperties();
+}
+bool ManagedConnection::isLink() const
+{
+ return false;
+}
bool ManagedConnection::isLocal(const OwnershipToken* t) const
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h Tue Jun 25 13:28:15 2013
@@ -22,7 +22,9 @@
*
*/
#include "qpid/management/Manageable.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/OwnershipToken.h"
+#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/Connection.h"
namespace qpid {
@@ -34,28 +36,38 @@ namespace broker {
class Broker;
namespace amqp {
-class ManagedConnection : public qpid::management::Manageable, public OwnershipToken
+class ManagedConnection : public qpid::management::Manageable, public OwnershipToken, public qpid::broker::Connection
{
public:
ManagedConnection(Broker& broker, const std::string id);
virtual ~ManagedConnection();
- void setUserid(const std::string&);
+ virtual void setUserId(const std::string&);
std::string getId() const;
- std::string getUserid() const;
void setSaslMechanism(const std::string&);
void setSaslSsf(int);
void setContainerId(const std::string&);
const std::string& getContainerId() const;
+ void setPeerProperties(std::map<std::string, types::Variant>&);
qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
bool isLocal(const OwnershipToken* t) const;
void incomingMessageReceived();
void outgoingMessageSent();
+
+ //ConnectionIdentity
+ const OwnershipToken* getOwnership() const;
+ const management::ObjectId getObjectId() const;
+ const std::string& getUserId() const;
+ const std::string& getMgmtId() const;
+ const std::map<std::string, types::Variant>& getClientProperties() const;
+ virtual bool isLink() const;
+ void opened();
private:
const std::string id;
std::string userid;
std::string containerid;
qmf::org::apache::qpid::broker::Connection::shared_ptr connection;
qpid::management::ManagementAgent* agent;
+ std::map<std::string, types::Variant> peerProperties;
};
}}} // namespace qpid::broker::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp Tue Jun 25 13:28:15 2013
@@ -139,7 +139,7 @@ void Sasl::respond(qpid::SaslServer::Sta
{
switch (status) {
case qpid::SaslServer::OK:
- connection.setUserid(authenticator->getUserid());
+ connection.setUserId(authenticator->getUserid());
completed(true);
//can't set authenticated & failed until we have actually sent the outcome
state = SUCCESS_PENDING;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Tue Jun 25 13:28:15 2013
@@ -120,14 +120,16 @@ class IncomingToQueue : public DecodingI
class IncomingToExchange : public DecodingIncoming
{
public:
- IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e) {}
+ IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source)
+ : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
+ Authorise& authorise;
};
Session::Session(pn_session_t* s, qpid::broker::Broker& b, Connection& c, qpid::sys::OutputControl& o)
- : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {}
+ : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false), authorise(connection.getUserId(), broker.getAcl()) {}
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
@@ -140,11 +142,11 @@ Session::ResolvedNode Session::resolve(c
//is it a queue or an exchange?
node.properties.read(pn_terminus_properties(terminus));
if (node.properties.isQueue()) {
- node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first;
+ node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
} else {
qpid::framing::FieldTable args;
node.exchange = broker.createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
- args, connection.getUserid(), connection.getId()).first;
+ args, connection.getUserId(), connection.getId()).first;
}
} else {
size_t i = name.find('@');
@@ -236,8 +238,13 @@ void Session::setupIncoming(pn_link_t* l
{
ResolvedNode node = resolve(name, target, true);
//set capabilities
- if (node.queue) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue);
- else if (node.exchange) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
+ if (node.queue) {
+ setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue);
+ authorise.incoming(node.queue);
+ } else if (node.exchange) {
+ setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
+ authorise.incoming(node.exchange);
+ }
const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link));
if (!sourceAddress) {
@@ -260,6 +267,7 @@ void Session::setupIncoming(pn_link_t* l
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
throw qpid::Exception("Node not found: " + name);/*not-found*/
}
+ if (broker.getOptions().auth && !connection.isLink()) incoming[link]->verify(connection.getUserId(), broker.getOptions().realm);
QPID_LOG(debug, "Incoming link attached");
}
@@ -282,11 +290,13 @@ void Session::setupOutgoing(pn_link_t* l
if (node.queue) {
+ authorise.outgoing(node.queue);
boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false));
q->init();
filter.apply(q);
outgoing[link] = q;
} else if (node.exchange) {
+ authorise.access(node.exchange);//do separate access check before trying to create the queue
bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
bool durable = pn_terminus_get_durability(source);
QueueSettings settings(durable, !durable);
@@ -295,7 +305,7 @@ void Session::setupOutgoing(pn_link_t* l
std::stringstream queueName;
if (shared) {
//just use link name (TODO: could allow this to be
- //overridden when acces to link properties is provided
+ //overridden when access to link properties is provided
//(PROTON-335))
queueName << pn_link_name(link);
} else {
@@ -303,9 +313,9 @@ void Session::setupOutgoing(pn_link_t* l
queueName << connection.getContainerId() << "_" << pn_link_name(link);
}
boost::shared_ptr<qpid::broker::Queue> queue
- = broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first;
+ = broker.createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first;
if (!shared) queue->setExclusiveOwner(this);
-
+ authorise.outgoing(node.exchange, queue, filter);
filter.bind(node.exchange, queue);
boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared));
outgoing[link] = q;
@@ -460,6 +470,11 @@ void Session::wakeup()
out.activateOutput();
}
+Authorise& Session::getAuthorise()
+{
+ return authorise;
+}
+
void IncomingToQueue::handle(qpid::broker::Message& message)
{
queue->deliver(message);
@@ -467,6 +482,7 @@ void IncomingToQueue::handle(qpid::broke
void IncomingToExchange::handle(qpid::broker::Message& message)
{
+ authorise.route(exchange, message);
DeliverableMessage deliverable(message, 0);
exchange->route(deliverable);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h Tue Jun 25 13:28:15 2013
@@ -23,6 +23,7 @@
*/
#include "qpid/sys/Mutex.h"
#include "qpid/sys/OutputControl.h"
+#include "qpid/broker/amqp/Authorise.h"
#include "qpid/broker/amqp/ManagedSession.h"
#include "qpid/broker/amqp/NodeProperties.h"
#include <deque>
@@ -75,6 +76,8 @@ class Session : public ManagedSession, p
void accepted(pn_delivery_t*, bool sync);
void wakeup();
+
+ Authorise& getAuthorise();
private:
typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks;
typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks;
@@ -88,6 +91,8 @@ class Session : public ManagedSession, p
bool deleted;
qpid::sys::Mutex lock;
std::set< boost::shared_ptr<Queue> > exclusiveQueues;
+ Authorise authorise;
+
struct ResolvedNode
{
boost::shared_ptr<qpid::broker::Exchange> exchange;
Copied: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp (from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp&r1=1496401&r2=1496466&rev=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp Tue Jun 25 13:28:15 2013
@@ -18,7 +18,7 @@
* under the License.
*
*/
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/SessionOutputException.h"
@@ -59,6 +59,7 @@ namespace _qmf = qmf::org::apache::qpid:
namespace qpid {
namespace broker {
+namespace amqp_0_10 {
struct ConnectionTimeoutTask : public sys::TimerTask {
sys::Timer& timer;
@@ -160,7 +161,6 @@ Connection::Connection(ConnectionOutputH
mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10"));
agent->addObject(mgmtObject, objectId);
}
- setUrl(mgmtId);
}
}
@@ -542,4 +542,4 @@ void Connection::restartTimeout()
bool Connection::isOpen() { return adapter.isOpen(); }
-}}
+}}}
Copied: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h (from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h?p2=qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h&r1=1496401&r2=1496466&rev=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h Tue Jun 25 13:28:15 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_CONNECTION_H
-#define QPID_BROKER_CONNECTION_H
+#ifndef QPID_BROKER_AMQP_0_10_CONNECTION_H
+#define QPID_BROKER_AMQP_0_10_CONNECTION_H
/*
*
@@ -30,13 +30,14 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionHandler.h"
-#include "qpid/broker/ConnectionIdentity.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/OwnershipToken.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/SecuritySettings.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/types/Variant.h"
#include "qpid/RefCounted.h"
#include "qpid/Url.h"
#include "qpid/ptr_map.h"
@@ -62,9 +63,11 @@ class LinkRegistry;
class Queue;
class SecureConnection;
class SessionHandler;
+
+namespace amqp_0_10 {
struct ConnectionTimeoutTask;
-class Connection : public sys::ConnectionInputHandler, public ConnectionIdentity,
+class Connection : public sys::ConnectionInputHandler, public qpid::broker::Connection,
public OwnershipToken, public management::Manageable,
public RefCounted
{
@@ -77,12 +80,10 @@ class Connection : public sys::Connectio
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
- void setUrl(const std::string& _url) { url = _url; }
const OwnershipToken* getOwnership() const { return this; };
const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); };
const std::string& getUserId() const { return userId; }
- const std::string& getUrl() const { return url; }
void setUserProxyAuth(const bool b);
bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
@@ -172,11 +173,11 @@ class Connection : public sys::Connectio
/** @return true if the initial connection negotiation is complete. */
bool isOpen();
- bool isLink() { return link; }
+ bool isLink() const { return link; }
void startLinkHeartbeatTimeoutTask();
- void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
- const framing::FieldTable& getClientProperties() const { return clientProperties; }
+ void setClientProperties(const types::Variant::Map& cp) { clientProperties = cp; }
+ const types::Variant::Map& getClientProperties() const { return clientProperties; }
private:
// Management object is used in the constructor so must be early
@@ -195,7 +196,6 @@ class Connection : public sys::Connectio
uint16_t heartbeat;
uint16_t heartbeatmax;
std::string userId;
- std::string url;
bool userProxyAuth;
std::string federationPeerTag;
std::vector<Url> knownHosts;
@@ -218,7 +218,7 @@ class Connection : public sys::Connectio
boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
uint64_t objectId;
- framing::FieldTable clientProperties;
+ types::Variant::Map clientProperties;
friend class OutboundFrameTracker;
@@ -230,6 +230,6 @@ friend class OutboundFrameTracker;
qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
};
-}}
+}}}
-#endif /*!QPID_BROKER_CONNECTION_H*/
+#endif /*!QPID_BROKER_AMQP_0_10_CONNECTION_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueSettings.h"
@@ -378,7 +378,7 @@ void BrokerReplicator::connected(Bridge&
connection = link->getConnection();
assert(connection);
userId = link->getConnection()->getUserId();
- remoteHost = link->getConnection()->getUrl();
+ remoteHost = link->getConnection()->getMgmtId();
link->getRemoteAddress(primary);
string queueName = bridge.getQueueName();
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp Tue Jun 25 13:28:15 2013
@@ -23,7 +23,7 @@
#include "BrokerInfo.h"
#include "HaBroker.h"
#include "qpid/Url.h"
-#include "qpid/framing/FieldTable.h"
+#include "qpid/types/Variant.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
@@ -34,21 +34,23 @@ ConnectionObserver::ConnectionObserver(H
: haBroker(hb), logPrefix("Backup: "), self(uuid) {}
bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
- framing::FieldTable ft;
- if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
- info = BrokerInfo(ft);
+ qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::BACKUP_TAG);
+ if (i != connection.getClientProperties().end() && i->second.getType() == qpid::types::VAR_MAP) {
+ info = BrokerInfo(i->second.asMap());
return true;
}
return false;
}
bool ConnectionObserver::getAddress(const broker::Connection& connection, Address& addr) {
- Url url;
- url.parseNoThrow(
- connection.getClientProperties().getAsString(ConnectionObserver::ADDRESS_TAG).c_str());
- if (!url.empty()) {
- addr = url[0];
- return true;
+ qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::ADDRESS_TAG);
+ if (i != connection.getClientProperties().end()) {
+ Url url;
+ url.parseNoThrow(i->second.asString().c_str());
+ if (!url.empty()) {
+ addr = url[0];
+ return true;
+ }
}
return false;
}
@@ -86,7 +88,7 @@ void ConnectionObserver::opened(broker::
return;
}
if (connection.isLink()) return; // Allow outgoing links.
- if (connection.getClientProperties().isSet(ADMIN_TAG)) {
+ if (connection.getClientProperties().find(ADMIN_TAG) != connection.getClientProperties().end()) {
QPID_LOG(debug, logPrefix << "Accepted admin connection: "
<< connection.getMgmtId());
return; // No need to call observer, always allow admins.
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Jun 25 13:28:15 2013
@@ -88,10 +88,27 @@ const string keyifyNameStr(const string&
struct ScopedManagementContext
{
- ScopedManagementContext(const ConnectionIdentity& p)
+ const Connection* context;
+
+ ScopedManagementContext(const Connection* p) : context(p)
+ {
+ if (p) setManagementExecutionContext(*p);
+ }
+
+ management::ObjectId getObjectId() const
+ {
+ return context ? context->getObjectId() : management::ObjectId();
+ }
+ std::string getUserId() const
+ {
+ return context ? context->getUserId() : std::string();
+ }
+ std::string getMgmtId() const
{
- setManagementExecutionContext(p);
+ return context ? context->getMgmtId() : std::string();
}
+
+
~ScopedManagementContext()
{
resetManagementExecutionContext();
@@ -2288,7 +2305,7 @@ void ManagementAgent::dispatchAgentComma
}
if (opcode == "_method_request")
- return handleMethodRequest(body, rte, rtk, cid, msg.getPublisherUserId(), viaLocal);
+ return handleMethodRequest(body, rte, rtk, cid, context.getUserId(), viaLocal);
else if (opcode == "_query_request")
return handleGetQuery(body, rte, rtk, cid, viaLocal);
else if (opcode == "_agent_locate_request")
@@ -2311,9 +2328,9 @@ void ManagementAgent::dispatchAgentComma
else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence);
else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence);
else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence);
- else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, msg.getPublisherObjectId());
+ else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, context.getObjectId());
else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence);
- else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, msg.getPublisherUserId());
+ else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, context.getMgmtId());
}
}
@@ -2752,10 +2769,10 @@ ManagementAgent::EventQueue::Batch::cons
}
namespace {
-QPID_TSS const ConnectionIdentity* currentPublisher = 0;
+QPID_TSS const Connection* currentPublisher = 0;
}
-void setManagementExecutionContext(const ConnectionIdentity& p)
+void setManagementExecutionContext(const Connection& p)
{
currentPublisher = &p;
}
@@ -2765,7 +2782,7 @@ void resetManagementExecutionContext()
currentPublisher = 0;
}
-const ConnectionIdentity* getCurrentPublisher()
+const Connection* getCurrentPublisher()
{
return currentPublisher;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Tue Jun 25 13:28:15 2013
@@ -44,7 +44,6 @@
namespace qpid {
namespace broker {
class Connection;
-class ConnectionIdentity;
}
namespace sys {
class Timer;
@@ -379,9 +378,9 @@ private:
std::auto_ptr<EventQueue> sendQueue;
};
-void setManagementExecutionContext(const broker::ConnectionIdentity&);
+void setManagementExecutionContext(const broker::Connection&);
void resetManagementExecutionContext();
-const broker::ConnectionIdentity* getCurrentPublisher();
+const broker::Connection* getCurrentPublisher();
}}
#endif /*!_ManagementAgent_*/
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Tue Jun 25 13:28:15 2013
@@ -34,6 +34,7 @@
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/SystemInfo.h"
#include "qpid/sys/Time.h"
#include <vector>
extern "C" {
@@ -125,6 +126,7 @@ void ConnectionContext::open()
}
QPID_LOG(debug, id << " Opening...");
+ setProperties();
pn_connection_open(connection);
wakeupDriver(); //want to write
while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
@@ -148,7 +150,7 @@ void ConnectionContext::endSession(boost
//wait for outstanding sends to settle
while (!ssn->settled()) {
QPID_LOG(debug, "Waiting for sends to settle before closing");
- wait();//wait until message has been confirmed
+ wait(ssn);//wait until message has been confirmed
}
pn_session_close(ssn->session);
@@ -165,7 +167,7 @@ void ConnectionContext::close()
//wait for outstanding sends to settle
while (!i->second->settled()) {
QPID_LOG(debug, "Waiting for sends to settle before closing");
- wait();//wait until message has been confirmed
+ wait(i->second);//wait until message has been confirmed
}
@@ -304,6 +306,7 @@ void ConnectionContext::attach(boost::sh
QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName());
}
lnk->verify(t);
+ checkClosed(ssn, lnk);
QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
}
@@ -322,6 +325,7 @@ void ConnectionContext::attach(boost::sh
QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName());
}
lnk->verify(s);
+ checkClosed(ssn, lnk);
QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
}
@@ -471,8 +475,15 @@ void ConnectionContext::checkClosed(boos
{
checkClosed(ssn);
if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_condition_t* error = pn_link_remote_condition(lnk);
+ std::stringstream text;
+ if (pn_condition_is_set(error)) {
+ text << "Link detached by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+ } else {
+ text << "Link detached by peer";
+ }
pn_link_close(lnk);
- throw qpid::messaging::LinkError("Link detached by peer");
+ throw qpid::messaging::LinkError(text.str());
} else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) {
throw qpid::messaging::LinkError("Link is not attached");
}
@@ -692,5 +703,39 @@ bool ConnectionContext::CodecSwitch::can
return parent.canEncode();
}
+namespace {
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+const std::string CLIENT_PPID("qpid.client_ppid");
+pn_bytes_t convert(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+}
+void ConnectionContext::setProperties()
+{
+ /**
+ * Enable when proton 0.5 is released and qpidc has been updated
+ * to use it
+ *
+ pn_data_t* data = pn_connection_properties(connection);
+ pn_data_put_map(data);
+ pn_data_enter(data);
+
+ pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME));
+ std::string processName = sys::SystemInfo::getProcessName();
+ pn_data_put_string(data, convert(processName));
+
+ pn_data_put_symbol(data, convert(CLIENT_PID));
+ pn_data_put_int(data, sys::SystemInfo::getProcessId());
+
+ pn_data_put_symbol(data, convert(CLIENT_PPID));
+ pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
+ pn_data_exit(data);
+ **/
+}
}}} // namespace qpid::messaging::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Tue Jun 25 13:28:15 2013
@@ -150,6 +150,7 @@ class ConnectionContext : public qpid::s
std::size_t writeProtocolHeader(char* buffer, std::size_t size);
std::string getError();
bool useSasl();
+ void setProperties();
};
}}} // namespace qpid::messaging::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Tue Jun 25 13:28:15 2013
@@ -93,8 +93,24 @@ SenderContext::Delivery* SenderContext::
}
}
+void SenderContext::check()
+{
+ if (pn_link_state(sender) & PN_REMOTE_CLOSED && !(pn_link_state(sender) & PN_LOCAL_CLOSED)) {
+ pn_condition_t* error = pn_link_remote_condition(sender);
+ std::stringstream text;
+ if (pn_condition_is_set(error)) {
+ text << "Link detached by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+ } else {
+ text << "Link detached by peer";
+ }
+ pn_link_close(sender);
+ throw qpid::messaging::LinkError(text.str());
+ }
+}
+
uint32_t SenderContext::processUnsettled()
{
+ check();
//remove messages from front of deque once peer has confirmed receipt
while (!deliveries.empty() && deliveries.front().delivered()) {
deliveries.front().settle();
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Tue Jun 25 13:28:15 2013
@@ -74,6 +74,7 @@ class SenderContext
Delivery* send(const qpid::messaging::Message& message);
void configure();
void verify(pn_terminus_t*);
+ void check();
bool settled();
Address getAddress() const;
private:
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Tue Jun 25 13:28:15 2013
@@ -144,7 +144,12 @@ bool SessionContext::settled()
{
bool result = true;
for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
- if (!i->second->settled()) result = false;
+ try {
+ if (!i->second->settled()) result = false;
+ } catch (const std::exception&) {
+ senders.erase(i);
+ throw;
+ }
}
return result;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org