You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/08 17:16:56 UTC

svn commit: r1068464 [2/6] - in /qpid/branches/qpid-2935/qpid: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/examples/python/ cpp/bindings/qmf2/examples/ruby/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/ruby/ cpp/bindings...

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp Tue Feb  8 16:16:49 2011
@@ -36,6 +36,7 @@ using namespace qmf;
 using qpid::messaging::Address;
 using qpid::messaging::Connection;
 using qpid::messaging::Receiver;
+using qpid::messaging::Sender;
 using qpid::messaging::Duration;
 using qpid::messaging::Message;
 using qpid::types::Variant;
@@ -64,9 +65,9 @@ Subscription ConsoleSession::subscribe(c
 //========================================================================================
 
 ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
-    connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false),
-    thread(0), threadCanceled(false),
-    lastVisit(0), lastAgePass(0), connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
+    connection(c), domain("default"), authUser(c.getAuthenticatedUsername()), maxAgentAgeMinutes(5),
+    opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+    connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
 {
     if (!options.empty()) {
         qpid::messaging::AddressParser parser(options);
@@ -82,6 +83,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(C
         iter = optMap.find("max-agent-age");
         if (iter != optMap.end())
             maxAgentAgeMinutes = iter->second.asUint32();
+
+        iter = optMap.find("listen-on-direct");
+        if (iter != optMap.end())
+            listenOnDirect = iter->second.asBool();
+
+        iter = optMap.find("strict-security");
+        if (iter != optMap.end())
+            strictSecurity = iter->second.asBool();
     }
 }
 
@@ -148,24 +157,26 @@ void ConsoleSessionImpl::open()
     directBase = "qmf." + domain + ".direct";
     topicBase = "qmf." + domain + ".topic";
 
-    string myKey("qmf-console-" + qpid::types::Uuid(true).str());
+    string myKey("direct-console." + qpid::types::Uuid(true).str());
 
-    replyAddress = Address(directBase + "/" + myKey + ";{node:{type:topic}}");
+    replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}");
 
     // Create AMQP session, receivers, and senders
     session = connection.createSession();
     Receiver directRx = session.createReceiver(replyAddress);
     Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating
-    Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+    if (!strictSecurity) {
+        Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+        legacyRx.setCapacity(64);
+        directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+        directSender.setCapacity(128);
+    }
 
     directRx.setCapacity(64);
     topicRx.setCapacity(128);
-    legacyRx.setCapacity(64);
 
-    directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
     topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
 
-    directSender.setCapacity(64);
     topicSender.setCapacity(128);
 
     // Start the receiver thread
@@ -371,7 +382,9 @@ void ConsoleSessionImpl::sendBrokerLocat
     msg.setCorrelationId("broker-locate");
     msg.setSubject("broker");
 
-    directSender.send(msg);
+    Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+    sender.send(msg);
+    sender.close();
 
     QPID_LOG(trace, "SENT AgentLocate to broker");
 }
@@ -468,6 +481,7 @@ void ConsoleSessionImpl::handleAgentUpda
             //
             // This is a refresh of an agent we are already tracking.
             //
+            bool detectedRestart(false);
             agent = aIter->second;
             AgentImpl& impl(AgentImplAccess::get(agent));
             impl.touch();
@@ -480,6 +494,7 @@ void ConsoleSessionImpl::handleAgentUpda
                 auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART));
                 eventImpl->setAgent(agent);
                 enqueueEventLH(ConsoleEvent(eventImpl.release()));
+                detectedRestart = true;
             }
 
             iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP);
@@ -488,12 +503,14 @@ void ConsoleSessionImpl::handleAgentUpda
                 if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) {
                     //
                     // The agent has added new schema entries since we last heard from it.
-                    // Enqueue a notification.
+                    // Update the attribute and, if this doesn't accompany a restart, enqueue a notification.
                     //
-                    auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
-                    eventImpl->setAgent(agent);
-                    impl.setAttribute(iter->first, iter->second);
-                    enqueueEventLH(ConsoleEvent(eventImpl.release()));
+                    if (!detectedRestart) {
+                        auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
+                        eventImpl->setAgent(agent);
+                        enqueueEventLH(ConsoleEvent(eventImpl.release()));
+                    }
+                    impl.setAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP, iter->second);
                 }
             }
         }

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h Tue Feb  8 16:16:49 2011
@@ -72,7 +72,10 @@ namespace qmf {
         qpid::messaging::Sender directSender;
         qpid::messaging::Sender topicSender;
         std::string domain;
+        std::string authUser;
         uint32_t maxAgentAgeMinutes;
+        bool listenOnDirect;
+        bool strictSecurity;
         Query agentQuery;
         bool opened;
         std::queue<ConsoleEvent> eventQueue;

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/SchemaId.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/SchemaId.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/SchemaId.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/SchemaId.cpp Tue Feb  8 16:16:49 2011
@@ -78,7 +78,8 @@ Variant::Map SchemaIdImpl::asMap() const
         result["_type"] = "_data";
     else
         result["_type"] = "_event";
-    result["_hash"] = hash;
+    if (!hash.isNull())
+        result["_hash"] = hash;
     return result;
 }
 

Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 16:16:49 2011
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:1061302-1068442

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Tue Feb  8 16:16:49 2011
@@ -362,7 +362,7 @@ uint32_t ManagementAgentImpl::pollCallba
         methodQueue.pop_front();
         {
             sys::Mutex::ScopedUnlock unlock(agentLock);
-            invokeMethodRequest(item->body, item->cid, item->replyTo, item->userId);
+            invokeMethodRequest(item->body, item->cid, item->replyToExchange, item->replyToKey, item->userId);
             delete item;
         }
     }
@@ -497,7 +497,7 @@ void ManagementAgentImpl::sendHeartbeat(
     QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
 }
 
-void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
+void ManagementAgentImpl::sendException(const string& rte, const string& rtk, const string& cid,
                                         const string& text, uint32_t code)
 {
     Variant::Map map;
@@ -514,12 +514,12 @@ void ManagementAgentImpl::sendException(
     map["_values"] = values;
 
     MapCodec::encode(map, content);
-    connThreadBody.sendBuffer(content, cid, headers, directExchange, replyToKey);
+    connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
 
     QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
 }
 
-void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk)
 {
     sys::Mutex::ScopedLock lock(agentLock);
     string packageName;
@@ -546,7 +546,7 @@ void ManagementAgentImpl::handleSchemaRe
             outBuffer.putRawData(body);
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
-            connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+            connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
 
             QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
         }
@@ -561,7 +561,7 @@ void ManagementAgentImpl::handleConsoleA
     QPID_LOG(trace, "RCVD ConsoleAddedInd");
 }
 
