You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/06/25 15:28:16 UTC

svn commit: r1496466 [2/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/amqp/ qpid/broker/ qpid/broker/amqp/ qpid/broker/amqp_0_10/ qpid/ha/ qpid/management/ qpid/messaging/amqp/

Copied: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.cpp (from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h&r1=1496401&r2=1496466&rev=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/DataReader.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.cpp Tue Jun 25 13:28:15 2013
@@ -1,6 +1,3 @@
-#ifndef QPID_BROKER_AMQP_DATAREADER_H
-#define QPID_BROKER_AMQP_DATAREADER_H
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,33 +18,13 @@
  * under the License.
  *
  */
-#include "qpid/amqp/Reader.h"
-
-struct pn_data_t;
+#include "Exception.h"
 
 namespace qpid {
-namespace amqp {
-struct Descriptor;
-}
 namespace broker {
 namespace amqp {
-
-/**
- * Allows use of Reader interface to read pn_data_t* data.
- */
-class DataReader
-{
-  public:
-    DataReader(qpid::amqp::Reader& reader);
-    void read(pn_data_t*);
-  private:
-    qpid::amqp::Reader& reader;
-
-    void readOne(pn_data_t*);
-    void readMap(pn_data_t*, const qpid::amqp::Descriptor*);
-    void readList(pn_data_t*, const qpid::amqp::Descriptor*);
-    void readArray(pn_data_t*, const qpid::amqp::Descriptor*);
-};
+Exception::Exception(const std::string& n, const std::string& d) : name(n), description(d) {}
+Exception::~Exception() throw() {}
+const char* Exception::what() const throw() { return description.c_str(); }
+const char* Exception::symbol() const throw() { return name.c_str(); }
 }}} // namespace qpid::broker::amqp
-
-#endif  /*!QPID_BROKER_AMQP_DATAREADER_H*/

Copied: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.h (from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionIdentity.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.h?p2=qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionIdentity.h&r1=1496401&r2=1496466&rev=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionIdentity.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Exception.h Tue Jun 25 13:28:15 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_CONNECTIONIDENTITY_H
-#define QPID_BROKER_CONNECTIONIDENTITY_H
+#ifndef QPID_BROKER_AMQP_EXCEPTION_H
+#define QPID_BROKER_AMQP_EXCEPTION_H
 
 /*
  *
@@ -21,31 +21,25 @@
  * under the License.
  *
  */
-
 #include <string>
 
 namespace qpid {
-
-namespace management {
-class ObjectId;
-}
-
 namespace broker {
-
-class OwnershipToken;
-
-// Interface used to hold Connection authentication and object details for use when authenticating
-// publihed management requests.
-class ConnectionIdentity {
-protected:
-    virtual ~ConnectionIdentity() {}
-
-public:
-    virtual const OwnershipToken* getOwnership() const = 0;
-    virtual const management::ObjectId getObjectId() const = 0;
-    virtual const std::string& getUserId() const = 0;
-    virtual const std::string& getUrl() const = 0;
+namespace amqp {
+/**
+ * Exception to signal various AMQP 1.0 defined conditions
+ */
+class Exception : public std::exception
+{
+  public:
+    Exception(const std::string& name, const std::string& description);
+    virtual ~Exception() throw();
+    const char* what() const throw();
+    const char* symbol() const throw();
+  private:
+    std::string name;
+    std::string description;
 };
+}}} // namespace qpid::broker::amqp
 
-}}
-#endif // QPID_BROKER_CONNECTIONIDENTITY_H
+#endif  /*!QPID_BROKER_AMQP_EXCEPTION_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp Tue Jun 25 13:28:15 2013
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/broker/amqp/Filter.h"
+#include "qpid/broker/amqp/Authorise.h"
 #include "qpid/broker/amqp/DataReader.h"
 #include "qpid/broker/amqp/Outgoing.h"
 #include "qpid/broker/DirectExchange.h"
@@ -235,6 +236,15 @@ void Filter::configure(QueueSettings& se
     }
 }
 
+std::string Filter::getBindingKey(boost::shared_ptr<Exchange> exchange) const
+{
+    if (subjectFilter.value.empty() && exchange->getType() == TopicExchange::typeName) {
+        return WILDCARD;
+    } else {
+        return subjectFilter.value;
+    }
+}
+
 void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
 {
     qpid::framing::FieldTable bindingArgs;
@@ -379,5 +389,12 @@ void Filter::MapFilter::writeValue(pn_da
     pn_data_exit(data);
 }
 
+void Filter::write(std::map<std::string, qpid::types::Variant> source, pn_data_t* target)
+{
+    MapFilter dummy;
+    dummy.value = source;
+    dummy.writeValue(target);
+}
+
 
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h Tue Jun 25 13:28:15 2013
@@ -43,6 +43,7 @@ class Filter : qpid::amqp::MapReader
     Filter();
     void read(pn_data_t*);
     void write(pn_data_t*);
+    std::string getBindingKey(boost::shared_ptr<Exchange> exchange) const;
 
     /**
      * Apply filters where source is a queue
@@ -57,6 +58,11 @@ class Filter : qpid::amqp::MapReader
      * Bind subscription queue for case where source is an exchange
      */
     void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
+
+    /**
+     * Not really the ideal place for this, but the logic is already implemented here...
+     */
+    static void write(std::map<std::string, qpid::types::Variant> source, pn_data_t* target);
   private:
     struct FilterBase
     {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Tue Jun 25 13:28:15 2013
@@ -19,8 +19,10 @@
  *
  */
 #include "Incoming.h"
+#include "Exception.h"
 #include "Message.h"
 #include "Session.h"
+#include "qpid/amqp/descriptors.h"
 #include "qpid/broker/AsyncCompletion.h"
 #include "qpid/broker/Message.h"
 
@@ -60,6 +62,30 @@ void Incoming::wakeup()
 {
     session.wakeup();
 }
+
+void Incoming::verify(const std::string& u, const std::string& r)
+{
+    userid.init(u, r);
+}
+
+Incoming::UserId::UserId() : inDefaultRealm(false) {}
+void Incoming::UserId::init(const std::string& u, const std::string& defaultRealm)
+{
+    userid = u;
+    size_t at = userid.find('@');
+    if (at != std::string::npos) {
+        unqualified = userid.substr(0, at);
+        inDefaultRealm = defaultRealm == userid.substr(at+1);
+    }
+}
+void Incoming::UserId::verify(const std::string& claimed)
+{
+    if(!userid.empty() && !claimed.empty() && userid != claimed && !(inDefaultRealm && claimed == unqualified)) {
+        throw Exception(qpid::amqp::error_conditions::NOT_ALLOWED, QPID_MSG("Authenticated user id is " << userid << " but user id in message declared as " << claimed));
+    }
+}
+
+
 namespace {
     class Transfer : public qpid::broker::AsyncCompletion::Callback
     {
@@ -89,7 +115,7 @@ void DecodingIncoming::readable(pn_deliv
     pn_link_advance(link);
 
     qpid::broker::Message message(received, received);
-
+    userid.verify(message.getUserId());
     handle(message);
     --window;
     received->begin();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h Tue Jun 25 13:28:15 2013
@@ -43,12 +43,28 @@ class Incoming : public ManagedIncomingL
     virtual bool haveWork();//called when handling input to see whether any output work is needed
     virtual void detached();
     virtual void readable(pn_delivery_t* delivery) = 0;
+    void verify(const std::string& userid, const std::string& defaultRealm);
     void wakeup();
   protected:
+    class UserId
+    {
+      public:
+        UserId();
+        void init(const std::string& userid, const std::string& defaultRealm);
+        void verify(const std::string& claimed);
+      private:
+        std::string userid;
+        bool inDefaultRealm;
+        std::string unqualified;
+    };
+
     const uint32_t credit;
     uint32_t window;
+
+
     pn_link_t* link;
     Session& session;
+    UserId userid;
     virtual uint32_t getCredit();
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp Tue Jun 25 13:28:15 2013
@@ -82,9 +82,7 @@ void Interconnect::process()
     } else {
         if ((pn_connection_state(connection) & UNINIT) == UNINIT) {
             QPID_LOG_CAT(debug, model, id << " interconnect opened");
-            pn_connection_set_container(connection, broker.getFederationTag().c_str());
-            pn_connection_open(connection);
-            out.connectionEstablished();
+            open();
 
             pn_session_t* s = pn_session(connection);
             pn_session_open(s);
@@ -116,4 +114,9 @@ void Interconnect::transportDeleted()
     registry.remove(name);
 }
 
+bool Interconnect::isLink() const
+{
+    return true;
+}
+
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h Tue Jun 25 13:28:15 2013
@@ -44,6 +44,7 @@ class Interconnect : public Connection
     size_t encode(char* buffer, size_t size);
     void deletedFromRegistry();
     void transportDeleted();
+    bool isLink() const;
   private:
     bool incoming;
     std::string name;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp Tue Jun 25 13:28:15 2013
@@ -54,11 +54,17 @@ ManagedConnection::~ManagedConnection()
     QPID_LOG_CAT(debug, model, "Delete connection. user:" << userid << " rhost:" << id);
 }
 
-void ManagedConnection::setUserid(const std::string& uid)
+void ManagedConnection::setUserId(const std::string& uid)
 {
     userid = uid;
-    if (agent && connection) {
+    if (connection) {
         connection->set_authIdentity(userid);
+    }
+}
+
+void ManagedConnection::opened()
+{
+    if (agent) {
         agent->raiseEvent(_qmf::EventClientConnect(id, userid, connection->get_remoteProperties()));
     }
     QPID_LOG_CAT(debug, model, "Create connection. user:" << userid << " rhost:" << id );
@@ -78,13 +84,20 @@ void ManagedConnection::setSaslSsf(int s
     }
 }
 
+void ManagedConnection::setPeerProperties(std::map<std::string, types::Variant>& p)
+{
+    peerProperties = p;
+    if (connection) {
+        connection->set_remoteProperties(peerProperties);
+    }
+}
+
 void ManagedConnection::setContainerId(const std::string& container)
 {
     containerid = container;
+    peerProperties["container-id"] = containerid;
     if (connection) {
-        qpid::types::Variant::Map props;
-        props["container-id"] = containerid;
-        connection->set_remoteProperties(props);
+        connection->set_remoteProperties(peerProperties);
     }
 }
 const std::string& ManagedConnection::getContainerId() const
@@ -98,7 +111,31 @@ qpid::management::ManagementObject::shar
 }
 
 std::string ManagedConnection::getId() const { return id; }
-std::string ManagedConnection::getUserid() const { return userid; }
+
+const OwnershipToken* ManagedConnection::getOwnership() const
+{
+    return this;
+}
+const management::ObjectId ManagedConnection::getObjectId() const
+{
+    return GetManagementObject()->getObjectId();
+}
+const std::string& ManagedConnection::getUserId() const
+{
+    return userid;
+}
+const std::string& ManagedConnection::getMgmtId() const
+{
+    return id;
+}
+const std::map<std::string, types::Variant>& ManagedConnection::getClientProperties() const
+{
+    return connection->get_remoteProperties();
+}
+bool ManagedConnection::isLink() const
+{
+    return false;
+}
 
 bool ManagedConnection::isLocal(const OwnershipToken* t) const
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h Tue Jun 25 13:28:15 2013
@@ -22,7 +22,9 @@
  *
  */
 #include "qpid/management/Manageable.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/broker/OwnershipToken.h"
+#include "qpid/types/Variant.h"
 #include "qmf/org/apache/qpid/broker/Connection.h"
 
 namespace qpid {
@@ -34,28 +36,38 @@ namespace broker {
 class Broker;
 namespace amqp {
 
-class ManagedConnection : public qpid::management::Manageable, public OwnershipToken
+class ManagedConnection : public qpid::management::Manageable, public OwnershipToken, public qpid::broker::Connection
 {
   public:
     ManagedConnection(Broker& broker, const std::string id);
     virtual ~ManagedConnection();
-    void setUserid(const std::string&);
+    virtual void setUserId(const std::string&);
     std::string getId() const;
-    std::string getUserid() const;
     void setSaslMechanism(const std::string&);
     void setSaslSsf(int);
     void setContainerId(const std::string&);
     const std::string& getContainerId() const;
+    void setPeerProperties(std::map<std::string, types::Variant>&);
     qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
     bool isLocal(const OwnershipToken* t) const;
     void incomingMessageReceived();
     void outgoingMessageSent();
+
+    //ConnectionIdentity
+    const OwnershipToken* getOwnership() const;
+    const management::ObjectId getObjectId() const;
+    const std::string& getUserId() const;
+    const std::string& getMgmtId() const;
+    const std::map<std::string, types::Variant>& getClientProperties() const;
+    virtual bool isLink() const;
+    void opened();
   private:
     const std::string id;
     std::string userid;
     std::string containerid;
     qmf::org::apache::qpid::broker::Connection::shared_ptr connection;
     qpid::management::ManagementAgent* agent;
+    std::map<std::string, types::Variant> peerProperties;
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp Tue Jun 25 13:28:15 2013
@@ -139,7 +139,7 @@ void Sasl::respond(qpid::SaslServer::Sta
 {
     switch (status) {
       case qpid::SaslServer::OK:
-        connection.setUserid(authenticator->getUserid());
+        connection.setUserId(authenticator->getUserid());
         completed(true);
         //can't set authenticated & failed until we have actually sent the outcome
         state = SUCCESS_PENDING;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Tue Jun 25 13:28:15 2013
@@ -120,14 +120,16 @@ class IncomingToQueue : public DecodingI
 class IncomingToExchange : public DecodingIncoming
 {
   public:
-    IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e) {}
+    IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source)
+        : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()) {}
     void handle(qpid::broker::Message& m);
   private:
     boost::shared_ptr<qpid::broker::Exchange> exchange;
+    Authorise& authorise;
 };
 
 Session::Session(pn_session_t* s, qpid::broker::Broker& b, Connection& c, qpid::sys::OutputControl& o)
-    : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {}
+    : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false), authorise(connection.getUserId(), broker.getAcl()) {}
 
 
 Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
@@ -140,11 +142,11 @@ Session::ResolvedNode Session::resolve(c
             //is it a queue or an exchange?
             node.properties.read(pn_terminus_properties(terminus));
             if (node.properties.isQueue()) {
-                node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first;
+                node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
             } else {
                 qpid::framing::FieldTable args;
                 node.exchange = broker.createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
-                                                      args, connection.getUserid(), connection.getId()).first;
+                                                      args, connection.getUserId(), connection.getId()).first;
             }
         } else {
             size_t i = name.find('@');
@@ -236,8 +238,13 @@ void Session::setupIncoming(pn_link_t* l
 {
     ResolvedNode node = resolve(name, target, true);
     //set capabilities
-    if (node.queue) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue);
-    else if (node.exchange) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
+    if (node.queue) {
+        setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue);
+        authorise.incoming(node.queue);
+    } else if (node.exchange) {
+        setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
+        authorise.incoming(node.exchange);
+    }
 
     const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link));
     if (!sourceAddress) {
@@ -260,6 +267,7 @@ void Session::setupIncoming(pn_link_t* l
         pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
         throw qpid::Exception("Node not found: " + name);/*not-found*/
     }
+    if (broker.getOptions().auth && !connection.isLink()) incoming[link]->verify(connection.getUserId(), broker.getOptions().realm);
     QPID_LOG(debug, "Incoming link attached");
 }
 
@@ -282,11 +290,13 @@ void Session::setupOutgoing(pn_link_t* l
 
 
     if (node.queue) {
+        authorise.outgoing(node.queue);
         boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false));
         q->init();
         filter.apply(q);
         outgoing[link] = q;
     } else if (node.exchange) {
+        authorise.access(node.exchange);//do separate access check before trying to create the queue
         bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
         bool durable = pn_terminus_get_durability(source);
         QueueSettings settings(durable, !durable);
@@ -295,7 +305,7 @@ void Session::setupOutgoing(pn_link_t* l
         std::stringstream queueName;
         if (shared) {
             //just use link name (TODO: could allow this to be
-            //overridden when acces to link properties is provided
+            //overridden when access to link properties is provided
             //(PROTON-335))
             queueName << pn_link_name(link);
         } else {
@@ -303,9 +313,9 @@ void Session::setupOutgoing(pn_link_t* l
             queueName << connection.getContainerId() << "_" << pn_link_name(link);
         }
         boost::shared_ptr<qpid::broker::Queue> queue
-            = broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first;
+            = broker.createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first;
         if (!shared) queue->setExclusiveOwner(this);
-
+        authorise.outgoing(node.exchange, queue, filter);
         filter.bind(node.exchange, queue);
         boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared));
         outgoing[link] = q;
@@ -460,6 +470,11 @@ void Session::wakeup()
     out.activateOutput();
 }
 
+Authorise& Session::getAuthorise()
+{
+    return authorise;
+}
+
 void IncomingToQueue::handle(qpid::broker::Message& message)
 {
     queue->deliver(message);
@@ -467,6 +482,7 @@ void IncomingToQueue::handle(qpid::broke
 
 void IncomingToExchange::handle(qpid::broker::Message& message)
 {
+    authorise.route(exchange, message);
     DeliverableMessage deliverable(message, 0);
     exchange->route(deliverable);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h Tue Jun 25 13:28:15 2013
@@ -23,6 +23,7 @@
  */
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/OutputControl.h"
+#include "qpid/broker/amqp/Authorise.h"
 #include "qpid/broker/amqp/ManagedSession.h"
 #include "qpid/broker/amqp/NodeProperties.h"
 #include <deque>
@@ -75,6 +76,8 @@ class Session : public ManagedSession, p
     void accepted(pn_delivery_t*, bool sync);
 
     void wakeup();
+
+    Authorise& getAuthorise();
   private:
     typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks;
     typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks;
@@ -88,6 +91,8 @@ class Session : public ManagedSession, p
     bool deleted;
     qpid::sys::Mutex lock;
     std::set< boost::shared_ptr<Queue> > exclusiveQueues;
+    Authorise authorise;
+
     struct ResolvedNode
     {
         boost::shared_ptr<qpid::broker::Exchange> exchange;

Copied: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp (from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp&r1=1496401&r2=1496466&rev=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp Tue Jun 25 13:28:15 2013
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 
 #include "qpid/broker/ConnectionObserver.h"
 #include "qpid/broker/SessionOutputException.h"
@@ -59,6 +59,7 @@ namespace _qmf = qmf::org::apache::qpid:
 
 namespace qpid {
 namespace broker {
+namespace amqp_0_10 {
 
 struct ConnectionTimeoutTask : public sys::TimerTask {
     sys::Timer& timer;
@@ -160,7 +161,6 @@ Connection::Connection(ConnectionOutputH
             mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10"));
             agent->addObject(mgmtObject, objectId);
         }
-        setUrl(mgmtId);
     }
 }
 
@@ -542,4 +542,4 @@ void Connection::restartTimeout()
 
 bool Connection::isOpen() { return adapter.isOpen(); }
 
-}}
+}}}

Copied: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h (from r1496401, qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h?p2=qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h&r1=1496401&r2=1496466&rev=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h Tue Jun 25 13:28:15 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_CONNECTION_H
-#define QPID_BROKER_CONNECTION_H
+#ifndef QPID_BROKER_AMQP_0_10_CONNECTION_H
+#define QPID_BROKER_AMQP_0_10_CONNECTION_H
 
 /*
  *
@@ -30,13 +30,14 @@
 #include "qpid/broker/BrokerImportExport.h"
 
 #include "qpid/broker/ConnectionHandler.h"
-#include "qpid/broker/ConnectionIdentity.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/broker/OwnershipToken.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/sys/AggregateOutput.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/SecuritySettings.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/types/Variant.h"
 #include "qpid/RefCounted.h"
 #include "qpid/Url.h"
 #include "qpid/ptr_map.h"
@@ -62,9 +63,11 @@ class LinkRegistry;
 class Queue;
 class SecureConnection;
 class SessionHandler;
+
+namespace amqp_0_10 {
 struct ConnectionTimeoutTask;
 
-class Connection : public sys::ConnectionInputHandler, public ConnectionIdentity,
+class Connection : public sys::ConnectionInputHandler, public qpid::broker::Connection,
                    public OwnershipToken, public management::Manageable,
                    public RefCounted
 {
@@ -77,12 +80,10 @@ class Connection : public sys::Connectio
     void setHeartbeat(uint16_t hb) { heartbeat = hb; }
     void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
 
-    void setUrl(const std::string& _url) { url = _url; }
 
     const OwnershipToken* getOwnership() const { return this; };
     const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); };
     const std::string& getUserId() const { return userId; }
-    const std::string& getUrl() const { return url; }
 
     void setUserProxyAuth(const bool b);
     bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
@@ -172,11 +173,11 @@ class Connection : public sys::Connectio
     /** @return true if the initial connection negotiation is complete. */
     bool isOpen();
 
-    bool isLink() { return link; }
+    bool isLink() const { return link; }
     void startLinkHeartbeatTimeoutTask();
 
-    void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
-    const framing::FieldTable& getClientProperties() const { return clientProperties; }
+    void setClientProperties(const types::Variant::Map& cp) { clientProperties = cp; }
+    const types::Variant::Map& getClientProperties() const { return clientProperties; }
 
   private:
     // Management object is used in the constructor so must be early
@@ -195,7 +196,6 @@ class Connection : public sys::Connectio
     uint16_t heartbeat;
     uint16_t heartbeatmax;
     std::string userId;
-    std::string url;
     bool userProxyAuth;
     std::string federationPeerTag;
     std::vector<Url> knownHosts;
@@ -218,7 +218,7 @@ class Connection : public sys::Connectio
     boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
     boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
     uint64_t objectId;
-    framing::FieldTable clientProperties;
+    types::Variant::Map clientProperties;
 
 friend class OutboundFrameTracker;
 
@@ -230,6 +230,6 @@ friend class OutboundFrameTracker;
     qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
 };
 
-}}
+}}}
 
-#endif  /*!QPID_BROKER_CONNECTION_H*/
+#endif  /*!QPID_BROKER_AMQP_0_10_CONNECTION_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Jun 25 13:28:15 2013
@@ -22,7 +22,7 @@
 #include "HaBroker.h"
 #include "QueueReplicator.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/ConnectionObserver.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueSettings.h"
@@ -378,7 +378,7 @@ void BrokerReplicator::connected(Bridge&
     connection = link->getConnection();
     assert(connection);
     userId = link->getConnection()->getUserId();
-    remoteHost = link->getConnection()->getUrl();
+    remoteHost = link->getConnection()->getMgmtId();
 
     link->getRemoteAddress(primary);
     string queueName = bridge.getQueueName();

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp Tue Jun 25 13:28:15 2013
@@ -23,7 +23,7 @@
 #include "BrokerInfo.h"
 #include "HaBroker.h"
 #include "qpid/Url.h"
-#include "qpid/framing/FieldTable.h"
+#include "qpid/types/Variant.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/log/Statement.h"
 
@@ -34,21 +34,23 @@ ConnectionObserver::ConnectionObserver(H
     : haBroker(hb), logPrefix("Backup: "), self(uuid) {}
 
 bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
-    framing::FieldTable ft;
-    if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
-        info = BrokerInfo(ft);
+    qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::BACKUP_TAG);
+    if (i != connection.getClientProperties().end() && i->second.getType() == qpid::types::VAR_MAP) {
+        info = BrokerInfo(i->second.asMap());
         return true;
     }
     return false;
 }
 
 bool ConnectionObserver::getAddress(const broker::Connection& connection, Address& addr) {
-    Url url;
-    url.parseNoThrow(
-        connection.getClientProperties().getAsString(ConnectionObserver::ADDRESS_TAG).c_str());
-    if (!url.empty()) {
-        addr = url[0];
-        return true;
+    qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::ADDRESS_TAG);
+    if (i != connection.getClientProperties().end()) {
+        Url url;
+        url.parseNoThrow(i->second.asString().c_str());
+        if (!url.empty()) {
+            addr = url[0];
+            return true;
+        }
     }
     return false;
 }
@@ -86,7 +88,7 @@ void ConnectionObserver::opened(broker::
             return;
         }
         if (connection.isLink()) return; // Allow outgoing links.
-        if (connection.getClientProperties().isSet(ADMIN_TAG)) {
+        if (connection.getClientProperties().find(ADMIN_TAG) != connection.getClientProperties().end()) {
             QPID_LOG(debug, logPrefix << "Accepted admin connection: "
                      << connection.getMgmtId());
             return;                 // No need to call observer, always allow admins.

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Jun 25 13:28:15 2013
@@ -88,10 +88,27 @@ const string keyifyNameStr(const string&
 
 struct ScopedManagementContext
 {
-    ScopedManagementContext(const ConnectionIdentity& p)
+    const Connection* context;
+
+    ScopedManagementContext(const Connection* p) : context(p)
+    {
+        if (p) setManagementExecutionContext(*p);
+    }
+
+    management::ObjectId getObjectId() const
+    {
+        return context ? context->getObjectId() : management::ObjectId();
+    }
+    std::string getUserId() const
+    {
+        return context ? context->getUserId() : std::string();
+    }
+    std::string getMgmtId() const
     {
-        setManagementExecutionContext(p);
+        return context ? context->getMgmtId() : std::string();
     }
+
+
     ~ScopedManagementContext()
     {
         resetManagementExecutionContext();
@@ -2288,7 +2305,7 @@ void ManagementAgent::dispatchAgentComma
             }
 
             if (opcode == "_method_request")
-                return handleMethodRequest(body, rte, rtk, cid, msg.getPublisherUserId(), viaLocal);
+                return handleMethodRequest(body, rte, rtk, cid, context.getUserId(), viaLocal);
             else if (opcode == "_query_request")
                 return handleGetQuery(body, rte, rtk, cid, viaLocal);
             else if (opcode == "_agent_locate_request")
@@ -2311,9 +2328,9 @@ void ManagementAgent::dispatchAgentComma
         else if (opcode == 'q') handleClassInd       (inBuffer, rtk, sequence);
         else if (opcode == 'S') handleSchemaRequest  (inBuffer, rte, rtk, sequence);
         else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence);
-        else if (opcode == 'A') handleAttachRequest  (inBuffer, rtk, sequence, msg.getPublisherObjectId());
+        else if (opcode == 'A') handleAttachRequest  (inBuffer, rtk, sequence, context.getObjectId());
         else if (opcode == 'G') handleGetQuery       (inBuffer, rtk, sequence);
-        else if (opcode == 'M') handleMethodRequest  (inBuffer, rtk, sequence, msg.getPublisherUserId());
+        else if (opcode == 'M') handleMethodRequest  (inBuffer, rtk, sequence, context.getMgmtId());
     }
 }
 
@@ -2752,10 +2769,10 @@ ManagementAgent::EventQueue::Batch::cons
 }
 
 namespace {
-QPID_TSS const ConnectionIdentity* currentPublisher = 0;
+QPID_TSS const Connection* currentPublisher = 0;
 }
 
-void setManagementExecutionContext(const ConnectionIdentity& p)
+void setManagementExecutionContext(const Connection& p)
 {
     currentPublisher = &p;
 }
@@ -2765,7 +2782,7 @@ void resetManagementExecutionContext()
     currentPublisher = 0;
 }
 
