You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2011/02/02 19:16:57 UTC

svn commit: r1066562 - in /qpid/trunk/qpid/cpp: include/qmf/AgentSession.h include/qmf/ConsoleSession.h src/qmf/Agent.cpp src/qmf/AgentImpl.h src/qmf/AgentSession.cpp src/qmf/ConsoleSession.cpp src/qmf/ConsoleSessionImpl.h src/qmf/SchemaId.cpp

Author: tross
Date: Wed Feb  2 18:16:57 2011
New Revision: 1066562

URL: http://svn.apache.org/viewvc?rev=1066562&view=rev
Log:
QPID-3032 - Modifications to the QMFv2 implementation:

1) Use the topic exchange as the base for direct and reply-to addresses.
2) Add a strict-security option to the Console and Agent APIs that narrows the messaging
   patterns used such that they can easily be controlled by broker ACL policy.

Modified:
    qpid/trunk/qpid/cpp/include/qmf/AgentSession.h
    qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h
    qpid/trunk/qpid/cpp/src/qmf/Agent.cpp
    qpid/trunk/qpid/cpp/src/qmf/AgentImpl.h
    qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp
    qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp
    qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h
    qpid/trunk/qpid/cpp/src/qmf/SchemaId.cpp

Modified: qpid/trunk/qpid/cpp/include/qmf/AgentSession.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/AgentSession.h?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/AgentSession.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/AgentSession.h Wed Feb  2 18:16:57 2011
@@ -67,6 +67,10 @@ namespace qmf {
          *    sub-lifetime:N             - Lifetime (in seconds with no keepalive) for a subscription [default: 300]
          *    public-events:{True,False} - If True:  QMF events are sent to the topic exchange [default]
          *                                 If False: QMF events are only sent to authorized subscribers
+         *    listen-on-direct:{True,False} - If True:  Listen on legacy direct-exchange address for backward compatibility [default]
+         *                                    If False: Listen only on the routable direct address
+         *    strict-security:{True,False}  - If True:  Cooperate with the broker to enforce strict access control to the network
+         *                                    If False: Operate more flexibly with regard to use of messaging facilities [default]
          */
         QMF_EXTERN AgentSession(qpid::messaging::Connection&, const std::string& options="");
 

Modified: qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h Wed Feb  2 18:16:57 2011
@@ -57,6 +57,10 @@ namespace qmf {
          *    domain:NAME                - QMF Domain to join [default: "default"]
          *    max-agent-age:N            - Maximum time, in minutes, that we will tolerate not hearing from
          *                                 an agent before deleting it [default: 5]
+         *    listen-on-direct:{True,False} - If True:  Listen on legacy direct-exchange address for backward compatibility [default]
+         *                                    If False: Listen only on the routable direct address
+         *    strict-security:{True,False}  - If True:  Cooperate with the broker to enforce strict access control to the network
+         *                                    If False: Operate more flexibly with regard to use of messaging facilities [default]
          */
         QMF_EXTERN ConsoleSession(qpid::messaging::Connection&, const std::string& options="");
         QMF_EXTERN void setDomain(const std::string&);

Modified: qpid/trunk/qpid/cpp/src/qmf/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/Agent.cpp?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/Agent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/Agent.cpp Wed Feb  2 18:16:57 2011
@@ -71,8 +71,8 @@ Schema Agent::getSchema(const SchemaId& 
 
 
 AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
-    name(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
-    nextCorrelator(1), schemaCache(s.schemaCache)
+    name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
+    sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache)
 {
 }
 
@@ -83,6 +83,11 @@ void AgentImpl::setAttribute(const std::
         try {
             capability = v.asUint32();
         } catch (std::exception&) {}
+    if (k == "_direct_subject")
+        try {
+            directSubject = v.asString();
+            sender = session.topicSender;
+        } catch (std::exception&) {}
 }
 
 const Variant& AgentImpl::getAttribute(const string& k) const
@@ -514,9 +519,10 @@ void AgentImpl::sendQuery(const Query& q
 
     msg.setReplyTo(session.replyAddress);
     msg.setCorrelationId(boost::lexical_cast<string>(correlator));
-    msg.setSubject(name);
+    msg.setSubject(directSubject);
     encode(QueryImplAccess::get(query).asMap(), msg);
-    session.directSender.send(msg);
+    if (sender.isValid())
+        sender.send(msg);
 
     QPID_LOG(trace, "SENT QueryRequest to=" << name);
 }
@@ -538,9 +544,10 @@ void AgentImpl::sendMethod(const string&
 
     msg.setReplyTo(session.replyAddress);
     msg.setCorrelationId(boost::lexical_cast<string>(correlator));
-    msg.setSubject(name);
+    msg.setSubject(directSubject);
     encode(map, msg);
-    session.directSender.send(msg);
+    if (sender.isValid())
+        sender.send(msg);
 
     QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name);
 }
@@ -578,8 +585,9 @@ void AgentImpl::sendSchemaRequest(const 
     Message msg;
     msg.setReplyTo(session.replyAddress);
     msg.setContent(content);
-    msg.setSubject(name);
-    session.directSender.send(msg);
+    msg.setSubject(directSubject);
+    if (sender.isValid())
+        sender.send(msg);
 
     QPID_LOG(trace, "SENT V1SchemaRequest to=" << name);
 }

Modified: qpid/trunk/qpid/cpp/src/qmf/AgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/AgentImpl.h?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/AgentImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/AgentImpl.h Wed Feb  2 18:16:57 2011
@@ -29,6 +29,7 @@
 #include "qmf/SchemaCache.h"
 #include "qpid/messaging/Session.h"
 #include "qpid/messaging/Message.h"
+#include "qpid/messaging/Sender.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Condition.h"
 #include <boost/shared_ptr.hpp>
@@ -90,11 +91,13 @@ namespace qmf {
 
         mutable qpid::sys::Mutex lock;
         std::string name;
+        std::string directSubject;
         uint32_t epoch;
         ConsoleSessionImpl& session;
         bool touched;
         uint32_t untouchedCount;
         uint32_t capability;
+        qpid::messaging::Sender sender;
         qpid::types::Variant::Map attributes;
         uint32_t nextCorrelator;
         std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap;

Modified: qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp Wed Feb  2 18:16:57 2011
@@ -116,6 +116,8 @@ namespace qmf {
         uint32_t minSubInterval;
         uint32_t subLifetime;
         bool publicEvents;
+        bool listenOnDirect;
+        bool strictSecurity;
         uint64_t schemaUpdateTime;
         string directBase;
         string topicBase;
@@ -179,6 +181,7 @@ AgentSessionImpl::AgentSessionImpl(Conne
     bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
     externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
     maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
+    listenOnDirect(true), strictSecurity(false),
     schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
 {
     //
@@ -231,6 +234,14 @@ AgentSessionImpl::AgentSessionImpl(Conne
         iter = optMap.find("public-events");
         if (iter != optMap.end())
             publicEvents = iter->second.asBool();
+
+        iter = optMap.find("listen-on-direct");
+        if (iter != optMap.end())
+            listenOnDirect = iter->second.asBool();
+
+        iter = optMap.find("strict-security");
+        if (iter != optMap.end())
+            strictSecurity = iter->second.asBool();
     }
 }
 
@@ -248,6 +259,8 @@ void AgentSessionImpl::open()
         throw QmfException("The session is already open");
 
     const string addrArgs(";{create:never,node:{type:topic}}");
+    const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
+    attributes["_direct_subject"] = routableAddr;
 
     // Establish messaging addresses
     setAgentName();
@@ -256,13 +269,20 @@ void AgentSessionImpl::open()
 
     // Create AMQP session, receivers, and senders
     session = connection.createSession();
-    Receiver directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
+    Receiver directRx;
+    Receiver routableDirectRx = session.createReceiver(topicBase + "/" + routableAddr + addrArgs);
     Receiver topicRx = session.createReceiver(topicBase + "/console.#" + addrArgs);
 
-    directRx.setCapacity(64);
+    if (listenOnDirect && !strictSecurity) {
+        directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
+        directRx.setCapacity(64);
+    }
+
+    routableDirectRx.setCapacity(64);
     topicRx.setCapacity(64);
 
-    directSender = session.createSender(directBase + addrArgs);
+    if (!strictSecurity)
+        directSender = session.createSender(directBase + addrArgs);
     topicSender = session.createSender(topicBase + addrArgs);
 
     // Start the receiver thread
@@ -794,6 +814,17 @@ void AgentSessionImpl::dispatch(Message 
     const Variant::Map& properties(msg.getProperties());
     Variant::Map::const_iterator iter;
 
+    //
+    // If strict-security is enabled, make sure that the reply-to address complies with the
+    // strict-security addressing pattern (i.e. it starts with 'qmf.<domain>.topic/direct-console.').
+    //
+    if (strictSecurity && msg.getReplyTo()) {
+        if (msg.getReplyTo().getName() != topicBase || msg.getReplyTo().getSubject().find("direct-console.") != 0) {
+            QPID_LOG(warning, "Reply-to violates strict-security policy: " << msg.getReplyTo().str());
+            return;
+        }
+    }
+
     iter = properties.find(protocol::HEADER_KEY_APP_ID);
     if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) {
         //
@@ -892,6 +923,11 @@ void AgentSessionImpl::send(Message msg,
 {
     Sender sender;
 
+    if (strictSecurity && to.getName() != topicBase) {
+        QPID_LOG(warning, "Address violates strict-security policy: " << to);
+        return;
+    }
+
     if (to.getName() == directBase) {
         msg.setSubject(to.getSubject());
         sender = directSender;

Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp Wed Feb  2 18:16:57 2011
@@ -36,6 +36,7 @@ using namespace qmf;
 using qpid::messaging::Address;
 using qpid::messaging::Connection;
 using qpid::messaging::Receiver;
+using qpid::messaging::Sender;
 using qpid::messaging::Duration;
 using qpid::messaging::Message;
 using qpid::types::Variant;
@@ -82,6 +83,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(C
         iter = optMap.find("max-agent-age");
         if (iter != optMap.end())
             maxAgentAgeMinutes = iter->second.asUint32();
+
+        iter = optMap.find("listen-on-direct");
+        if (iter != optMap.end())
+            listenOnDirect = iter->second.asBool();
+
+        iter = optMap.find("strict-security");
+        if (iter != optMap.end())
+            strictSecurity = iter->second.asBool();
     }
 }
 
@@ -148,24 +157,26 @@ void ConsoleSessionImpl::open()
     directBase = "qmf." + domain + ".direct";
     topicBase = "qmf." + domain + ".topic";
 
-    string myKey("qmf-console-" + qpid::types::Uuid(true).str());
+    string myKey("direct-console." + qpid::types::Uuid(true).str());
 
-    replyAddress = Address(directBase + "/" + myKey + ";{node:{type:topic}}");
+    replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}");
 
     // Create AMQP session, receivers, and senders
     session = connection.createSession();
     Receiver directRx = session.createReceiver(replyAddress);
     Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating
-    Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+    if (!strictSecurity) {
+        Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+        legacyRx.setCapacity(64);
+        directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+        directSender.setCapacity(128);
+    }
 
     directRx.setCapacity(64);
     topicRx.setCapacity(128);
-    legacyRx.setCapacity(64);
 
-    directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
     topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
 
-    directSender.setCapacity(64);
     topicSender.setCapacity(128);
 
     // Start the receiver thread
@@ -371,7 +382,9 @@ void ConsoleSessionImpl::sendBrokerLocat
     msg.setCorrelationId("broker-locate");
     msg.setSubject("broker");
 
-    directSender.send(msg);
+    Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+    sender.send(msg);
+    sender.close();
 
     QPID_LOG(trace, "SENT AgentLocate to broker");
 }

Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h Wed Feb  2 18:16:57 2011
@@ -73,6 +73,8 @@ namespace qmf {
         qpid::messaging::Sender topicSender;
         std::string domain;
         uint32_t maxAgentAgeMinutes;
+        bool listenOnDirect;
+        bool strictSecurity;
         Query agentQuery;
         bool opened;
         std::queue<ConsoleEvent> eventQueue;

Modified: qpid/trunk/qpid/cpp/src/qmf/SchemaId.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/SchemaId.cpp?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/SchemaId.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/SchemaId.cpp Wed Feb  2 18:16:57 2011
@@ -78,7 +78,8 @@ Variant::Map SchemaIdImpl::asMap() const
         result["_type"] = "_data";
     else
         result["_type"] = "_event";
-    result["_hash"] = hash;
+    if (!hash.isNull())
+        result["_hash"] = hash;
     return result;
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org