-void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo, const string& userId)
+void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
 {
     string  methodName;
     bool    failed = false;
@@ -572,11 +572,9 @@ void ManagementAgentImpl::invokeMethodRe
 
     MapCodec::decode(body, inMap);
 
-    outMap["_values"] = Variant::Map();
-
     if ((oid = inMap.find("_object_id")) == inMap.end() ||
         (mid = inMap.find("_method_name")) == inMap.end()) {
-        sendException(replyTo, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+        sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
                       Manageable::STATUS_PARAMETER_INVALID);
         failed = true;
     } else {
@@ -595,6 +593,8 @@ void ManagementAgentImpl::invokeMethodRe
                 inArgs = (mid->second).asMap();
             }
 
+            QPID_LOG(trace, "Invoking Method: name=" << methodName << " args=" << inArgs);
+
             boost::shared_ptr<ManagementObject> oPtr;
             {
                 sys::Mutex::ScopedLock lock(agentLock);
@@ -604,7 +604,7 @@ void ManagementAgentImpl::invokeMethodRe
             }
 
             if (oPtr.get() == 0) {
-                sendException(replyTo, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT),
+                sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT),
                               Manageable::STATUS_UNKNOWN_OBJECT);
                 failed = true;
             } else {
@@ -617,13 +617,13 @@ void ManagementAgentImpl::invokeMethodRe
                         if (iter->first != "_status_code" && iter->first != "_status_text")
                             outMap["_arguments"].asMap()[iter->first] = iter->second;
                 } else {
-                    sendException(replyTo, cid, callMap["_status_text"], callMap["_status_code"]);
+                    sendException(rte, rtk, cid, callMap["_status_text"], callMap["_status_code"]);
                     failed = true;
                 }
             }
 
         } catch(types::InvalidConversion& e) {
-            sendException(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION);
+            sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION);
             failed = true;
         }
     }
@@ -635,11 +635,11 @@ void ManagementAgentImpl::invokeMethodRe
         headers["qmf.opcode"] = "_method_response";
         QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
         MapCodec::encode(outMap, content);
-        connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
+        connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
     }
 }
 
-void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk)
 {
     moveNewObjectsLH();
 
@@ -666,12 +666,12 @@ void ManagementAgentImpl::handleGetQuery
      */
     i = inMap.find("_what");
     if (i == inMap.end()) {
-        sendException(replyTo, cid, "_what element missing in Query");
+        sendException(rte, rtk, cid, "_what element missing in Query");
         return;
     }
 
     if (i->second.getType() != qpid::types::VAR_STRING) {
-        sendException(replyTo, cid, "_what element is not a string");
+        sendException(rte, rtk, cid, "_what element is not a string");
         return;
     }
 
@@ -709,8 +709,8 @@ void ManagementAgentImpl::handleGetQuery
                 headers.erase("partial");
 
                 ListCodec::encode(list_, content);
-                connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
-                QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+                connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+                QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
                 return;
             }
         } else { // match using schema_id, if supplied
@@ -771,8 +771,8 @@ void ManagementAgentImpl::handleGetQuery
                         if (++objCount >= maxV2ReplyObjs) {
                             objCount = 0;
                             ListCodec::encode(list_, content);
-                            connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
-                            QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+                            connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+                            QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk);
                             content.clear();
                             list_.clear();
                         }
@@ -784,8 +784,8 @@ void ManagementAgentImpl::handleGetQuery
         // Send last "non-partial" message to indicate CommandComplete
         headers.erase("partial");
         ListCodec::encode(list_, content);
-        connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
-        QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+        connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+        QPID_LOG(trace, "SENT QueryResponse (last message, no 'partial' indicator) to=" << rte << "/" << rtk);
 
     } else if (i->second.asString() == "SCHEMA_ID") {
         headers["qmf.content"] = "_schema_id";
@@ -806,16 +806,16 @@ void ManagementAgentImpl::handleGetQuery
 
         headers.erase("partial");
         ListCodec::encode(list_, content);
-        connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
-        QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << replyTo);
+        connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+        QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << rte << "/" << rtk);
 
     } else {
         // Unknown query target
-        sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+        sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
     }
 }
 
-void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& rte, const string& rtk)
 {
     QPID_LOG(trace, "RCVD AgentLocateRequest");
 
@@ -829,9 +829,9 @@ void ManagementAgentImpl::handleLocateRe
 
     getHeartbeatContent(map);
     MapCodec::encode(map, content);
-    connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
+    connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
 
-    QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+    QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
 
     {
         sys::Mutex::ScopedLock lock(agentLock);
@@ -839,12 +839,12 @@ void ManagementAgentImpl::handleLocateRe
     }
 }
 