-const ConnectionIdentity* getCurrentPublisher()
+const Connection* getCurrentPublisher()
 {
     return currentPublisher;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Tue Jun 25 13:28:15 2013
@@ -44,7 +44,6 @@
 namespace qpid {
 namespace broker {
 class Connection;
-class ConnectionIdentity;
 }
 namespace sys {
 class Timer;
@@ -379,9 +378,9 @@ private:
     std::auto_ptr<EventQueue> sendQueue;
 };
 
-void setManagementExecutionContext(const broker::ConnectionIdentity&);
+void setManagementExecutionContext(const broker::Connection&);
 void resetManagementExecutionContext();
-const broker::ConnectionIdentity* getCurrentPublisher();
+const broker::Connection* getCurrentPublisher();
 }}
 
 #endif  /*!_ManagementAgent_*/

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Tue Jun 25 13:28:15 2013
@@ -34,6 +34,7 @@
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/SystemInfo.h"
 #include "qpid/sys/Time.h"
 #include <vector>
 extern "C" {
@@ -125,6 +126,7 @@ void ConnectionContext::open()
     }
 
     QPID_LOG(debug, id << " Opening...");
+    setProperties();
     pn_connection_open(connection);
     wakeupDriver(); //want to write
     while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
@@ -148,7 +150,7 @@ void ConnectionContext::endSession(boost
     //wait for outstanding sends to settle
     while (!ssn->settled()) {
         QPID_LOG(debug, "Waiting for sends to settle before closing");
-        wait();//wait until message has been confirmed
+        wait(ssn);//wait until message has been confirmed
     }
 
     pn_session_close(ssn->session);
@@ -165,7 +167,7 @@ void ConnectionContext::close()
             //wait for outstanding sends to settle
             while (!i->second->settled()) {
                 QPID_LOG(debug, "Waiting for sends to settle before closing");
-                wait();//wait until message has been confirmed
+                wait(i->second);//wait until message has been confirmed
             }
 
 
@@ -304,6 +306,7 @@ void ConnectionContext::attach(boost::sh
         QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName());
     }
     lnk->verify(t);
