You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2011/02/02 19:16:57 UTC
svn commit: r1066562 - in /qpid/trunk/qpid/cpp: include/qmf/AgentSession.h
include/qmf/ConsoleSession.h src/qmf/Agent.cpp src/qmf/AgentImpl.h
src/qmf/AgentSession.cpp src/qmf/ConsoleSession.cpp
src/qmf/ConsoleSessionImpl.h src/qmf/SchemaId.cpp
Author: tross
Date: Wed Feb 2 18:16:57 2011
New Revision: 1066562
URL: http://svn.apache.org/viewvc?rev=1066562&view=rev
Log:
QPID-3032 - Modifications to the QMFv2 implementation:
1) Use the topic exchange as the base for direct and reply-to addresses.
2) Add a strict-security option to the Console and Agent APIs that narrows the messaging
patterns used such that they can easily be controlled by broker ACL policy.
Modified:
qpid/trunk/qpid/cpp/include/qmf/AgentSession.h
qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h
qpid/trunk/qpid/cpp/src/qmf/Agent.cpp
qpid/trunk/qpid/cpp/src/qmf/AgentImpl.h
qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp
qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp
qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h
qpid/trunk/qpid/cpp/src/qmf/SchemaId.cpp
Modified: qpid/trunk/qpid/cpp/include/qmf/AgentSession.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/AgentSession.h?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/AgentSession.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/AgentSession.h Wed Feb 2 18:16:57 2011
@@ -67,6 +67,10 @@ namespace qmf {
* sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300]
* public-events:{True,False} - If True: QMF events are sent to the topic exchange [default]
* If False: QMF events are only sent to authorized subscribers
+ * listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ * If False: Listen only on the routable direct address
+ * strict-security:{True,False} - If True: Cooperate with the broker to enforce string access control to the network
+ * - If False: Operate more flexibly with regard to use of messaging facilities [default]
*/
QMF_EXTERN AgentSession(qpid::messaging::Connection&, const std::string& options="");
Modified: qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h Wed Feb 2 18:16:57 2011
@@ -57,6 +57,10 @@ namespace qmf {
* domain:NAME - QMF Domain to join [default: "default"]
* max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from
* an agent before deleting it [default: 5]
+ * listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ * If False: Listen only on the routable direct address
+ * strict-security:{True,False} - If True: Cooperate with the broker to enforce string access control to the network
+ * - If False: Operate more flexibly with regard to use of messaging facilities [default]
*/
QMF_EXTERN ConsoleSession(qpid::messaging::Connection&, const std::string& options="");
QMF_EXTERN void setDomain(const std::string&);
Modified: qpid/trunk/qpid/cpp/src/qmf/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/Agent.cpp?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/Agent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/Agent.cpp Wed Feb 2 18:16:57 2011
@@ -71,8 +71,8 @@ Schema Agent::getSchema(const SchemaId&
AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
- name(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
- nextCorrelator(1), schemaCache(s.schemaCache)
+ name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
+ sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache)
{
}
@@ -83,6 +83,11 @@ void AgentImpl::setAttribute(const std::
try {
capability = v.asUint32();
} catch (std::exception&) {}
+ if (k == "_direct_subject")
+ try {
+ directSubject = v.asString();
+ sender = session.topicSender;
+ } catch (std::exception&) {}
}
const Variant& AgentImpl::getAttribute(const string& k) const
@@ -514,9 +519,10 @@ void AgentImpl::sendQuery(const Query& q
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
- msg.setSubject(name);
+ msg.setSubject(directSubject);
encode(QueryImplAccess::get(query).asMap(), msg);
- session.directSender.send(msg);
+ if (sender.isValid())
+ sender.send(msg);
QPID_LOG(trace, "SENT QueryRequest to=" << name);
}
@@ -538,9 +544,10 @@ void AgentImpl::sendMethod(const string&
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
- msg.setSubject(name);
+ msg.setSubject(directSubject);
encode(map, msg);
- session.directSender.send(msg);
+ if (sender.isValid())
+ sender.send(msg);
QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name);
}
@@ -578,8 +585,9 @@ void AgentImpl::sendSchemaRequest(const
Message msg;
msg.setReplyTo(session.replyAddress);
msg.setContent(content);
- msg.setSubject(name);
- session.directSender.send(msg);
+ msg.setSubject(directSubject);
+ if (sender.isValid())
+ sender.send(msg);
QPID_LOG(trace, "SENT V1SchemaRequest to=" << name);
}
Modified: qpid/trunk/qpid/cpp/src/qmf/AgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/AgentImpl.h?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/AgentImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/AgentImpl.h Wed Feb 2 18:16:57 2011
@@ -29,6 +29,7 @@
#include "qmf/SchemaCache.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Sender.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Condition.h"
#include <boost/shared_ptr.hpp>
@@ -90,11 +91,13 @@ namespace qmf {
mutable qpid::sys::Mutex lock;
std::string name;
+ std::string directSubject;
uint32_t epoch;
ConsoleSessionImpl& session;
bool touched;
uint32_t untouchedCount;
uint32_t capability;
+ qpid::messaging::Sender sender;
qpid::types::Variant::Map attributes;
uint32_t nextCorrelator;
std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap;
Modified: qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp Wed Feb 2 18:16:57 2011
@@ -116,6 +116,8 @@ namespace qmf {
uint32_t minSubInterval;
uint32_t subLifetime;
bool publicEvents;
+ bool listenOnDirect;
+ bool strictSecurity;
uint64_t schemaUpdateTime;
string directBase;
string topicBase;
@@ -179,6 +181,7 @@ AgentSessionImpl::AgentSessionImpl(Conne
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
+ listenOnDirect(true), strictSecurity(false),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
@@ -231,6 +234,14 @@ AgentSessionImpl::AgentSessionImpl(Conne
iter = optMap.find("public-events");
if (iter != optMap.end())
publicEvents = iter->second.asBool();
+
+ iter = optMap.find("listen-on-direct");
+ if (iter != optMap.end())
+ listenOnDirect = iter->second.asBool();
+
+ iter = optMap.find("strict-security");
+ if (iter != optMap.end())
+ strictSecurity = iter->second.asBool();
}
}
@@ -248,6 +259,8 @@ void AgentSessionImpl::open()
throw QmfException("The session is already open");
const string addrArgs(";{create:never,node:{type:topic}}");
+ const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
+ attributes["_direct_subject"] = routableAddr;
// Establish messaging addresses
setAgentName();
@@ -256,13 +269,20 @@ void AgentSessionImpl::open()
// Create AMQP session, receivers, and senders
session = connection.createSession();
- Receiver directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
+ Receiver directRx;
+ Receiver routableDirectRx = session.createReceiver(topicBase + "/" + routableAddr + addrArgs);
Receiver topicRx = session.createReceiver(topicBase + "/console.#" + addrArgs);
- directRx.setCapacity(64);
+ if (listenOnDirect && !strictSecurity) {
+ directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
+ directRx.setCapacity(64);
+ }
+
+ routableDirectRx.setCapacity(64);
topicRx.setCapacity(64);
- directSender = session.createSender(directBase + addrArgs);
+ if (!strictSecurity)
+ directSender = session.createSender(directBase + addrArgs);
topicSender = session.createSender(topicBase + addrArgs);
// Start the receiver thread
@@ -794,6 +814,17 @@ void AgentSessionImpl::dispatch(Message
const Variant::Map& properties(msg.getProperties());
Variant::Map::const_iterator iter;
+ //
+ // If strict-security is enabled, make sure that reply-to address complies with the
+ // strict-security addressing pattern (i.e. start with 'qmf.<domain>.topic/direct-console.').
+ //
+ if (strictSecurity && msg.getReplyTo()) {
+ if (msg.getReplyTo().getName() != topicBase || msg.getReplyTo().getSubject().find("direct-console.") != 0) {
+ QPID_LOG(warning, "Reply-to violates strict-security policy: " << msg.getReplyTo().str());
+ return;
+ }
+ }
+
iter = properties.find(protocol::HEADER_KEY_APP_ID);
if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) {
//
@@ -892,6 +923,11 @@ void AgentSessionImpl::send(Message msg,
{
Sender sender;
+ if (strictSecurity && to.getName() != topicBase) {
+ QPID_LOG(warning, "Address violates strict-security policy: " << to);
+ return;
+ }
+
if (to.getName() == directBase) {
msg.setSubject(to.getSubject());
sender = directSender;
Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp Wed Feb 2 18:16:57 2011
@@ -36,6 +36,7 @@ using namespace qmf;
using qpid::messaging::Address;
using qpid::messaging::Connection;
using qpid::messaging::Receiver;
+using qpid::messaging::Sender;
using qpid::messaging::Duration;
using qpid::messaging::Message;
using qpid::types::Variant;
@@ -82,6 +83,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(C
iter = optMap.find("max-agent-age");
if (iter != optMap.end())
maxAgentAgeMinutes = iter->second.asUint32();
+
+ iter = optMap.find("listen-on-direct");
+ if (iter != optMap.end())
+ listenOnDirect = iter->second.asBool();
+
+ iter = optMap.find("strict-security");
+ if (iter != optMap.end())
+ strictSecurity = iter->second.asBool();
}
}
@@ -148,24 +157,26 @@ void ConsoleSessionImpl::open()
directBase = "qmf." + domain + ".direct";
topicBase = "qmf." + domain + ".topic";
- string myKey("qmf-console-" + qpid::types::Uuid(true).str());
+ string myKey("direct-console." + qpid::types::Uuid(true).str());
- replyAddress = Address(directBase + "/" + myKey + ";{node:{type:topic}}");
+ replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}");
// Create AMQP session, receivers, and senders
session = connection.createSession();
Receiver directRx = session.createReceiver(replyAddress);
Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating
- Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+ if (!strictSecurity) {
+ Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+ legacyRx.setCapacity(64);
+ directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ directSender.setCapacity(128);
+ }
directRx.setCapacity(64);
topicRx.setCapacity(128);
- legacyRx.setCapacity(64);
- directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
- directSender.setCapacity(64);
topicSender.setCapacity(128);
// Start the receiver thread
@@ -371,7 +382,9 @@ void ConsoleSessionImpl::sendBrokerLocat
msg.setCorrelationId("broker-locate");
msg.setSubject("broker");
- directSender.send(msg);
+ Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ sender.send(msg);
+ sender.close();
QPID_LOG(trace, "SENT AgentLocate to broker");
}
Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h Wed Feb 2 18:16:57 2011
@@ -73,6 +73,8 @@ namespace qmf {
qpid::messaging::Sender topicSender;
std::string domain;
uint32_t maxAgentAgeMinutes;
+ bool listenOnDirect;
+ bool strictSecurity;
Query agentQuery;
bool opened;
std::queue<ConsoleEvent> eventQueue;
Modified: qpid/trunk/qpid/cpp/src/qmf/SchemaId.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/SchemaId.cpp?rev=1066562&r1=1066561&r2=1066562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/SchemaId.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/SchemaId.cpp Wed Feb 2 18:16:57 2011
@@ -78,7 +78,8 @@ Variant::Map SchemaIdImpl::asMap() const
result["_type"] = "_data";
else
result["_type"] = "_event";
- result["_hash"] = hash;
+ if (!hash.isNull())
+ result["_hash"] = hash;
return result;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org