-void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo, const string& userId)
+void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
 {
     if (extThread) {
         sys::Mutex::ScopedLock lock(agentLock);
 
-        methodQueue.push_back(new QueuedMethod(cid, replyTo, body, userId));
+        methodQueue.push_back(new QueuedMethod(cid, rte, rtk, body, userId));
         if (pipeHandle != 0) {
             pipeHandle->write("X", 1);
         } else if (notifyable != 0) {
@@ -863,7 +863,7 @@ void ManagementAgentImpl::handleMethodRe
             inCallback = false;
         }
     } else {
-        invokeMethodRequest(body, cid, replyTo, userId);
+        invokeMethodRequest(body, cid, rte, rtk, userId);
     }
 
     QPID_LOG(trace, "RCVD MethodRequest");
@@ -871,10 +871,12 @@ void ManagementAgentImpl::handleMethodRe
 
 void ManagementAgentImpl::received(Message& msg)
 {
+    string   replyToExchange;
     string   replyToKey;
     framing::MessageProperties mp = msg.getMessageProperties();
     if (mp.hasReplyTo()) {
         const framing::ReplyTo& rt = mp.getReplyTo();
+        replyToExchange = rt.getExchange();
         replyToKey = rt.getRoutingKey();
     }
 
@@ -887,9 +889,9 @@ void ManagementAgentImpl::received(Messa
         string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
         string cid = msg.getMessageProperties().getCorrelationId();
 
-        if      (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
-        else if (opcode == "_method_request")       handleMethodRequest(msg.getData(), cid, replyToKey, userId);
-        else if (opcode == "_query_request")        handleGetQuery(msg.getData(), cid, replyToKey);
+        if      (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToExchange, replyToKey);
+        else if (opcode == "_method_request")       handleMethodRequest(msg.getData(), cid, replyToExchange, replyToKey, userId);
+        else if (opcode == "_query_request")        handleGetQuery(msg.getData(), cid, replyToExchange, replyToKey);
         else {
             QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
         }
@@ -906,7 +908,7 @@ void ManagementAgentImpl::received(Messa
 
     if (checkHeader(inBuffer, &opcode, &sequence))
     {
-        if      (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
+        if      (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
         else if (opcode == 'x') handleConsoleAddedIndication();
         else
             QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Tue Feb  8 16:16:49 2011
@@ -128,11 +128,12 @@ class ManagementAgentImpl : public Manag
     };
 
     struct QueuedMethod {
-    QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body, const std::string& _uid) :
-        cid(_cid), replyTo(_reply), body(_body), userId(_uid) {}
+    QueuedMethod(const std::string& _cid, const std::string& _rte, const std::string& _rtk, const std::string& _body, const std::string& _uid) :
+        cid(_cid), replyToExchange(_rte), replyToKey(_rtk), body(_body), userId(_uid) {}
 
         std::string cid;
-        std::string replyTo;
+        std::string replyToExchange;
+        std::string replyToKey;
         std::string body;
         std::string userId;
     };
@@ -278,16 +279,16 @@ class ManagementAgentImpl : public Manag
                                                 uint8_t type=ManagementItem::CLASS_KIND_TABLE);
     bool checkHeader  (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
     void sendHeartbeat();
-    void sendException(const std::string& replyToKey, const std::string& cid,
+    void sendException(const std::string& replyToExchange, const std::string& replyToKey, const std::string& cid,
                        const std::string& text, uint32_t code=1);
     void handlePackageRequest (qpid::framing::Buffer& inBuffer);
     void handleClassQuery     (qpid::framing::Buffer& inBuffer);
-    void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
-    void invokeMethodRequest  (const std::string& body, const std::string& cid, const std::string& replyTo, const std::string& userId);
+    void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& rte, const std::string& rtk);
+    void invokeMethodRequest  (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk, const std::string& userId);
 
-    void handleGetQuery       (const std::string& body, const std::string& cid, const std::string& replyTo);
-    void handleLocateRequest  (const std::string& body, const std::string& sequence, const std::string& replyTo);
-    void handleMethodRequest  (const std::string& body, const std::string& sequence, const std::string& replyTo, const std::string& userId);
+    void handleGetQuery       (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk);
+    void handleLocateRequest  (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk);
+    void handleMethodRequest  (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk, const std::string& userId);
     void handleConsoleAddedIndication();
     void getHeartbeatContent  (qpid::types::Variant::Map& map);
 };

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp Tue Feb  8 16:16:49 2011
@@ -160,7 +160,10 @@ void Connection::received(framing::AMQFr
     if (frame.getChannel() == 0 && frame.getMethod()) {
         adapter.handle(frame);
     } else {
-        getChannel(frame.getChannel()).in(frame);
+        if (adapter.isOpen())
+            getChannel(frame.getChannel()).in(frame);
+        else
+            close(connection::CLOSE_CODE_FRAMING_ERROR, "Connection not yet open, invalid frame received.");
     }
 
     if (isLink) //i.e. we are acting as the client to another broker
@@ -184,7 +187,8 @@ bool isMessage(const AMQMethodBody* meth
 
 void Connection::recordFromServer(const framing::AMQFrame& frame)
 {
-    if (mgmtObject != 0)
+    // Don't record management stats in cluster-unsafe contexts
+    if (mgmtObject != 0 && isClusterSafe())
     {
         mgmtObject->inc_framesToClient();
         mgmtObject->inc_bytesToClient(frame.encodedSize());
@@ -196,7 +200,8 @@ void Connection::recordFromServer(const 
 
 void Connection::recordFromClient(const framing::AMQFrame& frame)
 {
-    if (mgmtObject != 0)
+    // Don't record management stats in cluster-unsafe contexts
+    if (mgmtObject != 0 && isClusterSafe())
     {
         mgmtObject->inc_framesFromClient();
         mgmtObject->inc_bytesFromClient(frame.encodedSize());

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Feb  8 16:16:49 2011
@@ -255,8 +255,17 @@ MessageStore* LinkRegistry::getStore() c
     return store;
 }
 
-Link::shared_ptr LinkRegistry::findLink(const std::string& key)
+Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId)
 {
+    // Convert keyOrMgmtId to a host:port key.
+    //
+    // TODO aconway 2011-02-01: centralize code that constructs/parses
+    // connection management IDs. Currently sys:: protocol factories
+    // and IO plugins construct the IDs and LinkRegistry parses them.
+    size_t separator = keyOrMgmtId.find('-');
+    if (separator == std::string::npos) separator = 0;
+    std::string key =  keyOrMgmtId.substr(separator+1, std::string::npos);
+
     Mutex::ScopedLock locker(lock);
     LinkMap::iterator l = links.find(key);
     if (l != links.end()) return l->second;

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Feb  8 16:16:49 2011
@@ -372,7 +372,8 @@ void SessionAdapter::QueueHandlerImpl::d
             }
 
             //apply settings & create persistent record if required
-            queue_created.first->create(arguments);
+            try { queue_created.first->create(arguments); }
+            catch (...) { getBroker().getQueues().destroy(name); throw; }
 
             //add default binding:
             getBroker().getExchanges().getDefault()->bind(queue, name, 0);

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Tue Feb  8 16:16:49 2011
@@ -194,7 +194,7 @@ void SslProtocolFactory::established(sys
                                      const qpid::sys::Socket& s,
                                      sys::ConnectionCodec::Factory* f,
                                      bool isClient) {
-    sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getPeerAddress(), f);
+    sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f);
 
     if (tcpNoDelay) {
         s.setTcpNoDelay();

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/ConnectionSettings.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/ConnectionSettings.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/ConnectionSettings.cpp Tue Feb  8 16:16:49 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,7 +30,7 @@ namespace client {
 
 ConnectionSettings::ConnectionSettings() :
     protocol("tcp"),
-    host("localhost"), 
+    host("localhost"),
     port(5672),
     locale("en_US"),
     heartbeat(0),
@@ -40,7 +40,8 @@ ConnectionSettings::ConnectionSettings()
     tcpNoDelay(false),
     service(qpid::saslName),
     minSsf(0),
-    maxSsf(256)
+    maxSsf(256),
+    sslCertName("")
 {}
 
 ConnectionSettings::~ConnectionSettings() {}

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/SslConnector.cpp Tue Feb  8 16:16:49 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -130,7 +130,7 @@ class SslConnector : public Connector
 
 public:
     SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
-              const ConnectionSettings&, 
+              const ConnectionSettings&,
               ConnectionImpl*);
 };
 
@@ -170,7 +170,7 @@ SslConnector::SslConnector(Poller::share
                      const ConnectionSettings& settings,
                      ConnectionImpl* cimpl)
     : maxFrameSize(settings.maxFrameSize),
-      version(ver), 
+      version(ver),
       initiated(false),
       closed(true),
       shutdownHandler(0),
@@ -179,8 +179,11 @@ SslConnector::SslConnector(Poller::share
       poller(p)
 {
     QPID_LOG(debug, "SslConnector created for " << version.toString());
-    //TODO: how do we want to handle socket configuration with ssl?
-    //settings.configureSocket(socket);
+
+    if (settings.sslCertName != "") {
+        QPID_LOG(debug, "ssl-cert-name = " << settings.sslCertName);
+        socket.setCertName(settings.sslCertName);
+    }
 }
 
 SslConnector::~SslConnector() {
@@ -244,14 +247,14 @@ void SslConnector::setShutdownHandler(Sh
 }
 
 OutputHandler* SslConnector::getOutputHandler() {
-    return this; 
+    return this;
 }
 
 sys::ShutdownHandler* SslConnector::getShutdownHandler() const {
     return shutdownHandler;
 }
 
-const std::string& SslConnector::getIdentifier() const { 
+const std::string& SslConnector::getIdentifier() const {
     return identifier;
 }
 
@@ -271,7 +274,7 @@ void SslConnector::Writer::init(std::str
     aio = a;
     newBuffer();
 }
-void SslConnector::Writer::handle(framing::AMQFrame& frame) { 
+void SslConnector::Writer::handle(framing::AMQFrame& frame) {
     Mutex::ScopedLock l(lock);
     frames.push_back(frame);
     if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) {
@@ -372,7 +375,7 @@ const SecuritySettings* SslConnector::ge
 {
     securitySettings.ssf = socket.getKeyLen();
     securitySettings.authid = "dummy";//set to non-empty string to enable external authentication
-    return &securitySettings; 
+    return &securitySettings;
 }
 
 }} // namespace qpid::client

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Tue Feb  8 16:16:49 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,7 +42,7 @@ using qpid::framing::Uuid;
 void convert(const Variant::List& from, std::vector<std::string>& to)
 {
     for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
-        to.push_back(i->asString());                
+        to.push_back(i->asString());
     }
 }
 
@@ -108,9 +108,11 @@ void convert(const Variant::Map& from, C
     setIfFound(from, "bounds", to.bounds);
 
     setIfFound(from, "transport", to.protocol);
+
+    setIfFound(from, "ssl-cert-name", to.sslCertName);
 }
 
-ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : 
+ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
     reconnect(false), timeout(-1), limit(-1),
     minReconnectInterval(3), maxReconnectInterval(60),
     retries(0), reconnectOnLimitExceeded(true)
@@ -135,7 +137,7 @@ void ConnectionImpl::setOptions(const Va
         setIfFound(options, "reconnect-interval-max", maxReconnectInterval);
     }
     setIfFound(options, "reconnect-urls", urls);
-    setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);    
+    setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
 }
 
 void ConnectionImpl::setOption(const std::string& name, const Variant& value)
@@ -216,7 +218,7 @@ qpid::messaging::Session ConnectionImpl:
         } catch (const qpid::SessionException& e) {
             throw qpid::messaging::SessionError(e.what());
         } catch (const std::exception& e) {
-            throw qpid::messaging::MessagingException(e.what());            
+            throw qpid::messaging::MessagingException(e.what());
         }
     }
     return impl;

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Tue Feb  8 16:16:49 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Feb  8 16:16:49 2011
@@ -497,7 +497,7 @@ void UpdateClient::updateConsumer(
         ci->isNotifyEnabled(),
         ci->position
     );
-    consumerNumbering.add(ci);
+    consumerNumbering.add(ci.get());
 
     QPID_LOG(debug, *this << " updated consumer " << ci->getName()
              << " on " << shadowSession.getId());
@@ -584,10 +584,9 @@ void UpdateClient::updateQueueListeners(
 }
 
 void UpdateClient::updateQueueListener(std::string& q,
-                                  const boost::shared_ptr<broker::Consumer>& c)
+                                       const boost::shared_ptr<broker::Consumer>& c)
 {
-    const boost::shared_ptr<SemanticState::ConsumerImpl> ci =
-        boost::dynamic_pointer_cast<SemanticState::ConsumerImpl>(c);
+    SemanticState::ConsumerImpl* ci = dynamic_cast<SemanticState::ConsumerImpl*>(c.get());
     size_t n = consumerNumbering[ci];
     if (n >= consumerNumbering.size())
         throw Exception(QPID_MSG("Unexpected listener on queue " << q));

Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 16:16:49 2011
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1061302-1068442

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h Tue Feb  8 16:16:49 2011
@@ -106,7 +106,7 @@ class UpdateClient : public sys::Runnabl
     void updateBridge(const boost::shared_ptr<broker::Bridge>&);
 
 
-    Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering;
+    Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;
     MemberId updaterId;
     MemberId updateeId;
     Url updateeUrl;

Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 16:16:49 2011
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:1061302-1068442

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/framing/Buffer.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/framing/Buffer.cpp Tue Feb  8 16:16:49 2011
@@ -246,6 +246,7 @@ void Buffer::putShortString(const string
     size_t slen = s.length();
     if (slen <= std::numeric_limits<uint8_t>::max()) {
         uint8_t len = (uint8_t) slen;
+        checkAvailable(slen + 1);
         putOctet(len);
         s.copy(data + position, len);
         position += len;
@@ -258,6 +259,7 @@ void Buffer::putMediumString(const strin
     size_t slen = s.length();
     if (slen <= std::numeric_limits<uint16_t>::max()) {
         uint16_t len = (uint16_t) slen;
+        checkAvailable(slen + 2);
         putShort(len);
         s.copy(data + position, len);
         position += len;
@@ -268,6 +270,7 @@ void Buffer::putMediumString(const strin
 
 void Buffer::putLongString(const string& s){
     uint32_t len = s.length();
+    checkAvailable(len + 4);
     putLong(len);
     s.copy(data + position, len);
     position += len;    
@@ -301,6 +304,7 @@ void Buffer::getBin128(uint8_t* b){
 
 void Buffer::putRawData(const string& s){
     uint32_t len = s.length();
+    checkAvailable(len);
     s.copy(data + position, len);
     position += len;    
 }
@@ -312,6 +316,7 @@ void Buffer::getRawData(string& s, uint3
 }
 
 void Buffer::putRawData(const uint8_t* s, size_t len){
+    checkAvailable(len);
     memcpy(data + position, s, len);
     position += len;    
 }

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Feb  8 16:16:49 2011
@@ -106,7 +106,8 @@ ManagementAgent::ManagementAgent (const 
     startTime(sys::now()),
     suppressed(false), disallowAllV1Methods(false),
     vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
-    qmf1Support(qmfV1), qmf2Support(qmfV2), maxV2ReplyObjs(100)
+    qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
+    msgBuffer(MA_BUFFER_SIZE)
 {
     nextObjectId   = 1;
     brokerBank     = 1;
@@ -502,7 +503,7 @@ bool ManagementAgent::checkHeader (Buffe
 void ManagementAgent::sendBufferLH(Buffer&  buf,
                                    uint32_t length,
                                    qpid::broker::Exchange::shared_ptr exchange,
-                                   string   routingKey)
+                                   const string&  routingKey)
 {
     if (suppressed) {
         QPID_LOG(debug, "Suppressing management message to " << routingKey);
@@ -547,6 +548,17 @@ void ManagementAgent::sendBufferLH(Buffe
 }
 
 
+void ManagementAgent::sendBufferLH(Buffer&  buf,
+                                   uint32_t length,
+                                   const string& exchange,
+                                   const string& routingKey)
+{
+    qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
+    if (ex.get() != 0)
+        sendBufferLH(buf, length, ex, routingKey);
+}
+
+
 // NOTE WELL: assumes userLock is held by caller (LH)
 // NOTE EVEN WELLER: drops this lock when delivering the message!!!
 void ManagementAgent::sendBufferLH(const string& data,
@@ -611,6 +623,20 @@ void ManagementAgent::sendBufferLH(const
 }
 
 
+void ManagementAgent::sendBufferLH(const string& data,
+                                   const string& cid,
+                                   const Variant::Map& headers,
+                                   const string& content_type,
+                                   const string& exchange,
+                                   const string& routingKey,
+                                   uint64_t ttl_msec)
+{
+    qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
+    if (ex.get() != 0)
+        sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec);
+}
+
+
 /** Objects that have been added since the last periodic poll are temporarily
  * saved in the newManagementObjects list.  This allows objects to be
  * added without needing to block on the userLock (addLock is used instead).
@@ -663,7 +689,6 @@ void ManagementAgent::periodicProcessing
 #define HEADROOM  4096
     debugSnapshot("Management agent periodic processing");
     sys::Mutex::ScopedLock lock (userLock);
-    char                msgChars[BUFSIZE];
     uint32_t            contentSize;
     string              routingKey;
     string sBuf;
@@ -704,7 +729,7 @@ void ManagementAgent::periodicProcessing
         for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
             std::string packageName;
             std::string className;
-            Buffer msgBuffer(msgChars, BUFSIZE);
+            msgBuffer.reset();
             uint32_t v1Objs = 0;
             uint32_t v2Objs = 0;
             Variant::List list_;
@@ -715,6 +740,7 @@ void ManagementAgent::periodicProcessing
 
             for (DeletedObjectList::iterator lIter = mIter->second.begin();
                  lIter != mIter->second.end(); lIter++) {
+                msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space.
                 std::string oid = (*lIter)->objectId;
                 if (!(*lIter)->encodedV1Config.empty()) {
                     encodeHeader(msgBuffer, 'c');
@@ -730,9 +756,9 @@ void ManagementAgent::periodicProcessing
                              << " len=" <<  (*lIter)->encodedV1Inst.size());
                     v1Objs++;
                 }
-                if (v1Objs && msgBuffer.available() < HEADROOM) {
+                if (v1Objs >= maxReplyObjs) {
                     v1Objs = 0;
-                    contentSize = BUFSIZE - msgBuffer.available();
+                    contentSize = msgBuffer.getSize();
                     stringstream key;
                     key << "console.obj.1.0." << packageName << "." << className;
                     msgBuffer.reset();
@@ -744,7 +770,7 @@ void ManagementAgent::periodicProcessing
                 if (!(*lIter)->encodedV2.empty()) {
                     QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
                     list_.push_back((*lIter)->encodedV2);
-                    if (++v2Objs >= maxV2ReplyObjs) {
+                    if (++v2Objs >= maxReplyObjs) {
                         v2Objs = 0;
 
                         string content;
@@ -815,11 +841,11 @@ void ManagementAgent::periodicProcessing
     // sendBuffer() call, so always restart the search after a sendBuffer() call
     //
     while (1) {
-        Buffer msgBuffer(msgChars, BUFSIZE);
+        msgBuffer.reset();
         Variant::List list_;
         uint32_t pcount;
         uint32_t scount;
-        uint32_t v2Objs;
+        uint32_t v1Objs, v2Objs;
         ManagementObjectMap::iterator baseIter;
         std::string packageName;
         std::string className;
@@ -842,6 +868,7 @@ void ManagementAgent::periodicProcessing
             break;  // done - all objects processed
 
         pcount = scount = 0;
+        v1Objs = 0;
         v2Objs = 0;
         list_.clear();
         msgBuffer.reset();
@@ -849,6 +876,7 @@ void ManagementAgent::periodicProcessing
         for (ManagementObjectMap::iterator iter = baseIter;
              iter != managementObjects.end();
              iter++) {
+            msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
             ManagementObject* baseObject = baseIter->second;
             ManagementObject* object = iter->second;
             bool send_stats, send_props;
@@ -875,6 +903,7 @@ void ManagementAgent::periodicProcessing
                     QPID_LOG(trace, "Changed V1 properties "
                              << object->getObjectId().getV2Key()
                              << " len=" << msgBuffer.getPosition()-pos);
+                    ++v1Objs;
                 }
 
                 if (send_stats && qmf1Support) {
@@ -886,7 +915,7 @@ void ManagementAgent::periodicProcessing
                     QPID_LOG(trace, "Changed V1 statistics "
                              << object->getObjectId().getV2Key()
                              << " len=" << msgBuffer.getPosition()-pos);
-
+                    ++v1Objs;
                 }
 
                 if ((send_stats || send_props) && qmf2Support) {
@@ -916,8 +945,8 @@ void ManagementAgent::periodicProcessing
 
                 object->setForcePublish(false);
 
-                if ((qmf1Support && (msgBuffer.available() < HEADROOM)) ||
-                    (qmf2Support && (v2Objs >= maxV2ReplyObjs)))
+                if ((qmf1Support && (v1Objs >= maxReplyObjs)) ||
+                    (qmf2Support && (v2Objs >= maxReplyObjs)))
                     break;  // have enough objects, send an indication...
             }
         }
@@ -1102,7 +1131,7 @@ void ManagementAgent::sendCommandComplet
              replyToKey << " seq=" << sequence);
 }
 
-void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& cid,
+void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid,
                                       const string& text, uint32_t code, bool viaLocal)
 {
     static const string addr_exchange("qmf.default.direct");
@@ -1121,7 +1150,7 @@ void ManagementAgent::sendExceptionLH(co
     map["_values"] = values;
 
     MapCodec::encode(map, content);
-    sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey);
+    sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
 
     QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
 }
@@ -1291,7 +1320,7 @@ void ManagementAgent::handleMethodReques
 }
 
 
-void ManagementAgent::handleMethodRequestLH (const string& body, const string& replyTo,
+void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk,
                                              const string& cid, const ConnectionToken* connToken, bool viaLocal)
 {
     moveNewObjectsLH();
@@ -1313,7 +1342,7 @@ void ManagementAgent::handleMethodReques
 
     if ((oid = inMap.find("_object_id")) == inMap.end() ||
         (mid = inMap.find("_method_name")) == inMap.end()) {
-        sendExceptionLH(replyTo, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+        sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
                         Manageable::STATUS_PARAMETER_INVALID, viaLocal);
         return;
     }
@@ -1332,7 +1361,7 @@ void ManagementAgent::handleMethodReques
             inArgs = (mid->second).asMap();
         }
     } catch(exception& e) {
-        sendExceptionLH(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+        sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
         return;
     }
 
@@ -1341,7 +1370,7 @@ void ManagementAgent::handleMethodReques
     if (iter == managementObjects.end() || iter->second->isDeleted()) {
         stringstream estr;
         estr << "No object found with ID=" << objId;
-        sendExceptionLH(replyTo, cid, estr.str(), 1, viaLocal);
+        sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal);
         return;
     }
 
@@ -1351,7 +1380,7 @@ void ManagementAgent::handleMethodReques
 
     i = disallowed.find(make_pair(iter->second->getClassName(), methodName));
     if (i != disallowed.end()) {
-        sendExceptionLH(replyTo, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
+        sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
         return;
     }
 
@@ -1362,7 +1391,7 @@ void ManagementAgent::handleMethodReques
         params[acl::PROP_SCHEMACLASS]   = iter->second->getClassName();
 
         if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
-            sendExceptionLH(replyTo, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+            sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
                             Manageable::STATUS_FORBIDDEN, viaLocal);
             return;
         }
@@ -1372,7 +1401,7 @@ void ManagementAgent::handleMethodReques
 
     QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
              << ":" << iter->second->getClassName() << " method=" <<
-             methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs);
+             methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs);
 
     try {
         sys::Mutex::ScopedUnlock u(userLock);
@@ -1387,18 +1416,18 @@ void ManagementAgent::handleMethodReques
         } else
             error = callMap["_status_text"].asString();
     } catch(exception& e) {
-        sendExceptionLH(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+        sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
         return;
     }
 
     if (errorCode != 0) {
-        sendExceptionLH(replyTo, cid, error, errorCode, viaLocal);
+        sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal);
         return;
     }
 
     MapCodec::encode(outMap, content);
-    sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
-    QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);
+    sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
+    QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap);
 }
 
 
@@ -1545,7 +1574,7 @@ void ManagementAgent::SchemaClass::appen
         buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
 }
 
-void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence)
 {
     string         packageName;
     SchemaClassKey key;
@@ -1554,7 +1583,7 @@ void ManagementAgent::handleSchemaReques
     key.decode(inBuffer);
 
     QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
-             "), replyTo=" << replyToKey << " seq=" << sequence);
+             "), replyTo=" << rte << "/" << rtk << " seq=" << sequence);
 
     PackageMap::iterator pIter = packages.find(packageName);
     if (pIter != packages.end()) {
@@ -1570,17 +1599,17 @@ void ManagementAgent::handleSchemaReques
                 classInfo.appendSchema(outBuffer);
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-                QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
+                sendBufferLH(outBuffer, outLen, rte, rtk);
+                QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence);
             }
             else
-                sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available");
+                sendCommandCompleteLH(rtk, sequence, 1, "Schema not available");
         }
         else
-            sendCommandCompleteLH(replyToKey, sequence, 1, "Class key not found");
+            sendCommandCompleteLH(rtk, sequence, 1, "Class key not found");
     }
     else
-        sendCommandCompleteLH(replyToKey, sequence, 1, "Package not found");
+        sendCommandCompleteLH(rtk, sequence, 1, "Package not found");
 }
 
 void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence)
@@ -1840,7 +1869,7 @@ void ManagementAgent::handleGetQueryLH(B
 }
 
 
-void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo, const string& cid, bool viaLocal)
+void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal)
 {
     moveNewObjectsLH();
 
@@ -1861,17 +1890,17 @@ void ManagementAgent::handleGetQueryLH(c
      */
     i = inMap.find("_what");
     if (i == inMap.end()) {
-        sendExceptionLH(replyTo, cid, "_what element missing in Query");
+        sendExceptionLH(rte, rtk, cid, "_what element missing in Query");
         return;
     }
 
     if (i->second.getType() != qpid::types::VAR_STRING) {
-        sendExceptionLH(replyTo, cid, "_what element is not a string");
+        sendExceptionLH(rte, rtk, cid, "_what element is not a string");
         return;
     }
 
     if (i->second.asString() != "OBJECT") {
-        sendExceptionLH(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+        sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
         return;
     }
 
@@ -1930,8 +1959,8 @@ void ManagementAgent::handleGetQueryLH(c
             string content;
 
             ListCodec::encode(list_, content);
-            sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
-            QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo);
+            sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+            QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
             return;
         }
     } else {
@@ -1967,7 +1996,7 @@ void ManagementAgent::handleGetQueryLH(c
                                                            "_data",
                                                            object->getMd5Sum());
                     _subList.push_back(map_);
-                    if (++objCount >= maxV2ReplyObjs) {
+                    if (++objCount >= maxReplyObjs) {
                         objCount = 0;
                         _list.push_back(_subList);
                         _subList.clear();
@@ -1983,27 +2012,26 @@ void ManagementAgent::handleGetQueryLH(c
         string content;
         while (_list.size() > 1) {
             ListCodec::encode(_list.front().asList(), content);
-            sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
+            sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
             _list.pop_front();
-            QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());
+            QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
         }
         headers.erase("partial");
         ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
-        sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
-        QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());
+        sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+        QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
         return;
     }
 
     // Unrecognized query - Send empty message to indicate CommandComplete
     string content;
     ListCodec::encode(Variant::List(), content);
-    sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
-    QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo);
+    sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+    QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk);
 }
 
 
-void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
-                                            const string& cid)
+void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid)
 {
     QPID_LOG(debug, "RCVD AgentLocateRequest");
 
@@ -2021,10 +2049,10 @@ void ManagementAgent::handleLocateReques
 
     string content;
     MapCodec::encode(map, content);
-    sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
+    sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
     clientWasAdded = true;
 
-    QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo);
+    QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
 }
 
 
@@ -2151,13 +2179,14 @@ bool ManagementAgent::authorizeAgentMess
             msg.getFrames().getHeaders()->get<framing::MessageProperties>();
         if (p && p->hasReplyTo()) {
             const framing::ReplyTo& rt = p->getReplyTo();
-            string replyToKey = rt.getRoutingKey();
+            string rte = rt.getExchange();
+            string rtk = rt.getRoutingKey();
             string cid;
             if (p && p->hasCorrelationId())
                 cid = p->getCorrelationId();
 
             if (mapMsg) {
-                sendExceptionLH(replyToKey, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+                sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
                                 Manageable::STATUS_FORBIDDEN, false);
             } else {
 
@@ -2169,7 +2198,7 @@ bool ManagementAgent::authorizeAgentMess
                 outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+                sendBufferLH(outBuffer, outLen, rte, rtk);
             }
 
             QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
@@ -2183,12 +2212,14 @@ bool ManagementAgent::authorizeAgentMess
 
 void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
 {
-    string   replyToKey;
+    string   rte;
+    string   rtk;
     const framing::MessageProperties* p =
         msg.getFrames().getHeaders()->get<framing::MessageProperties>();
     if (p && p->hasReplyTo()) {
         const framing::ReplyTo& rt = p->getReplyTo();
-        replyToKey = rt.getRoutingKey();
+        rte = rt.getExchange();
+        rtk = rt.getRoutingKey();
     }
     else
         return;
@@ -2220,11 +2251,11 @@ void ManagementAgent::dispatchAgentComma
         }
 
         if (opcode == "_method_request")
-            return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher(), viaLocal);
+            return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal);
         else if (opcode == "_query_request")
-            return handleGetQueryLH(body, replyToKey, cid, viaLocal);
+            return handleGetQueryLH(body, rte, rtk, cid, viaLocal);
         else if (opcode == "_agent_locate_request")
-            return handleLocateRequestLH(body, replyToKey, cid);
+            return handleLocateRequestLH(body, rte, rtk, cid);
 
         QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
         return;
@@ -2237,16 +2268,16 @@ void ManagementAgent::dispatchAgentComma
         if (!checkHeader(inBuffer, &opcode, &sequence))
             return;
 
-        if      (opcode == 'B') handleBrokerRequestLH  (inBuffer, replyToKey, sequence);
-        else if (opcode == 'P') handlePackageQueryLH   (inBuffer, replyToKey, sequence);
-        else if (opcode == 'p') handlePackageIndLH     (inBuffer, replyToKey, sequence);
-        else if (opcode == 'Q') handleClassQueryLH     (inBuffer, replyToKey, sequence);
-        else if (opcode == 'q') handleClassIndLH       (inBuffer, replyToKey, sequence);
-        else if (opcode == 'S') handleSchemaRequestLH  (inBuffer, replyToKey, sequence);
-        else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
-        else if (opcode == 'A') handleAttachRequestLH  (inBuffer, replyToKey, sequence, msg.getPublisher());
-        else if (opcode == 'G') handleGetQueryLH       (inBuffer, replyToKey, sequence);
-        else if (opcode == 'M') handleMethodRequestLH  (inBuffer, replyToKey, sequence, msg.getPublisher());
+        if      (opcode == 'B') handleBrokerRequestLH  (inBuffer, rtk, sequence);
+        else if (opcode == 'P') handlePackageQueryLH   (inBuffer, rtk, sequence);
+        else if (opcode == 'p') handlePackageIndLH     (inBuffer, rtk, sequence);
+        else if (opcode == 'Q') handleClassQueryLH     (inBuffer, rtk, sequence);
+        else if (opcode == 'q') handleClassIndLH       (inBuffer, rtk, sequence);
+        else if (opcode == 'S') handleSchemaRequestLH  (inBuffer, rte, rtk, sequence);
+        else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence);
+        else if (opcode == 'A') handleAttachRequestLH  (inBuffer, rtk, sequence, msg.getPublisher());
+        else if (opcode == 'G') handleGetQueryLH       (inBuffer, rtk, sequence);
+        else if (opcode == 'M') handleMethodRequestLH  (inBuffer, rtk, sequence, msg.getPublisher());
     }
 }
 

Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 16:16:49 2011
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1061302-1068442

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h Tue Feb  8 16:16:49 2011
@@ -35,6 +35,7 @@
 #include "qpid/types/Variant.h"
 #include <qpid/framing/AMQFrame.h>
 #include <qpid/framing/FieldValue.h>
+#include <qpid/framing/ResizableBuffer.h>
 #include <memory>
 #include <string>
 #include <map>
@@ -330,7 +331,7 @@ private:
 
     // Maximum # of objects allowed in a single V2 response
     // message.
-    uint32_t maxV2ReplyObjs;
+    uint32_t maxReplyObjs;
 
     // list of objects that have been deleted, but have yet to be published
     // one final time.
@@ -343,6 +344,7 @@ private:
     char inputBuffer[MA_BUFFER_SIZE];
     char outputBuffer[MA_BUFFER_SIZE];
     char eventBuffer[MA_BUFFER_SIZE];
+    framing::ResizableBuffer msgBuffer;
 
     void writeData ();
     void periodicProcessing (void);
@@ -352,7 +354,11 @@ private:
     void sendBufferLH(framing::Buffer&             buf,
                       uint32_t                     length,
                       qpid::broker::Exchange::shared_ptr exchange,
-                      std::string                  routingKey);
+                      const std::string&           routingKey);
+    void sendBufferLH(framing::Buffer&             buf,
+                      uint32_t                     length,
+                      const std::string&           exchange,
+                      const std::string&           routingKey);
     void sendBufferLH(const std::string&     data,
                       const std::string&     cid,
                       const qpid::types::Variant::Map& headers,
@@ -360,6 +366,13 @@ private:
                       qpid::broker::Exchange::shared_ptr exchange,
                       const std::string& routingKey,
                       uint64_t ttl_msec = 0);
+    void sendBufferLH(const std::string& data,
+                      const std::string& cid,
+                      const qpid::types::Variant::Map& headers,
+                      const std::string& content_type,
+                      const std::string& exchange,
+                      const std::string& routingKey,
+                      uint64_t ttl_msec = 0);
     void moveNewObjectsLH();
     bool moveDeletedObjectsLH();
 
@@ -384,20 +397,20 @@ private:
     void deleteOrphanedAgentsLH();
     void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence,
                               uint32_t code = 0, const std::string& text = "OK");
-    void sendExceptionLH(const std::string& replyToKey, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
+    void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
     void handleBrokerRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handlePackageQueryLH   (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handlePackageIndLH     (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handleClassQueryLH     (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handleClassIndLH       (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
-    void handleSchemaRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+    void handleSchemaRequestLH  (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
     void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handleAttachRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
     void handleGetQueryLH       (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handleMethodRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
-    void handleGetQueryLH       (const std::string& body, const std::string& replyToKey, const std::string& cid, bool viaLocal);
-    void handleMethodRequestLH  (const std::string& body, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
-    void handleLocateRequestLH  (const std::string& body, const std::string &replyToKey, const std::string& cid);
+    void handleGetQueryLH       (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
+    void handleMethodRequestLH  (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
+    void handleLocateRequestLH  (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
 
 
     size_t validateSchema(framing::Buffer&, uint8_t kind);

Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 16:16:49 2011
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:1061302-1068442

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue Feb  8 16:16:49 2011
@@ -84,7 +84,7 @@ class RdmaIOHandler : public OutputContr
 };
 
 RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) :
-    identifier(c->getPeerName()),
+    identifier(c->getFullName()),
     factory(f),
     codec(0),
     readError(false),

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/Socket.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/Socket.h Tue Feb  8 16:16:49 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -60,26 +60,31 @@ public:
     QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
     QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
 
-    /** Returns the "socket name" ie the address bound to 
+    /** Returns the "socket name" ie the address bound to
      * the near end of the socket
      */
     QPID_COMMON_EXTERN std::string getSockname() const;
 
-    /** Returns the "peer name" ie the address bound to 
+    /** Returns the "peer name" ie the address bound to
      * the remote end of the socket
      */
     std::string getPeername() const;
 
-    /** 
+    /**
      * Returns an address (host and port) for the remote end of the
      * socket
      */
     QPID_COMMON_EXTERN std::string getPeerAddress() const;
-    /** 
+    /**
      * Returns an address (host and port) for the local end of the
      * socket
      */
-    std::string getLocalAddress() const;
+    QPID_COMMON_EXTERN std::string getLocalAddress() const;
+
+    /**
+     * Returns the full address of the connection: local and remote host and port.
+     */
+    QPID_COMMON_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
 
     QPID_COMMON_EXTERN uint16_t getLocalPort() const;
     uint16_t getRemotePort() const;
@@ -95,7 +100,7 @@ public:
      */
     QPID_COMMON_EXTERN Socket* accept() const;
 
-    // TODO The following are raw operations, maybe they need better wrapping? 
+    // TODO The following are raw operations, maybe they need better wrapping?
     QPID_COMMON_EXTERN int read(void *buf, size_t count) const;
     QPID_COMMON_EXTERN int write(const void *buf, size_t count) const;
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/SslPlugin.cpp Tue Feb  8 16:16:49 2011
@@ -121,7 +121,7 @@ SslProtocolFactory::SslProtocolFactory(c
 
 void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s,
                                           ConnectionCodec::Factory* f, bool isClient) {
-    qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getPeerAddress(), f, nodict);
+    qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
 
     if (tcpNoDelay) {
         s.setTcpNoDelay(tcpNoDelay);

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Tue Feb  8 16:16:49 2011
@@ -81,7 +81,7 @@ AsynchIOProtocolFactory::AsynchIOProtoco
 
 void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
                                           ConnectionCodec::Factory* f, bool isClient) {
-    AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
+    AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
 
     if (tcpNoDelay) {
         s.setTcpNoDelay();

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h Tue Feb  8 16:16:49 2011
@@ -274,6 +274,7 @@ namespace Rdma {
         QueuePair::intrusive_ptr getQueuePair();
         std::string getLocalName() const;
         std::string getPeerName() const;
+        std::string getFullName() const { return getLocalName()+"-"+getPeerName(); }
     };
 }
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp Tue Feb  8 16:16:49 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -52,9 +52,9 @@ namespace ssl {
 namespace {
 std::string getName(int fd, bool local, bool includeService = false)
 {
-    ::sockaddr_storage name; // big enough for any socket address    
+    ::sockaddr_storage name; // big enough for any socket address
     ::socklen_t namelen = sizeof(name);
-    
+
     int result = -1;
     if (local) {
         result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
@@ -67,8 +67,8 @@ std::string getName(int fd, bool local, 
     char servName[NI_MAXSERV];
     char dispName[NI_MAXHOST];
     if (includeService) {
-        if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 
-                                 servName, sizeof(servName), 
+        if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
+                                 servName, sizeof(servName),
                                  NI_NUMERICHOST | NI_NUMERICSERV) != 0)
             throw QPID_POSIX_ERROR(rc);
         return std::string(dispName) + ":" + std::string(servName);
@@ -82,9 +82,9 @@ std::string getName(int fd, bool local, 
 
 std::string getService(int fd, bool local)
 {
-    ::sockaddr_storage name; // big enough for any socket address    
+    ::sockaddr_storage name; // big enough for any socket address
     ::socklen_t namelen = sizeof(name);
-    
+
     int result = -1;
     if (local) {
         result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
@@ -95,8 +95,8 @@ std::string getService(int fd, bool loca
     QPID_POSIX_CHECK(result);
 
     char servName[NI_MAXSERV];
-    if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, 
-                                 servName, sizeof(servName), 
+    if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
+                                 servName, sizeof(servName),
                                  NI_NUMERICHOST | NI_NUMERICSERV) != 0)
         throw QPID_POSIX_ERROR(rc);
     return servName;
@@ -132,8 +132,8 @@ std::string getDomainFromSubject(std::st
 
 }
 
-SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0) 
-{ 
+SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0)
+{
     impl->fd = ::socket (PF_INET, SOCK_STREAM, 0);
     if (impl->fd < 0) throw QPID_POSIX_ERROR(errno);
     socket = SSL_ImportFD(0, PR_ImportTCPSocket(impl->fd));
@@ -145,12 +145,12 @@ SslSocket::SslSocket() : IOHandle(new IO
  * PR_Accept, we have to reset the handshake.
  */
 SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : IOHandle(ioph), socket(0), prototype(0)
-{ 
+{
     socket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd));
     NSS_CHECK(SSL_ResetHandshake(socket, true));
 }
 
-void SslSocket::setNonblocking() const 
+void SslSocket::setNonblocking() const
 {
     PRSocketOptionData option;
     option.option = PR_SockOpt_Nonblocking;
@@ -164,7 +164,15 @@ void SslSocket::connect(const std::strin
     namestream << host << ":" << port;
     connectname = namestream.str();
 
-    void* arg = SslOptions::global.certName.empty() ? 0 : const_cast<char*>(SslOptions::global.certName.c_str());
+    void* arg;
+    // Use the connection's cert-name if it has one; else use global cert-name
+    if (certname != "") {
+        arg = const_cast<char*>(certname.c_str());
+    } else if (SslOptions::global.certName.empty()) {
+        arg = 0;
+    } else {
+        arg = const_cast<char*>(SslOptions::global.certName.c_str());
+    }
     NSS_CHECK(SSL_GetClientAuthDataHook(socket, NSS_GetClientAuthData, arg));
     NSS_CHECK(SSL_SetURL(socket, host.data()));
 
@@ -220,7 +228,7 @@ int SslSocket::listen(uint16_t port, int
         throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
     if (::listen(socket, backlog) < 0)
         throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno)));
-    
+
     socklen_t namelen = sizeof(name);
     if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
         throw QPID_POSIX_ERROR(errno);
@@ -235,7 +243,7 @@ SslSocket* SslSocket::accept() const
         return new SslSocket(new IOHandlePrivate(afd), prototype);
     } else if (errno == EAGAIN) {
         return 0;
-    } else { 
+    } else {
         throw QPID_POSIX_ERROR(errno);
     }
 }
@@ -303,6 +311,11 @@ void SslSocket::setTcpNoDelay(bool nodel
     }
 }
 
+void SslSocket::setCertName(const std::string& name)
+{
+    certname = name;
+}
+
 
 /** get the bit length of the current cipher's key */
 int SslSocket::getKeyLen() const

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.h?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ssl/SslSocket.h Tue Feb  8 16:16:49 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -41,13 +41,18 @@ class SslSocket : public qpid::sys::IOHa
 public:
     /** Create a socket wrapper for descriptor. */
     SslSocket();
-    
+
     /** Set socket non blocking */
     void setNonblocking() const;
 
     /** Set tcp-nodelay */
     void setTcpNoDelay(bool nodelay) const;
 
+    /** Set SSL cert-name. Allows the cert-name to be set per
+     * connection, overriding global cert-name settings from
+     * NSSInit().*/
+    void setCertName(const std::string& certName);
+
     void connect(const std::string& host, uint16_t port) const;
 
     void close() const;
@@ -59,38 +64,43 @@ public:
      *@return The bound port.
      */
     int listen(uint16_t port = 0, int backlog = 10, const std::string& certName = "localhost.localdomain", bool clientAuth = false) const;
-    
-    /** 
+
+    /**
      * Accept a connection from a socket that is already listening
      * and has an incoming connection
      */
     SslSocket* accept() const;
 
-    // TODO The following are raw operations, maybe they need better wrapping? 
+    // TODO The following are raw operations, maybe they need better wrapping?
     int read(void *buf, size_t count) const;
     int write(const void *buf, size_t count) const;
 
-    /** Returns the "socket name" ie the address bound to 
+    /** Returns the "socket name" ie the address bound to
      * the near end of the socket
      */
     std::string getSockname() const;
 
-    /** Returns the "peer name" ie the address bound to 
+    /** Returns the "peer name" ie the address bound to
      * the remote end of the socket
      */
     std::string getPeername() const;
 
-    /** 
+    /**
      * Returns an address (host and port) for the remote end of the
      * socket
      */
     std::string getPeerAddress() const;
-    /** 
+    /**
      * Returns an address (host and port) for the local end of the
      * socket
      */
     std::string getLocalAddress() const;
 
+    /**
+     * Returns the full address of the connection: local and remote host and port.
+     */
+    std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
+
     uint16_t getLocalPort() const;
     uint16_t getRemotePort() const;
 
@@ -106,6 +116,8 @@ public:
 private:
     mutable std::string connectname;
     mutable PRFileDesc* socket;
+    std::string certname;
+
     /**
      * 'model' socket, with configuration to use when importing
      * accepted sockets for use as ssl sockets. Set on listen(), used

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/cli_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/cli_tests.py?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/cli_tests.py (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/cli_tests.py Tue Feb  8 16:16:49 2011
@@ -22,7 +22,7 @@ import sys
 import os
 import imp
 from qpid.testlib import TestBase010
-# from qpid.brokertest import import_script, checkenv 
+# from brokertest import import_script, checkenv 
 from qpid.datatypes import Message
 from qpid.queue import Empty
 from time import sleep

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster.mk?rev=1068464&r1=1068463&r2=1068464&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster.mk Tue Feb  8 16:16:49 2011
@@ -44,6 +44,7 @@ EXTRA_DIST +=					\
 	run_cluster_tests			\
 	run_long_cluster_tests			\
 	testlib.py				\
+	brokertest.py				\
 	cluster_tests.py			\
 	cluster_test_logs.py			\
 	long_cluster_tests.py			\
@@ -93,7 +94,7 @@ cluster_test_SOURCES =				\
 
 cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework
 
-qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
+qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
 qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST)
 
 endif



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org