+    checkClosed(ssn, lnk);
     QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
 }
 
@@ -322,6 +325,7 @@ void ConnectionContext::attach(boost::sh
         QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName());
     }
     lnk->verify(s);
+    checkClosed(ssn, lnk);
     QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
 }
 
@@ -471,8 +475,15 @@ void ConnectionContext::checkClosed(boos
 {
     checkClosed(ssn);
     if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+        pn_condition_t* error = pn_link_remote_condition(lnk);
+        std::stringstream text;
+        if (pn_condition_is_set(error)) {
+            text << "Link detached by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+        } else {
+            text << "Link detached by peer";
+        }
         pn_link_close(lnk);
-        throw qpid::messaging::LinkError("Link detached by peer");
+        throw qpid::messaging::LinkError(text.str());
     } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) {
         throw qpid::messaging::LinkError("Link is not attached");
     }
@@ -692,5 +703,39 @@ bool ConnectionContext::CodecSwitch::can
     return parent.canEncode();
 }
 
+namespace {
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+const std::string CLIENT_PPID("qpid.client_ppid");
+pn_bytes_t convert(const std::string& s)
+{
+    pn_bytes_t result;
+    result.start = const_cast<char*>(s.data());
+    result.size = s.size();
+    return result;
+}
+}
+void ConnectionContext::setProperties()
+{
+    /**
+     * Enable when proton 0.5 is released and qpidc has been updated
+     * to use it
+     *
+    pn_data_t* data = pn_connection_properties(connection);
+    pn_data_put_map(data);
+    pn_data_enter(data);
+
+    pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME));
+    std::string processName = sys::SystemInfo::getProcessName();
+    pn_data_put_string(data, convert(processName));
+
+    pn_data_put_symbol(data, convert(CLIENT_PID));
+    pn_data_put_int(data, sys::SystemInfo::getProcessId());
+
+    pn_data_put_symbol(data, convert(CLIENT_PPID));
+    pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
+    pn_data_exit(data);
+    **/
+}
 
 }}} // namespace qpid::messaging::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Tue Jun 25 13:28:15 2013
