You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/08 17:16:56 UTC
svn commit: r1068464 [2/6] - in /qpid/branches/qpid-2935/qpid: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/examples/python/ cpp/bindings/qmf2/examples/ruby/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2/ruby/ cpp/bindings...
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp Tue Feb 8 16:16:49 2011
@@ -36,6 +36,7 @@ using namespace qmf;
using qpid::messaging::Address;
using qpid::messaging::Connection;
using qpid::messaging::Receiver;
+using qpid::messaging::Sender;
using qpid::messaging::Duration;
using qpid::messaging::Message;
using qpid::types::Variant;
@@ -64,9 +65,9 @@ Subscription ConsoleSession::subscribe(c
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false),
- thread(0), threadCanceled(false),
- lastVisit(0), lastAgePass(0), connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
+ connection(c), domain("default"), authUser(c.getAuthenticatedUsername()), maxAgentAgeMinutes(5),
+ opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+ connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
{
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
@@ -82,6 +83,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(C
iter = optMap.find("max-agent-age");
if (iter != optMap.end())
maxAgentAgeMinutes = iter->second.asUint32();
+
+ iter = optMap.find("listen-on-direct");
+ if (iter != optMap.end())
+ listenOnDirect = iter->second.asBool();
+
+ iter = optMap.find("strict-security");
+ if (iter != optMap.end())
+ strictSecurity = iter->second.asBool();
}
}
@@ -148,24 +157,26 @@ void ConsoleSessionImpl::open()
directBase = "qmf." + domain + ".direct";
topicBase = "qmf." + domain + ".topic";
- string myKey("qmf-console-" + qpid::types::Uuid(true).str());
+ string myKey("direct-console." + qpid::types::Uuid(true).str());
- replyAddress = Address(directBase + "/" + myKey + ";{node:{type:topic}}");
+ replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}");
// Create AMQP session, receivers, and senders
session = connection.createSession();
Receiver directRx = session.createReceiver(replyAddress);
Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating
- Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+ if (!strictSecurity) {
+ Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+ legacyRx.setCapacity(64);
+ directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ directSender.setCapacity(128);
+ }
directRx.setCapacity(64);
topicRx.setCapacity(128);
- legacyRx.setCapacity(64);
- directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
- directSender.setCapacity(64);
topicSender.setCapacity(128);
// Start the receiver thread
@@ -371,7 +382,9 @@ void ConsoleSessionImpl::sendBrokerLocat
msg.setCorrelationId("broker-locate");
msg.setSubject("broker");
- directSender.send(msg);
+ Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ sender.send(msg);
+ sender.close();
QPID_LOG(trace, "SENT AgentLocate to broker");
}
@@ -468,6 +481,7 @@ void ConsoleSessionImpl::handleAgentUpda
//
// This is a refresh of an agent we are already tracking.
//
+ bool detectedRestart(false);
agent = aIter->second;
AgentImpl& impl(AgentImplAccess::get(agent));
impl.touch();
@@ -480,6 +494,7 @@ void ConsoleSessionImpl::handleAgentUpda
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART));
eventImpl->setAgent(agent);
enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ detectedRestart = true;
}
iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP);
@@ -488,12 +503,14 @@ void ConsoleSessionImpl::handleAgentUpda
if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) {
//
// The agent has added new schema entries since we last heard from it.
- // Enqueue a notification.
+ // Update the attribute and, if this doesn't accompany a restart, enqueue a notification.
//
- auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
- eventImpl->setAgent(agent);
- impl.setAttribute(iter->first, iter->second);
- enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ if (!detectedRestart) {
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+ impl.setAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP, iter->second);
}
}
}
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h Tue Feb 8 16:16:49 2011
@@ -72,7 +72,10 @@ namespace qmf {
qpid::messaging::Sender directSender;
qpid::messaging::Sender topicSender;
std::string domain;
+ std::string authUser;
uint32_t maxAgentAgeMinutes;
+ bool listenOnDirect;
+ bool strictSecurity;
Query agentQuery;
bool opened;
std::queue<ConsoleEvent> eventQueue;
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/SchemaId.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/SchemaId.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/SchemaId.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/SchemaId.cpp Tue Feb 8 16:16:49 2011
@@ -78,7 +78,8 @@ Variant::Map SchemaIdImpl::asMap() const
result["_type"] = "_data";
else
result["_type"] = "_event";
- result["_hash"] = hash;
+ if (!hash.isNull())
+ result["_hash"] = hash;
return result;
}
Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 16:16:49 2011
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:1061302-1068442
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Tue Feb 8 16:16:49 2011
@@ -362,7 +362,7 @@ uint32_t ManagementAgentImpl::pollCallba
methodQueue.pop_front();
{
sys::Mutex::ScopedUnlock unlock(agentLock);
- invokeMethodRequest(item->body, item->cid, item->replyTo, item->userId);
+ invokeMethodRequest(item->body, item->cid, item->replyToExchange, item->replyToKey, item->userId);
delete item;
}
}
@@ -497,7 +497,7 @@ void ManagementAgentImpl::sendHeartbeat(
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
-void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
+void ManagementAgentImpl::sendException(const string& rte, const string& rtk, const string& cid,
const string& text, uint32_t code)
{
Variant::Map map;
@@ -514,12 +514,12 @@ void ManagementAgentImpl::sendException(
map["_values"] = values;
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyToKey);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
}
-void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk)
{
sys::Mutex::ScopedLock lock(agentLock);
string packageName;
@@ -546,7 +546,7 @@ void ManagementAgentImpl::handleSchemaRe
outBuffer.putRawData(body);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
}
@@ -561,7 +561,7 @@ void ManagementAgentImpl::handleConsoleA
QPID_LOG(trace, "RCVD ConsoleAddedInd");
}
-void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo, const string& userId)
+void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
{
string methodName;
bool failed = false;
@@ -572,11 +572,9 @@ void ManagementAgentImpl::invokeMethodRe
MapCodec::decode(body, inMap);
- outMap["_values"] = Variant::Map();
-
if ((oid = inMap.find("_object_id")) == inMap.end() ||
(mid = inMap.find("_method_name")) == inMap.end()) {
- sendException(replyTo, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
Manageable::STATUS_PARAMETER_INVALID);
failed = true;
} else {
@@ -595,6 +593,8 @@ void ManagementAgentImpl::invokeMethodRe
inArgs = (mid->second).asMap();
}
+ QPID_LOG(trace, "Invoking Method: name=" << methodName << " args=" << inArgs);
+
boost::shared_ptr<ManagementObject> oPtr;
{
sys::Mutex::ScopedLock lock(agentLock);
@@ -604,7 +604,7 @@ void ManagementAgentImpl::invokeMethodRe
}
if (oPtr.get() == 0) {
- sendException(replyTo, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT),
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT),
Manageable::STATUS_UNKNOWN_OBJECT);
failed = true;
} else {
@@ -617,13 +617,13 @@ void ManagementAgentImpl::invokeMethodRe
if (iter->first != "_status_code" && iter->first != "_status_text")
outMap["_arguments"].asMap()[iter->first] = iter->second;
} else {
- sendException(replyTo, cid, callMap["_status_text"], callMap["_status_code"]);
+ sendException(rte, rtk, cid, callMap["_status_text"], callMap["_status_code"]);
failed = true;
}
}
} catch(types::InvalidConversion& e) {
- sendException(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION);
+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION);
failed = true;
}
}
@@ -635,11 +635,11 @@ void ManagementAgentImpl::invokeMethodRe
headers["qmf.opcode"] = "_method_response";
QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
MapCodec::encode(outMap, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
}
}
-void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk)
{
moveNewObjectsLH();
@@ -666,12 +666,12 @@ void ManagementAgentImpl::handleGetQuery
*/
i = inMap.find("_what");
if (i == inMap.end()) {
- sendException(replyTo, cid, "_what element missing in Query");
+ sendException(rte, rtk, cid, "_what element missing in Query");
return;
}
if (i->second.getType() != qpid::types::VAR_STRING) {
- sendException(replyTo, cid, "_what element is not a string");
+ sendException(rte, rtk, cid, "_what element is not a string");
return;
}
@@ -709,8 +709,8 @@ void ManagementAgentImpl::handleGetQuery
headers.erase("partial");
ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
return;
}
} else { // match using schema_id, if supplied
@@ -771,8 +771,8 @@ void ManagementAgentImpl::handleGetQuery
if (++objCount >= maxV2ReplyObjs) {
objCount = 0;
ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk);
content.clear();
list_.clear();
}
@@ -784,8 +784,8 @@ void ManagementAgentImpl::handleGetQuery
// Send last "non-partial" message to indicate CommandComplete
headers.erase("partial");
ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (last message, no 'partial' indicator) to=" << rte << "/" << rtk);
} else if (i->second.asString() == "SCHEMA_ID") {
headers["qmf.content"] = "_schema_id";
@@ -806,16 +806,16 @@ void ManagementAgentImpl::handleGetQuery
headers.erase("partial");
ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << rte << "/" << rtk);
} else {
// Unknown query target
- sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
}
}
-void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& rte, const string& rtk)
{
QPID_LOG(trace, "RCVD AgentLocateRequest");
@@ -829,9 +829,9 @@ void ManagementAgentImpl::handleLocateRe
getHeartbeatContent(map);
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
- QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+ QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
{
sys::Mutex::ScopedLock lock(agentLock);
@@ -839,12 +839,12 @@ void ManagementAgentImpl::handleLocateRe
}
}
-void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo, const string& userId)
+void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
{
if (extThread) {
sys::Mutex::ScopedLock lock(agentLock);
- methodQueue.push_back(new QueuedMethod(cid, replyTo, body, userId));
+ methodQueue.push_back(new QueuedMethod(cid, rte, rtk, body, userId));
if (pipeHandle != 0) {
pipeHandle->write("X", 1);
} else if (notifyable != 0) {
@@ -863,7 +863,7 @@ void ManagementAgentImpl::handleMethodRe
inCallback = false;
}
} else {
- invokeMethodRequest(body, cid, replyTo, userId);
+ invokeMethodRequest(body, cid, rte, rtk, userId);
}
QPID_LOG(trace, "RCVD MethodRequest");
@@ -871,10 +871,12 @@ void ManagementAgentImpl::handleMethodRe
void ManagementAgentImpl::received(Message& msg)
{
+ string replyToExchange;
string replyToKey;
framing::MessageProperties mp = msg.getMessageProperties();
if (mp.hasReplyTo()) {
const framing::ReplyTo& rt = mp.getReplyTo();
+ replyToExchange = rt.getExchange();
replyToKey = rt.getRoutingKey();
}
@@ -887,9 +889,9 @@ void ManagementAgentImpl::received(Messa
string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
string cid = msg.getMessageProperties().getCorrelationId();
- if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
- else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey, userId);
- else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey);
+ if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToExchange, replyToKey);
+ else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToExchange, replyToKey, userId);
+ else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToExchange, replyToKey);
else {
QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
}
@@ -906,7 +908,7 @@ void ManagementAgentImpl::received(Messa
if (checkHeader(inBuffer, &opcode, &sequence))
{
- if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
+ if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
else if (opcode == 'x') handleConsoleAddedIndication();
else
QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Tue Feb 8 16:16:49 2011
@@ -128,11 +128,12 @@ class ManagementAgentImpl : public Manag
};
struct QueuedMethod {
- QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body, const std::string& _uid) :
- cid(_cid), replyTo(_reply), body(_body), userId(_uid) {}
+ QueuedMethod(const std::string& _cid, const std::string& _rte, const std::string& _rtk, const std::string& _body, const std::string& _uid) :
+ cid(_cid), replyToExchange(_rte), replyToKey(_rtk), body(_body), userId(_uid) {}
std::string cid;
- std::string replyTo;
+ std::string replyToExchange;
+ std::string replyToKey;
std::string body;
std::string userId;
};
@@ -278,16 +279,16 @@ class ManagementAgentImpl : public Manag
uint8_t type=ManagementItem::CLASS_KIND_TABLE);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void sendHeartbeat();
- void sendException(const std::string& replyToKey, const std::string& cid,
+ void sendException(const std::string& replyToExchange, const std::string& replyToKey, const std::string& cid,
const std::string& text, uint32_t code=1);
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
- void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
- void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo, const std::string& userId);
+ void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& rte, const std::string& rtk);
+ void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk, const std::string& userId);
- void handleGetQuery (const std::string& body, const std::string& cid, const std::string& replyTo);
- void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
- void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo, const std::string& userId);
+ void handleGetQuery (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk);
+ void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk);
+ void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk, const std::string& userId);
void handleConsoleAddedIndication();
void getHeartbeatContent (qpid::types::Variant::Map& map);
};
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp Tue Feb 8 16:16:49 2011
@@ -160,7 +160,10 @@ void Connection::received(framing::AMQFr
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
- getChannel(frame.getChannel()).in(frame);
+ if (adapter.isOpen())
+ getChannel(frame.getChannel()).in(frame);
+ else
+ close(connection::CLOSE_CODE_FRAMING_ERROR, "Connection not yet open, invalid frame received.");
}
if (isLink) //i.e. we are acting as the client to another broker
@@ -184,7 +187,8 @@ bool isMessage(const AMQMethodBody* meth
void Connection::recordFromServer(const framing::AMQFrame& frame)
{
- if (mgmtObject != 0)
+ // Don't record management stats in cluster-unsafe contexts
+ if (mgmtObject != 0 && isClusterSafe())
{
mgmtObject->inc_framesToClient();
mgmtObject->inc_bytesToClient(frame.encodedSize());
@@ -196,7 +200,8 @@ void Connection::recordFromServer(const
void Connection::recordFromClient(const framing::AMQFrame& frame)
{
- if (mgmtObject != 0)
+ // Don't record management stats in cluster-unsafe contexts
+ if (mgmtObject != 0 && isClusterSafe())
{
mgmtObject->inc_framesFromClient();
mgmtObject->inc_bytesFromClient(frame.encodedSize());
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Feb 8 16:16:49 2011
@@ -255,8 +255,17 @@ MessageStore* LinkRegistry::getStore() c
return store;
}
-Link::shared_ptr LinkRegistry::findLink(const std::string& key)
+Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId)
{
+ // Convert keyOrMgmtId to a host:port key.
+ //
+ // TODO aconway 2011-02-01: centralize code that constructs/parses
+ // connection management IDs. Currently sys:: protocol factories
+ // and IO plugins construct the IDs and LinkRegistry parses them.
+ size_t separator = keyOrMgmtId.find('-');
+ if (separator == std::string::npos) separator = 0;
+ std::string key = keyOrMgmtId.substr(separator+1, std::string::npos);
+
Mutex::ScopedLock locker(lock);
LinkMap::iterator l = links.find(key);
if (l != links.end()) return l->second;
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Feb 8 16:16:49 2011
@@ -372,7 +372,8 @@ void SessionAdapter::QueueHandlerImpl::d
}
//apply settings & create persistent record if required
- queue_created.first->create(arguments);
+ try { queue_created.first->create(arguments); }
+ catch (...) { getBroker().getQueues().destroy(name); throw; }
//add default binding:
getBroker().getExchanges().getDefault()->bind(queue, name, 0);
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Tue Feb 8 16:16:49 2011
@@ -194,7 +194,7 @@ void SslProtocolFactory::established(sys
const qpid::sys::Socket& s,
sys::ConnectionCodec::Factory* f,
bool isClient) {
- sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getPeerAddress(), f);
+ sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f);
if (tcpNoDelay) {
s.setTcpNoDelay();
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/ConnectionSettings.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/ConnectionSettings.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/ConnectionSettings.cpp Tue Feb 8 16:16:49 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,7 +30,7 @@ namespace client {
ConnectionSettings::ConnectionSettings() :
protocol("tcp"),
- host("localhost"),
+ host("localhost"),
port(5672),
locale("en_US"),
heartbeat(0),
@@ -40,7 +40,8 @@ ConnectionSettings::ConnectionSettings()
tcpNoDelay(false),
service(qpid::saslName),
minSsf(0),
- maxSsf(256)
+ maxSsf(256),
+ sslCertName("")
{}
ConnectionSettings::~ConnectionSettings() {}
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/SslConnector.cpp Tue Feb 8 16:16:49 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -130,7 +130,7 @@ class SslConnector : public Connector
public:
SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
- const ConnectionSettings&,
+ const ConnectionSettings&,
ConnectionImpl*);
};
@@ -170,7 +170,7 @@ SslConnector::SslConnector(Poller::share
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
- version(ver),
+ version(ver),
initiated(false),
closed(true),
shutdownHandler(0),
@@ -179,8 +179,11 @@ SslConnector::SslConnector(Poller::share
poller(p)
{
QPID_LOG(debug, "SslConnector created for " << version.toString());
- //TODO: how do we want to handle socket configuration with ssl?
- //settings.configureSocket(socket);
+
+ if (settings.sslCertName != "") {
+ QPID_LOG(debug, "ssl-cert-name = " << settings.sslCertName);
+ socket.setCertName(settings.sslCertName);
+ }
}
SslConnector::~SslConnector() {
@@ -244,14 +247,14 @@ void SslConnector::setShutdownHandler(Sh
}
OutputHandler* SslConnector::getOutputHandler() {
- return this;
+ return this;
}
sys::ShutdownHandler* SslConnector::getShutdownHandler() const {
return shutdownHandler;
}
-const std::string& SslConnector::getIdentifier() const {
+const std::string& SslConnector::getIdentifier() const {
return identifier;
}
@@ -271,7 +274,7 @@ void SslConnector::Writer::init(std::str
aio = a;
newBuffer();
}
-void SslConnector::Writer::handle(framing::AMQFrame& frame) {
+void SslConnector::Writer::handle(framing::AMQFrame& frame) {
Mutex::ScopedLock l(lock);
frames.push_back(frame);
if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) {
@@ -372,7 +375,7 @@ const SecuritySettings* SslConnector::ge
{
securitySettings.ssf = socket.getKeyLen();
securitySettings.authid = "dummy";//set to non-empty string to enable external authentication
- return &securitySettings;
+ return &securitySettings;
}
}} // namespace qpid::client
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Tue Feb 8 16:16:49 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,7 +42,7 @@ using qpid::framing::Uuid;
void convert(const Variant::List& from, std::vector<std::string>& to)
{
for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
- to.push_back(i->asString());
+ to.push_back(i->asString());
}
}
@@ -108,9 +108,11 @@ void convert(const Variant::Map& from, C
setIfFound(from, "bounds", to.bounds);
setIfFound(from, "transport", to.protocol);
+
+ setIfFound(from, "ssl-cert-name", to.sslCertName);
}
-ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
+ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
reconnect(false), timeout(-1), limit(-1),
minReconnectInterval(3), maxReconnectInterval(60),
retries(0), reconnectOnLimitExceeded(true)
@@ -135,7 +137,7 @@ void ConnectionImpl::setOptions(const Va
setIfFound(options, "reconnect-interval-max", maxReconnectInterval);
}
setIfFound(options, "reconnect-urls", urls);
- setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
+ setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
}
void ConnectionImpl::setOption(const std::string& name, const Variant& value)
@@ -216,7 +218,7 @@ qpid::messaging::Session ConnectionImpl:
} catch (const qpid::SessionException& e) {
throw qpid::messaging::SessionError(e.what());
} catch (const std::exception& e) {
- throw qpid::messaging::MessagingException(e.what());
+ throw qpid::messaging::MessagingException(e.what());
}
}
return impl;
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Tue Feb 8 16:16:49 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Feb 8 16:16:49 2011
@@ -497,7 +497,7 @@ void UpdateClient::updateConsumer(
ci->isNotifyEnabled(),
ci->position
);
- consumerNumbering.add(ci);
+ consumerNumbering.add(ci.get());
QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
@@ -584,10 +584,9 @@ void UpdateClient::updateQueueListeners(
}
void UpdateClient::updateQueueListener(std::string& q,
- const boost::shared_ptr<broker::Consumer>& c)
+ const boost::shared_ptr<broker::Consumer>& c)
{
- const boost::shared_ptr<SemanticState::ConsumerImpl> ci =
- boost::dynamic_pointer_cast<SemanticState::ConsumerImpl>(c);
+ SemanticState::ConsumerImpl* ci = dynamic_cast<SemanticState::ConsumerImpl*>(c.get());
size_t n = consumerNumbering[ci];
if (n >= consumerNumbering.size())
throw Exception(QPID_MSG("Unexpected listener on queue " << q));
Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 16:16:49 2011
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1061302-1068442
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h Tue Feb 8 16:16:49 2011
@@ -106,7 +106,7 @@ class UpdateClient : public sys::Runnabl
void updateBridge(const boost::shared_ptr<broker::Bridge>&);
- Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering;
+ Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;
MemberId updaterId;
MemberId updateeId;
Url updateeUrl;
Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 16:16:49 2011
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:1061302-1068442
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/framing/Buffer.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/framing/Buffer.cpp Tue Feb 8 16:16:49 2011
@@ -246,6 +246,7 @@ void Buffer::putShortString(const string
size_t slen = s.length();
if (slen <= std::numeric_limits<uint8_t>::max()) {
uint8_t len = (uint8_t) slen;
+ checkAvailable(slen + 1);
putOctet(len);
s.copy(data + position, len);
position += len;
@@ -258,6 +259,7 @@ void Buffer::putMediumString(const strin
size_t slen = s.length();
if (slen <= std::numeric_limits<uint16_t>::max()) {
uint16_t len = (uint16_t) slen;
+ checkAvailable(slen + 2);
putShort(len);
s.copy(data + position, len);
position += len;
@@ -268,6 +270,7 @@ void Buffer::putMediumString(const strin
void Buffer::putLongString(const string& s){
uint32_t len = s.length();
+ checkAvailable(len + 4);
putLong(len);
s.copy(data + position, len);
position += len;
@@ -301,6 +304,7 @@ void Buffer::getBin128(uint8_t* b){
void Buffer::putRawData(const string& s){
uint32_t len = s.length();
+ checkAvailable(len);
s.copy(data + position, len);
position += len;
}
@@ -312,6 +316,7 @@ void Buffer::getRawData(string& s, uint3
}
void Buffer::putRawData(const uint8_t* s, size_t len){
+ checkAvailable(len);
memcpy(data + position, s, len);
position += len;
}
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Feb 8 16:16:49 2011
@@ -106,7 +106,8 @@ ManagementAgent::ManagementAgent (const
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
- qmf1Support(qmfV1), qmf2Support(qmfV2), maxV2ReplyObjs(100)
+ qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
+ msgBuffer(MA_BUFFER_SIZE)
{
nextObjectId = 1;
brokerBank = 1;
@@ -502,7 +503,7 @@ bool ManagementAgent::checkHeader (Buffe
void ManagementAgent::sendBufferLH(Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
- string routingKey)
+ const string& routingKey)
{
if (suppressed) {
QPID_LOG(debug, "Suppressing management message to " << routingKey);
@@ -547,6 +548,17 @@ void ManagementAgent::sendBufferLH(Buffe
}
+void ManagementAgent::sendBufferLH(Buffer& buf,
+ uint32_t length,
+ const string& exchange,
+ const string& routingKey)
+{
+ qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
+ if (ex.get() != 0)
+ sendBufferLH(buf, length, ex, routingKey);
+}
+
+
// NOTE WELL: assumes userLock is held by caller (LH)
// NOTE EVEN WELLER: drops this lock when delivering the message!!!
void ManagementAgent::sendBufferLH(const string& data,
@@ -611,6 +623,20 @@ void ManagementAgent::sendBufferLH(const
}
+void ManagementAgent::sendBufferLH(const string& data,
+ const string& cid,
+ const Variant::Map& headers,
+ const string& content_type,
+ const string& exchange,
+ const string& routingKey,
+ uint64_t ttl_msec)
+{
+ qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
+ if (ex.get() != 0)
+ sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec);
+}
+
+
/** Objects that have been added since the last periodic poll are temporarily
* saved in the newManagementObjects list. This allows objects to be
* added without needing to block on the userLock (addLock is used instead).
@@ -663,7 +689,6 @@ void ManagementAgent::periodicProcessing
#define HEADROOM 4096
debugSnapshot("Management agent periodic processing");
sys::Mutex::ScopedLock lock (userLock);
- char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
string sBuf;
@@ -704,7 +729,7 @@ void ManagementAgent::periodicProcessing
for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
std::string packageName;
std::string className;
- Buffer msgBuffer(msgChars, BUFSIZE);
+ msgBuffer.reset();
uint32_t v1Objs = 0;
uint32_t v2Objs = 0;
Variant::List list_;
@@ -715,6 +740,7 @@ void ManagementAgent::periodicProcessing
for (DeletedObjectList::iterator lIter = mIter->second.begin();
lIter != mIter->second.end(); lIter++) {
+ msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space.
std::string oid = (*lIter)->objectId;
if (!(*lIter)->encodedV1Config.empty()) {
encodeHeader(msgBuffer, 'c');
@@ -730,9 +756,9 @@ void ManagementAgent::periodicProcessing
<< " len=" << (*lIter)->encodedV1Inst.size());
v1Objs++;
}
- if (v1Objs && msgBuffer.available() < HEADROOM) {
+ if (v1Objs >= maxReplyObjs) {
v1Objs = 0;
- contentSize = BUFSIZE - msgBuffer.available();
+ contentSize = msgBuffer.getSize();
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
msgBuffer.reset();
@@ -744,7 +770,7 @@ void ManagementAgent::periodicProcessing
if (!(*lIter)->encodedV2.empty()) {
QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
list_.push_back((*lIter)->encodedV2);
- if (++v2Objs >= maxV2ReplyObjs) {
+ if (++v2Objs >= maxReplyObjs) {
v2Objs = 0;
string content;
@@ -815,11 +841,11 @@ void ManagementAgent::periodicProcessing
// sendBuffer() call, so always restart the search after a sendBuffer() call
//
while (1) {
- Buffer msgBuffer(msgChars, BUFSIZE);
+ msgBuffer.reset();
Variant::List list_;
uint32_t pcount;
uint32_t scount;
- uint32_t v2Objs;
+ uint32_t v1Objs, v2Objs;
ManagementObjectMap::iterator baseIter;
std::string packageName;
std::string className;
@@ -842,6 +868,7 @@ void ManagementAgent::periodicProcessing
break; // done - all objects processed
pcount = scount = 0;
+ v1Objs = 0;
v2Objs = 0;
list_.clear();
msgBuffer.reset();
@@ -849,6 +876,7 @@ void ManagementAgent::periodicProcessing
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
iter++) {
+ msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
ManagementObject* baseObject = baseIter->second;
ManagementObject* object = iter->second;
bool send_stats, send_props;
@@ -875,6 +903,7 @@ void ManagementAgent::periodicProcessing
QPID_LOG(trace, "Changed V1 properties "
<< object->getObjectId().getV2Key()
<< " len=" << msgBuffer.getPosition()-pos);
+ ++v1Objs;
}
if (send_stats && qmf1Support) {
@@ -886,7 +915,7 @@ void ManagementAgent::periodicProcessing
QPID_LOG(trace, "Changed V1 statistics "
<< object->getObjectId().getV2Key()
<< " len=" << msgBuffer.getPosition()-pos);
-
+ ++v1Objs;
}
if ((send_stats || send_props) && qmf2Support) {
@@ -916,8 +945,8 @@ void ManagementAgent::periodicProcessing
object->setForcePublish(false);
- if ((qmf1Support && (msgBuffer.available() < HEADROOM)) ||
- (qmf2Support && (v2Objs >= maxV2ReplyObjs)))
+ if ((qmf1Support && (v1Objs >= maxReplyObjs)) ||
+ (qmf2Support && (v2Objs >= maxReplyObjs)))
break; // have enough objects, send an indication...
}
}
@@ -1102,7 +1131,7 @@ void ManagementAgent::sendCommandComplet
replyToKey << " seq=" << sequence);
}
-void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& cid,
+void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid,
const string& text, uint32_t code, bool viaLocal)
{
static const string addr_exchange("qmf.default.direct");
@@ -1121,7 +1150,7 @@ void ManagementAgent::sendExceptionLH(co
map["_values"] = values;
MapCodec::encode(map, content);
- sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey);
+ sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
}
@@ -1291,7 +1320,7 @@ void ManagementAgent::handleMethodReques
}
-void ManagementAgent::handleMethodRequestLH (const string& body, const string& replyTo,
+void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk,
const string& cid, const ConnectionToken* connToken, bool viaLocal)
{
moveNewObjectsLH();
@@ -1313,7 +1342,7 @@ void ManagementAgent::handleMethodReques
if ((oid = inMap.find("_object_id")) == inMap.end() ||
(mid = inMap.find("_method_name")) == inMap.end()) {
- sendExceptionLH(replyTo, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+ sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
Manageable::STATUS_PARAMETER_INVALID, viaLocal);
return;
}
@@ -1332,7 +1361,7 @@ void ManagementAgent::handleMethodReques
inArgs = (mid->second).asMap();
}
} catch(exception& e) {
- sendExceptionLH(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+ sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
return;
}
@@ -1341,7 +1370,7 @@ void ManagementAgent::handleMethodReques
if (iter == managementObjects.end() || iter->second->isDeleted()) {
stringstream estr;
estr << "No object found with ID=" << objId;
- sendExceptionLH(replyTo, cid, estr.str(), 1, viaLocal);
+ sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal);
return;
}
@@ -1351,7 +1380,7 @@ void ManagementAgent::handleMethodReques
i = disallowed.find(make_pair(iter->second->getClassName(), methodName));
if (i != disallowed.end()) {
- sendExceptionLH(replyTo, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
+ sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
return;
}
@@ -1362,7 +1391,7 @@ void ManagementAgent::handleMethodReques
params[acl::PROP_SCHEMACLASS] = iter->second->getClassName();
if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
- sendExceptionLH(replyTo, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+ sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
Manageable::STATUS_FORBIDDEN, viaLocal);
return;
}
@@ -1372,7 +1401,7 @@ void ManagementAgent::handleMethodReques
QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
<< ":" << iter->second->getClassName() << " method=" <<
- methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs);
+ methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs);
try {
sys::Mutex::ScopedUnlock u(userLock);
@@ -1387,18 +1416,18 @@ void ManagementAgent::handleMethodReques
} else
error = callMap["_status_text"].asString();
} catch(exception& e) {
- sendExceptionLH(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+ sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
return;
}
if (errorCode != 0) {
- sendExceptionLH(replyTo, cid, error, errorCode, viaLocal);
+ sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal);
return;
}
MapCodec::encode(outMap, content);
- sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
- QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);
+ sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
+ QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap);
}
@@ -1545,7 +1574,7 @@ void ManagementAgent::SchemaClass::appen
buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
}
-void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -1554,7 +1583,7 @@ void ManagementAgent::handleSchemaReques
key.decode(inBuffer);
QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
- "), replyTo=" << replyToKey << " seq=" << sequence);
+ "), replyTo=" << rte << "/" << rtk << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
@@ -1570,17 +1599,17 @@ void ManagementAgent::handleSchemaReques
classInfo.appendSchema(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
+ sendBufferLH(outBuffer, outLen, rte, rtk);
+ QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence);
}
else
- sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available");
+ sendCommandCompleteLH(rtk, sequence, 1, "Schema not available");
}
else
- sendCommandCompleteLH(replyToKey, sequence, 1, "Class key not found");
+ sendCommandCompleteLH(rtk, sequence, 1, "Class key not found");
}
else
- sendCommandCompleteLH(replyToKey, sequence, 1, "Package not found");
+ sendCommandCompleteLH(rtk, sequence, 1, "Package not found");
}
void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence)
@@ -1840,7 +1869,7 @@ void ManagementAgent::handleGetQueryLH(B
}
-void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo, const string& cid, bool viaLocal)
+void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal)
{
moveNewObjectsLH();
@@ -1861,17 +1890,17 @@ void ManagementAgent::handleGetQueryLH(c
*/
i = inMap.find("_what");
if (i == inMap.end()) {
- sendExceptionLH(replyTo, cid, "_what element missing in Query");
+ sendExceptionLH(rte, rtk, cid, "_what element missing in Query");
return;
}
if (i->second.getType() != qpid::types::VAR_STRING) {
- sendExceptionLH(replyTo, cid, "_what element is not a string");
+ sendExceptionLH(rte, rtk, cid, "_what element is not a string");
return;
}
if (i->second.asString() != "OBJECT") {
- sendExceptionLH(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
return;
}
@@ -1930,8 +1959,8 @@ void ManagementAgent::handleGetQueryLH(c
string content;
ListCodec::encode(list_, content);
- sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
return;
}
} else {
@@ -1967,7 +1996,7 @@ void ManagementAgent::handleGetQueryLH(c
"_data",
object->getMd5Sum());
_subList.push_back(map_);
- if (++objCount >= maxV2ReplyObjs) {
+ if (++objCount >= maxReplyObjs) {
objCount = 0;
_list.push_back(_subList);
_subList.clear();
@@ -1983,27 +2012,26 @@ void ManagementAgent::handleGetQueryLH(c
string content;
while (_list.size() > 1) {
ListCodec::encode(_list.front().asList(), content);
- sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
_list.pop_front();
- QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());
+ QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
}
headers.erase("partial");
ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
- sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());
+ sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
return;
}
// Unrecognized query - Send empty message to indicate CommandComplete
string content;
ListCodec::encode(Variant::List(), content);
- sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo);
+ sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk);
}
-void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
- const string& cid)
+void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid)
{
QPID_LOG(debug, "RCVD AgentLocateRequest");
@@ -2021,10 +2049,10 @@ void ManagementAgent::handleLocateReques
string content;
MapCodec::encode(map, content);
- sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
clientWasAdded = true;
- QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo);
+ QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
}
@@ -2151,13 +2179,14 @@ bool ManagementAgent::authorizeAgentMess
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
- string replyToKey = rt.getRoutingKey();
+ string rte = rt.getExchange();
+ string rtk = rt.getRoutingKey();
string cid;
if (p && p->hasCorrelationId())
cid = p->getCorrelationId();
if (mapMsg) {
- sendExceptionLH(replyToKey, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+ sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
Manageable::STATUS_FORBIDDEN, false);
} else {
@@ -2169,7 +2198,7 @@ bool ManagementAgent::authorizeAgentMess
outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, rte, rtk);
}
QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
@@ -2183,12 +2212,14 @@ bool ManagementAgent::authorizeAgentMess
void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
{
- string replyToKey;
+ string rte;
+ string rtk;
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
- replyToKey = rt.getRoutingKey();
+ rte = rt.getExchange();
+ rtk = rt.getRoutingKey();
}
else
return;
@@ -2220,11 +2251,11 @@ void ManagementAgent::dispatchAgentComma
}
if (opcode == "_method_request")
- return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher(), viaLocal);
+ return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal);
else if (opcode == "_query_request")
- return handleGetQueryLH(body, replyToKey, cid, viaLocal);
+ return handleGetQueryLH(body, rte, rtk, cid, viaLocal);
else if (opcode == "_agent_locate_request")
- return handleLocateRequestLH(body, replyToKey, cid);
+ return handleLocateRequestLH(body, rte, rtk, cid);
QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
return;
@@ -2237,16 +2268,16 @@ void ManagementAgent::dispatchAgentComma
if (!checkHeader(inBuffer, &opcode, &sequence))
return;
- if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
- else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
+ if (opcode == 'B') handleBrokerRequestLH (inBuffer, rtk, sequence);
+ else if (opcode == 'P') handlePackageQueryLH (inBuffer, rtk, sequence);
+ else if (opcode == 'p') handlePackageIndLH (inBuffer, rtk, sequence);
+ else if (opcode == 'Q') handleClassQueryLH (inBuffer, rtk, sequence);
+ else if (opcode == 'q') handleClassIndLH (inBuffer, rtk, sequence);
+ else if (opcode == 'S') handleSchemaRequestLH (inBuffer, rte, rtk, sequence);
+ else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence);
+ else if (opcode == 'A') handleAttachRequestLH (inBuffer, rtk, sequence, msg.getPublisher());
+ else if (opcode == 'G') handleGetQueryLH (inBuffer, rtk, sequence);
+ else if (opcode == 'M') handleMethodRequestLH (inBuffer, rtk, sequence, msg.getPublisher());
}
}
Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 16:16:49 2011
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1061302-1068442
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h Tue Feb 8 16:16:49 2011
@@ -35,6 +35,7 @@
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/FieldValue.h>
+#include <qpid/framing/ResizableBuffer.h>
#include <memory>
#include <string>
#include <map>
@@ -330,7 +331,7 @@ private:
// Maximum # of objects allowed in a single V2 response
// message.
- uint32_t maxV2ReplyObjs;
+ uint32_t maxReplyObjs;
// list of objects that have been deleted, but have yet to be published
// one final time.
@@ -343,6 +344,7 @@ private:
char inputBuffer[MA_BUFFER_SIZE];
char outputBuffer[MA_BUFFER_SIZE];
char eventBuffer[MA_BUFFER_SIZE];
+ framing::ResizableBuffer msgBuffer;
void writeData ();
void periodicProcessing (void);
@@ -352,7 +354,11 @@ private:
void sendBufferLH(framing::Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
- std::string routingKey);
+ const std::string& routingKey);
+ void sendBufferLH(framing::Buffer& buf,
+ uint32_t length,
+ const std::string& exchange,
+ const std::string& routingKey);
void sendBufferLH(const std::string& data,
const std::string& cid,
const qpid::types::Variant::Map& headers,
@@ -360,6 +366,13 @@ private:
qpid::broker::Exchange::shared_ptr exchange,
const std::string& routingKey,
uint64_t ttl_msec = 0);
+ void sendBufferLH(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ const std::string& exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
void moveNewObjectsLH();
bool moveDeletedObjectsLH();
@@ -384,20 +397,20 @@ private:
void deleteOrphanedAgentsLH();
void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence,
uint32_t code = 0, const std::string& text = "OK");
- void sendExceptionLH(const std::string& replyToKey, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
+ void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (const std::string& body, const std::string& replyToKey, const std::string& cid, bool viaLocal);
- void handleMethodRequestLH (const std::string& body, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
- void handleLocateRequestLH (const std::string& body, const std::string &replyToKey, const std::string& cid);
+ void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
+ void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
+ void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
size_t validateSchema(framing::Buffer&, uint8_t kind);
Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 16:16:49 2011
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:1061302-1068442
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue Feb 8 16:16:49 2011
@@ -84,7 +84,7 @@ class RdmaIOHandler : public OutputContr
};
RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) :
- identifier(c->getPeerName()),
+ identifier(c->getFullName()),
factory(f),
codec(0),
readError(false),
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/Socket.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/Socket.h Tue Feb 8 16:16:49 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -60,26 +60,31 @@ public:
QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
- /** Returns the "socket name" ie the address bound to
+ /** Returns the "socket name" ie the address bound to
* the near end of the socket
*/
QPID_COMMON_EXTERN std::string getSockname() const;
- /** Returns the "peer name" ie the address bound to
+ /** Returns the "peer name" ie the address bound to
* the remote end of the socket
*/
std::string getPeername() const;
- /**
+ /**
* Returns an address (host and port) for the remote end of the
* socket
*/
QPID_COMMON_EXTERN std::string getPeerAddress() const;
- /**
+ /**
* Returns an address (host and port) for the local end of the
* socket
*/
- std::string getLocalAddress() const;
+ QPID_COMMON_EXTERN std::string getLocalAddress() const;
+
+ /**
+ * Returns the full address of the connection: local and remote host and port.
+ */
+ QPID_COMMON_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
QPID_COMMON_EXTERN uint16_t getLocalPort() const;
uint16_t getRemotePort() const;
@@ -95,7 +100,7 @@ public:
*/
QPID_COMMON_EXTERN Socket* accept() const;
- // TODO The following are raw operations, maybe they need better wrapping?
+ // TODO The following are raw operations, maybe they need better wrapping?
QPID_COMMON_EXTERN int read(void *buf, size_t count) const;
QPID_COMMON_EXTERN int write(const void *buf, size_t count) const;
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/SslPlugin.cpp Tue Feb 8 16:16:49 2011
@@ -121,7 +121,7 @@ SslProtocolFactory::SslProtocolFactory(c
void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s,
ConnectionCodec::Factory* f, bool isClient) {
- qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getPeerAddress(), f, nodict);
+ qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
if (tcpNoDelay) {
s.setTcpNoDelay(tcpNoDelay);
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Tue Feb 8 16:16:49 2011
@@ -81,7 +81,7 @@ AsynchIOProtocolFactory::AsynchIOProtoco
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
+ AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
if (tcpNoDelay) {
s.setTcpNoDelay();
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h Tue Feb 8 16:16:49 2011
@@ -274,6 +274,7 @@ namespace Rdma {
QueuePair::intrusive_ptr getQueuePair();
std::string getLocalName() const;
std::string getPeerName() const;
+ std::string getFullName() const { return getLocalName()+"-"+getPeerName(); }
};
}
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp Tue Feb 8 16:16:49 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -52,9 +52,9 @@ namespace ssl {
namespace {
std::string getName(int fd, bool local, bool includeService = false)
{
- ::sockaddr_storage name; // big enough for any socket address
+ ::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
-
+
int result = -1;
if (local) {
result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
@@ -67,8 +67,8 @@ std::string getName(int fd, bool local,
char servName[NI_MAXSERV];
char dispName[NI_MAXHOST];
if (includeService) {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
- servName, sizeof(servName),
+ if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
+ servName, sizeof(servName),
NI_NUMERICHOST | NI_NUMERICSERV) != 0)
throw QPID_POSIX_ERROR(rc);
return std::string(dispName) + ":" + std::string(servName);
@@ -82,9 +82,9 @@ std::string getName(int fd, bool local,
std::string getService(int fd, bool local)
{
- ::sockaddr_storage name; // big enough for any socket address
+ ::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
-
+
int result = -1;
if (local) {
result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
@@ -95,8 +95,8 @@ std::string getService(int fd, bool loca
QPID_POSIX_CHECK(result);
char servName[NI_MAXSERV];
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
- servName, sizeof(servName),
+ if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
+ servName, sizeof(servName),
NI_NUMERICHOST | NI_NUMERICSERV) != 0)
throw QPID_POSIX_ERROR(rc);
return servName;
@@ -132,8 +132,8 @@ std::string getDomainFromSubject(std::st
}
-SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0)
-{
+SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0)
+{
impl->fd = ::socket (PF_INET, SOCK_STREAM, 0);
if (impl->fd < 0) throw QPID_POSIX_ERROR(errno);
socket = SSL_ImportFD(0, PR_ImportTCPSocket(impl->fd));
@@ -145,12 +145,12 @@ SslSocket::SslSocket() : IOHandle(new IO
* PR_Accept, we have to reset the handshake.
*/
SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : IOHandle(ioph), socket(0), prototype(0)
-{
+{
socket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd));
NSS_CHECK(SSL_ResetHandshake(socket, true));
}
-void SslSocket::setNonblocking() const
+void SslSocket::setNonblocking() const
{
PRSocketOptionData option;
option.option = PR_SockOpt_Nonblocking;
@@ -164,7 +164,15 @@ void SslSocket::connect(const std::strin
namestream << host << ":" << port;
connectname = namestream.str();
- void* arg = SslOptions::global.certName.empty() ? 0 : const_cast<char*>(SslOptions::global.certName.c_str());
+ void* arg;
+ // Use the connection's cert-name if it has one; else use global cert-name
+ if (certname != "") {
+ arg = const_cast<char*>(certname.c_str());
+ } else if (SslOptions::global.certName.empty()) {
+ arg = 0;
+ } else {
+ arg = const_cast<char*>(SslOptions::global.certName.c_str());
+ }
NSS_CHECK(SSL_GetClientAuthDataHook(socket, NSS_GetClientAuthData, arg));
NSS_CHECK(SSL_SetURL(socket, host.data()));
@@ -220,7 +228,7 @@ int SslSocket::listen(uint16_t port, int
throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno)));
-
+
socklen_t namelen = sizeof(name);
if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
throw QPID_POSIX_ERROR(errno);
@@ -235,7 +243,7 @@ SslSocket* SslSocket::accept() const
return new SslSocket(new IOHandlePrivate(afd), prototype);
} else if (errno == EAGAIN) {
return 0;
- } else {
+ } else {
throw QPID_POSIX_ERROR(errno);
}
}
@@ -303,6 +311,11 @@ void SslSocket::setTcpNoDelay(bool nodel
}
}
+void SslSocket::setCertName(const std::string& name)
+{
+ certname = name;
+}
+
/** get the bit length of the current cipher's key */
int SslSocket::getKeyLen() const
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.h Tue Feb 8 16:16:49 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -41,13 +41,18 @@ class SslSocket : public qpid::sys::IOHa
public:
/** Create a socket wrapper for descriptor. */
SslSocket();
-
+
/** Set socket non blocking */
void setNonblocking() const;
/** Set tcp-nodelay */
void setTcpNoDelay(bool nodelay) const;
+ /** Set SSL cert-name. Allows the cert-name to be set per
+ * connection, overriding global cert-name settings from
+ * NSSInit().*/
+ void setCertName(const std::string& certName);
+
void connect(const std::string& host, uint16_t port) const;
void close() const;
@@ -59,38 +64,43 @@ public:
*@return The bound port.
*/
int listen(uint16_t port = 0, int backlog = 10, const std::string& certName = "localhost.localdomain", bool clientAuth = false) const;
-
- /**
+
+ /**
* Accept a connection from a socket that is already listening
* and has an incoming connection
*/
SslSocket* accept() const;
- // TODO The following are raw operations, maybe they need better wrapping?
+ // TODO The following are raw operations, maybe they need better wrapping?
int read(void *buf, size_t count) const;
int write(const void *buf, size_t count) const;
- /** Returns the "socket name" ie the address bound to
+ /** Returns the "socket name" ie the address bound to
* the near end of the socket
*/
std::string getSockname() const;
- /** Returns the "peer name" ie the address bound to
+ /** Returns the "peer name" ie the address bound to
* the remote end of the socket
*/
std::string getPeername() const;
- /**
+ /**
* Returns an address (host and port) for the remote end of the
* socket
*/
std::string getPeerAddress() const;
- /**
+ /**
* Returns an address (host and port) for the local end of the
* socket
*/
std::string getLocalAddress() const;
+ /**
+ * Returns the full address of the connection: local and remote host and port.
+ */
+ std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
+
uint16_t getLocalPort() const;
uint16_t getRemotePort() const;
@@ -106,6 +116,8 @@ public:
private:
mutable std::string connectname;
mutable PRFileDesc* socket;
+ std::string certname;
+
/**
* 'model' socket, with configuration to use when importing
* accepted sockets for use as ssl sockets. Set on listen(), used
Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/cli_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/cli_tests.py?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/cli_tests.py (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/cli_tests.py Tue Feb 8 16:16:49 2011
@@ -22,7 +22,7 @@ import sys
import os
import imp
from qpid.testlib import TestBase010
-# from qpid.brokertest import import_script, checkenv
+# from brokertest import import_script, checkenv
from qpid.datatypes import Message
from qpid.queue import Empty
from time import sleep
Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster.mk?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster.mk Tue Feb 8 16:16:49 2011
@@ -44,6 +44,7 @@ EXTRA_DIST += \
run_cluster_tests \
run_long_cluster_tests \
testlib.py \
+ brokertest.py \
cluster_tests.py \
cluster_test_logs.py \
long_cluster_tests.py \
@@ -93,7 +94,7 @@ cluster_test_SOURCES = \
cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework
-qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
+qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST)
endif
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org