@@ -150,6 +150,7 @@ class ConnectionContext : public qpid::s
     std::size_t writeProtocolHeader(char* buffer, std::size_t size);
     std::string getError();
     bool useSasl();
+    void setProperties();
 };
 
 }}} // namespace qpid::messaging::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Tue Jun 25 13:28:15 2013
@@ -93,8 +93,24 @@ SenderContext::Delivery* SenderContext::
     }
 }
 
+void SenderContext::check()
+{
+    if (pn_link_state(sender) & PN_REMOTE_CLOSED && !(pn_link_state(sender) & PN_LOCAL_CLOSED)) {
+        pn_condition_t* error = pn_link_remote_condition(sender);
+        std::stringstream text;
+        if (pn_condition_is_set(error)) {
+            text << "Link detached by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+        } else {
+            text << "Link detached by peer";
+        }
+        pn_link_close(sender);
+        throw qpid::messaging::LinkError(text.str());
+    }
+}
+
 uint32_t SenderContext::processUnsettled()
 {
+    check();
     //remove messages from front of deque once peer has confirmed receipt
     while (!deliveries.empty() && deliveries.front().delivered()) {
         deliveries.front().settle();

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Tue Jun 25 13:28:15 2013
@@ -74,6 +74,7 @@ class SenderContext
     Delivery* send(const qpid::messaging::Message& message);
     void configure();
     void verify(pn_terminus_t*);
+    void check();
     bool settled();
     Address getAddress() const;
   private:

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1496466&r1=1496465&r2=1496466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Tue Jun 25 13:28:15 2013
@@ -144,7 +144,12 @@ bool SessionContext::settled()
 {
     bool result = true;
     for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
-        if (!i->second->settled()) result = false;
+        try {
+            if (!i->second->settled()) result = false;
+        } catch (const std::exception&) {
+            senders.erase(i);
+            throw;
+        }
     }
     return result;
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org