You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/08/31 22:18:48 UTC

svn commit: r809728 - in /qpid/trunk/qpid/cpp: bindings/qmf/ruby/ bindings/qmf/tests/ src/ src/qmf/

Author: tross
Date: Mon Aug 31 20:18:48 2009
New Revision: 809728

URL: http://svn.apache.org/viewvc?rev=809728&view=rev
Log:
Added protocol module for codepoint definitions and header handling.
Fixed a deadlock case in ResilientConnection.
Added more code to the ConsoleEngine implementation.

Added:
    qpid/trunk/qpid/cpp/src/qmf/Protocol.cpp
    qpid/trunk/qpid/cpp/src/qmf/Protocol.h
    qpid/trunk/qpid/cpp/src/qmf/SequenceManager.cpp
    qpid/trunk/qpid/cpp/src/qmf/SequenceManager.h
Modified:
    qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb
    qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb
    qpid/trunk/qpid/cpp/src/qmf.mk
    qpid/trunk/qpid/cpp/src/qmf/AgentEngine.cpp
    qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp
    qpid/trunk/qpid/cpp/src/qmf/ResilientConnection.cpp

Modified: qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb Mon Aug 31 20:18:48 2009
@@ -536,7 +536,7 @@
   class Console
     attr_reader :impl
 
-    def initialize(handler, kwargs={})
+    def initialize(handler = nil, kwargs={})
       @handler = handler
       @impl = Qmfengine::ConsoleEngine.new
       @event = Qmfengine::ConsoleEvent.new
@@ -587,6 +587,7 @@
       valid = @impl.getEvent(@event)
       while valid
         count += 1
+        puts "Console Event: #{@event.kind}"
         case @event.kind
         when Qmfengine::ConsoleEvent::AGENT_ADDED
         when Qmfengine::ConsoleEvent::AGENT_DELETED

Modified: qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb Mon Aug 31 20:18:48 2009
@@ -29,7 +29,7 @@
     @settings.set_attr("host", ARGV[0]) if ARGV.size > 0
     @settings.set_attr("port", ARGV[1].to_i) if ARGV.size > 1
     @connection = Qmf::Connection.new(@settings)
-    @qmf = Qmf::Console.new(self)
+    @qmf = Qmf::Console.new
 
     @qmf.add_connection(@connection)
 

Modified: qpid/trunk/qpid/cpp/src/qmf.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf.mk?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf.mk (original)
+++ qpid/trunk/qpid/cpp/src/qmf.mk Mon Aug 31 20:18:48 2009
@@ -49,11 +49,15 @@
   qmf/ObjectIdImpl.h				\
   qmf/ObjectImpl.cpp				\
   qmf/ObjectImpl.h				\
+  qmf/Protocol.cpp				\
+  qmf/Protocol.h				\
   qmf/Query.h					\
   qmf/QueryImpl.cpp				\
   qmf/QueryImpl.h				\
   qmf/ResilientConnection.cpp			\
   qmf/ResilientConnection.h			\
+  qmf/SequenceManager.cpp			\
+  qmf/SequenceManager.h				\
   qmf/Schema.h					\
   qmf/SchemaImpl.cpp				\
   qmf/SchemaImpl.h				\

Modified: qpid/trunk/qpid/cpp/src/qmf/AgentEngine.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/AgentEngine.cpp?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/AgentEngine.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/AgentEngine.cpp Mon Aug 31 20:18:48 2009
@@ -25,6 +25,7 @@
 #include "qmf/ObjectIdImpl.h"
 #include "qmf/QueryImpl.h"
 #include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
 #include <qpid/framing/Buffer.h>
 #include <qpid/framing/Uuid.h>
 #include <qpid/framing/FieldTable.h>
@@ -172,8 +173,6 @@
 
         map<string, ClassMaps> packages;
 
-        bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq);
-        void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0);
         AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
         AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
         AgentEventImpl::Ptr eventSetupComplete();
@@ -268,12 +267,16 @@
     string   replyToKey(message.replyKey ? message.replyKey : "");
     string   userId(message.userId ? message.userId : "");
 
-    if (checkHeader(inBuffer, &opcode, &sequence)) {
-        if      (opcode == 'a') handleAttachResponse(inBuffer);
-        else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
-        else if (opcode == 'x') handleConsoleAddedIndication();
-        else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId);
-        else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+    while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
+        if      (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
+        else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
+        else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
+        else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
+        else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+        else {
+            QPID_LOG(error, "AgentEngineImpl::handleRcvMessage invalid opcode=" << opcode);
+            break;
+        }
     }
 }
 
@@ -325,7 +328,7 @@
     char    rawbuffer[512];
     Buffer  buffer(rawbuffer, 512);
 
-    encodeHeader(buffer, 'A');
+    Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST);
     buffer.putShortString("qmfa");
     systemId.encode(buffer);
     buffer.putLong(requestedBrokerBank);
@@ -340,7 +343,7 @@
     Mutex::ScopedLock _lock(lock);
     Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
 
-    encodeHeader(buffer, 'h');
+    Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION);
     buffer.putLongLong(uint64_t(Duration(now())));
     stringstream key;
     key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
@@ -349,7 +352,7 @@
 }
 
 void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
-                               const Value& argMap)
+                                     const Value& argMap)
 {
     Mutex::ScopedLock _lock(lock);
     map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -359,7 +362,7 @@
     contextMap.erase(iter);
 
     Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    encodeHeader(buffer, 'm', context->sequence);
+    Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence);
     buffer.putLong(status);
     buffer.putMediumString(text);
     if (status == 0) {
@@ -390,7 +393,7 @@
     AgentQueryContext::Ptr context = iter->second;
 
     Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    encodeHeader(buffer, 'g', context->sequence);
+    Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence);
 
     object.impl->encodeSchemaKey(buffer);
     object.impl->encodeManagedObjectData(buffer);
@@ -477,30 +480,6 @@
     Mutex::ScopedLock _lock(lock);
 }
 
-void AgentEngineImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
-{
-    buf.putOctet('A');
-    buf.putOctet('M');
-    buf.putOctet('3');
-    buf.putOctet(opcode);
-    buf.putLong (seq);
-}
-
-bool AgentEngineImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
-    if (buf.getSize() < 8)
-        return false;
-
-    uint8_t h1 = buf.getOctet();
-    uint8_t h2 = buf.getOctet();
-    uint8_t h3 = buf.getOctet();
-
-    *opcode = buf.getOctet();
-    *seq    = buf.getLong();
-
-    return h1 == 'A' && h2 == 'M' && h3 == '3';
-}
-
 AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name)
 {
     AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
@@ -570,7 +549,7 @@
 void AgentEngineImpl::sendPackageIndicationLH(const string& packageName)
 {
     Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    encodeHeader(buffer, 'p');
+    Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
     buffer.putShortString(packageName);
     sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
     QPID_LOG(trace, "SENT PackageIndication:  package_name=" << packageName);
@@ -579,7 +558,7 @@
 void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
 {
     Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    encodeHeader(buffer, 'q');
+    Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
     buffer.putOctet((int) kind);
     buffer.putShortString(packageName);
     buffer.putShortString(key.name);
@@ -592,7 +571,7 @@
                                             uint32_t sequence, uint32_t code, const string& text)
 {
     Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    encodeHeader(buffer, 'z', sequence);
+    Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence);
     buffer.putLong(code);
     buffer.putShortString(text);
     sendBufferLH(buffer, exchange, replyToKey);
@@ -602,7 +581,7 @@
 void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
 {
     Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    encodeHeader(buffer, 'm', sequence);
+    Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
     buffer.putLong(code);
 
     string fulltext;
@@ -710,7 +689,7 @@
     if (ocIter != cMap.objectClasses.end()) {
         SchemaObjectClassImpl* oImpl = ocIter->second;
         Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-        encodeHeader(buffer, 's', sequence);
+        Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
         oImpl->encode(buffer);
         sendBufferLH(buffer, rExchange, rKey);
         QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
@@ -721,7 +700,7 @@
     if (ecIter != cMap.eventClasses.end()) {
         SchemaEventClassImpl* eImpl = ecIter->second;
         Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-        encodeHeader(buffer, 's', sequence);
+        Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
         eImpl->encode(buffer);
         sendBufferLH(buffer, rExchange, rKey);
         QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
@@ -851,108 +830,25 @@
 // Wrappers
 //==================================================================
 
-AgentEngine::AgentEngine(char* label, bool internalStore)
-{
-    impl = new AgentEngineImpl(label, internalStore);
-}
-
-AgentEngine::~AgentEngine()
-{
-    delete impl;
-}
-
-void AgentEngine::setStoreDir(const char* path)
-{
-    impl->setStoreDir(path);
-}
-
-void AgentEngine::setTransferDir(const char* path)
-{
-    impl->setTransferDir(path);
-}
-
-void AgentEngine::handleRcvMessage(Message& message)
-{
-    impl->handleRcvMessage(message);
-}
-
-bool AgentEngine::getXmtMessage(Message& item) const
-{
-    return impl->getXmtMessage(item);
-}
-
-void AgentEngine::popXmt()
-{
-    impl->popXmt();
-}
-
-bool AgentEngine::getEvent(AgentEvent& event) const
-{
-    return impl->getEvent(event);
-}
-
-void AgentEngine::popEvent()
-{
-    impl->popEvent();
-}
-
-void AgentEngine::newSession()
-{
-    impl->newSession();
-}
-
-void AgentEngine::startProtocol()
-{
-    impl->startProtocol();
-}
-
-void AgentEngine::heartbeat()
-{
-    impl->heartbeat();
-}
-
-void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments)
-{
-    impl->methodResponse(sequence, status, text, arguments);
-}
-
-void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
-{
-    impl->queryResponse(sequence, object, prop, stat);
-}
-
-void AgentEngine::queryComplete(uint32_t sequence)
-{
-    impl->queryComplete(sequence);
-}
-
-void AgentEngine::registerClass(SchemaObjectClass* cls)
-{
-    impl->registerClass(cls);
-}
-
-void AgentEngine::registerClass(SchemaEventClass* cls)
-{
-    impl->registerClass(cls);
-}
-
-const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId)
-{
-    return impl->addObject(obj, persistId);
-}
-
-const ObjectId* AgentEngine::allocObjectId(uint64_t persistId)
-{
-    return impl->allocObjectId(persistId);
-}
-
-const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
-{
-    return impl->allocObjectId(persistIdLo, persistIdHi);
-}
-
-void AgentEngine::raiseEvent(Event& event)
-{
-    impl->raiseEvent(event);
-}
+AgentEngine::AgentEngine(char* label, bool internalStore) { impl = new AgentEngineImpl(label, internalStore); }
+AgentEngine::~AgentEngine() { delete impl; }
+void AgentEngine::setStoreDir(const char* path) { impl->setStoreDir(path); }
+void AgentEngine::setTransferDir(const char* path) { impl->setTransferDir(path); }
+void AgentEngine::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool AgentEngine::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void AgentEngine::popXmt() { impl->popXmt(); }
+bool AgentEngine::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
+void AgentEngine::popEvent() { impl->popEvent(); }
+void AgentEngine::newSession() { impl->newSession(); }
+void AgentEngine::startProtocol() { impl->startProtocol(); }
+void AgentEngine::heartbeat() { impl->heartbeat(); }
+void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
+void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
+void AgentEngine::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
+void AgentEngine::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
+void AgentEngine::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
+const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
+const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
+const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
+void AgentEngine::raiseEvent(Event& event) { impl->raiseEvent(event); }
 

Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp Mon Aug 31 20:18:48 2009
@@ -25,6 +25,8 @@
 #include "qmf/ObjectIdImpl.h"
 #include "qmf/QueryImpl.h"
 #include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
+#include "qmf/SequenceManager.h"
 #include <qpid/framing/Buffer.h>
 #include <qpid/framing/Uuid.h>
 #include <qpid/framing/FieldTable.h>
@@ -36,6 +38,7 @@
 #include <string>
 #include <deque>
 #include <map>
+#include <vector>
 #include <iostream>
 #include <fstream>
 #include <boost/shared_ptr.hpp>
@@ -92,18 +95,9 @@
         BrokerEvent copy();
     };
 
-    struct BrokerProxyImpl {
+    class BrokerProxyImpl : public SequenceContext {
+    public:
         typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
-        mutable Mutex lock;
-        BrokerProxy* envelope;
-        ConsoleEngineImpl* console;
-        string queueName;
-        deque<MessageImpl::Ptr> xmtQueue;
-        deque<BrokerEventImpl::Ptr> eventQueue;
-
-        static const char* QMF_EXCHANGE;
-        static const char* DIR_EXCHANGE;
-        static const char* BROKER_KEY;
 
         BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
         ~BrokerProxyImpl() {}
@@ -112,6 +106,7 @@
         void sessionClosed();
         void startProtocol();
 
+        void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
         void handleRcvMessage(Message& message);
         bool getXmtMessage(Message& item) const;
         void popXmt();
@@ -119,9 +114,41 @@
         bool getEvent(BrokerEvent& event) const;
         void popEvent();
 
+        // From SequenceContext
+        void complete();
+
+        void addBinding(const string& exchange, const string& key);
+
+    private:
+        mutable Mutex lock;
+        BrokerProxy* envelope;
+        ConsoleEngineImpl* console;
+        string queueName;
+        Uuid brokerId;
+        SequenceManager seqMgr;
+        uint32_t requestsOutstanding;
+        bool topicBound;
+        deque<MessageImpl::Ptr> xmtQueue;
+        deque<BrokerEventImpl::Ptr> eventQueue;
+
+#       define MA_BUFFER_SIZE 65536
+        char outputBuffer[MA_BUFFER_SIZE];
+
         BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName);
         BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
         BrokerEventImpl::Ptr eventSetupComplete();
+
+        void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
+        void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
+        void handleCommandComplete(Buffer& inBuffer, uint32_t seq);
+        void handleClassIndication(Buffer& inBuffer, uint32_t seq);
+        void handleMethodResponse(Buffer& inBuffer, uint32_t seq);
+        void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq);
+        void handleEventIndication(Buffer& inBuffer, uint32_t seq);
+        void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
+        void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
+        void incOutstanding();
+        void decOutstanding();
     };
 
     struct AgentProxyImpl {
@@ -139,10 +166,6 @@
         ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings());
         ~ConsoleEngineImpl();
 
-        void handleRcvMessage(BrokerProxy& broker, Message& message);
-        bool getXmtMessage(Message& item) const;
-        void popXmt();
-
         bool getEvent(ConsoleEvent& event) const;
         void popEvent();
 
@@ -175,17 +198,21 @@
         */
 
     private:
+        friend class BrokerProxyImpl;
         ConsoleEngine* envelope;
         const ConsoleSettings& settings;
         mutable Mutex lock;
         deque<ConsoleEventImpl::Ptr> eventQueue;
+        vector<BrokerProxyImpl::Ptr> brokerList;
+        vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
     };
 }
 
-const char* BrokerProxyImpl::QMF_EXCHANGE = "qpid.management";
-const char* BrokerProxyImpl::DIR_EXCHANGE = "amq.direct";
-const char* BrokerProxyImpl::BROKER_KEY   = "broker";
-
+namespace {
+const char* QMF_EXCHANGE = "qpid.management";
+const char* DIR_EXCHANGE = "amq.direct";
+const char* BROKER_KEY   = "broker";
+}
 
 #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
 
@@ -250,12 +277,55 @@
 
 void BrokerProxyImpl::startProtocol()
 {
-    cout << "BrokerProxyImpl::startProtocol" << endl;
-}
+    Mutex::ScopedLock _lock(lock);
+    char rawbuffer[512];
+    Buffer buffer(rawbuffer, 512);
 
-void BrokerProxyImpl::handleRcvMessage(Message& /*message*/)
-{
-    // TODO: Dispatch the messages types
+    requestsOutstanding = 1;
+    topicBound = false;
+    Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST);
+    sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+    QPID_LOG(trace, "SENT BrokerRequest");
+}
+
+void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+    uint32_t length = buf.getPosition();
+    MessageImpl::Ptr message(new MessageImpl);
+
+    buf.reset();
+    buf.getRawData(message->body, length);
+    message->destination   = destination;
+    message->routingKey    = routingKey;
+    message->replyExchange = DIR_EXCHANGE;
+    message->replyKey      = queueName;
+
+    xmtQueue.push_back(message);
+}
+
+void BrokerProxyImpl::handleRcvMessage(Message& message)
+{
+    Buffer inBuffer(message.body, message.length);
+    uint8_t opcode;
+    uint32_t sequence;
+
+    while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
+        if      (opcode == Protocol::OP_BROKER_RESPONSE)      handleBrokerResponse(inBuffer, sequence);
+        else if (opcode == Protocol::OP_PACKAGE_INDICATION)   handlePackageIndication(inBuffer, sequence);
+        else if (opcode == Protocol::OP_COMMAND_COMPLETE)     handleCommandComplete(inBuffer, sequence);
+        else if (opcode == Protocol::OP_CLASS_INDICATION)     handleClassIndication(inBuffer, sequence);
+        else if (opcode == Protocol::OP_METHOD_RESPONSE)      handleMethodResponse(inBuffer, sequence);
+        else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) handleHeartbeatIndication(inBuffer, sequence);
+        else if (opcode == Protocol::OP_EVENT_INDICATION)     handleEventIndication(inBuffer, sequence);
+        else if (opcode == Protocol::OP_SCHEMA_RESPONSE)      handleSchemaResponse(inBuffer, sequence);
+        else if (opcode == Protocol::OP_PROPERTY_INDICATION)  handleObjectIndication(inBuffer, sequence, true,  false);
+        else if (opcode == Protocol::OP_STATISTIC_INDICATION) handleObjectIndication(inBuffer, sequence, false, true);
+        else if (opcode == Protocol::OP_OBJECT_INDICATION)    handleObjectIndication(inBuffer, sequence, true,  true);
+        else {
+            QPID_LOG(trace, "BrokerProxyImpl::handleRcvMessage invalid opcode: " << opcode);
+            break;
+        }
+    }
 }
 
 bool BrokerProxyImpl::getXmtMessage(Message& item) const
@@ -290,6 +360,16 @@
         eventQueue.pop_front();
 }
 
+void BrokerProxyImpl::complete()
+{
+    decOutstanding();
+}
+
+void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
+{
+    eventQueue.push_back(eventBind(exchange, queueName, key));
+}
+
 BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
 {
     BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
@@ -313,6 +393,89 @@
     return event;
 }
 
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
+    // Note that this function doesn't touch requestsOutstanding.  This is because
+    // it accounts for one request completed (the BrokerRequest) and one request
+    // started (the PackageRequest) which cancel each other out.
+
+    brokerId.decode(inBuffer);
+    QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
+    Mutex::ScopedLock _lock(lock);
+    Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+    uint32_t sequence(seqMgr.reserve(this));
+    Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
+    sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+    QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
+}
+
+void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
+{
+    string package;
+
+    inBuffer.getShortString(package);
+    QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
+}
+
+void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
+{
+    string text;
+    uint32_t code = inBuffer.getLong();
+    inBuffer.getShortString(text);
+    QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
+    seqMgr.release(seq);
+}
+
+void BrokerProxyImpl::handleClassIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+    // TODO
+}
+
+void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+    // TODO
+}
+
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+    // TODO
+}
+
+void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+    // TODO
+}
+
+void BrokerProxyImpl::handleSchemaResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+    // TODO
+}
+
+void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/)
+{
+    // TODO
+}
+
+void BrokerProxyImpl::incOutstanding()
+{
+    Mutex::ScopedLock _lock(lock);
+    requestsOutstanding++;
+}
+
+void BrokerProxyImpl::decOutstanding()
+{
+    Mutex::ScopedLock _lock(lock);
+    requestsOutstanding--;
+    if (requestsOutstanding == 0 && !topicBound) {
+        for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
+             iter != console->bindingList.end(); iter++) {
+            string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
+            string key(iter->second);
+            eventQueue.push_back(eventBind(exchange, queueName, key));
+        }
+    }
+}
+
 MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this))
 {
     string text;
@@ -329,6 +492,19 @@
 ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
     envelope(e), settings(s)
 {
+    bindingList.push_back(pair<string, string>(string(), "schema.#"));
+    if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
+        bindingList.push_back(pair<string, string>(string(), "console.#"));
+    } else {
+        if (settings.rcvObjects && !settings.userBindings)
+            bindingList.push_back(pair<string, string>(string(), "console.obj.#"));
+        else
+            bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent"));
+        if (settings.rcvEvents)
+            bindingList.push_back(pair<string, string>(string(), "console.event.#"));
+        if (settings.rcvHeartbeats)
+            bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#"));
+    }
 }
 
 ConsoleEngineImpl::~ConsoleEngineImpl()
@@ -354,72 +530,105 @@
 
 void ConsoleEngineImpl::addConnection(BrokerProxy& /*broker*/, void* /*context*/)
 {
+    // TODO
 }
 
 void ConsoleEngineImpl::delConnection(BrokerProxy& /*broker*/)
 {
+    // TODO
 }
 
 uint32_t ConsoleEngineImpl::packageCount() const
 {
+    // TODO
     return 0;
 }
 
 const string& ConsoleEngineImpl::getPackageName(uint32_t /*idx*/) const
 {
+    // TODO
     static string temp;
     return temp;
 }
 
 uint32_t ConsoleEngineImpl::classCount(const char* /*packageName*/) const
 {
+    // TODO
     return 0;
 }
 
 const SchemaClassKey* ConsoleEngineImpl::getClass(const char* /*packageName*/, uint32_t /*idx*/) const
 {
+    // TODO
     return 0;
 }
 
 ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey& /*key*/) const
 {
+    // TODO
     return CLASS_OBJECT;
 }
 
 const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey& /*key*/) const
 {
+    // TODO
     return 0;
 }
 
 const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey& /*key*/) const
 {
+    // TODO
     return 0;
 }
 
-void ConsoleEngineImpl::bindPackage(const char* /*packageName*/)
+void ConsoleEngineImpl::bindPackage(const char* packageName)
 {
+    stringstream key;
+    key << "console.obj.*.*." << packageName << ".#";
+    Mutex::ScopedLock _lock(lock);
+    bindingList.push_back(pair<string, string>(string(), key.str()));
+    for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+         iter != brokerList.end(); iter++)
+        (*iter)->addBinding(QMF_EXCHANGE, key.str());
 }
 
-void ConsoleEngineImpl::bindClass(const SchemaClassKey& /*key*/)
+void ConsoleEngineImpl::bindClass(const SchemaClassKey& classKey)
 {
+    stringstream key;
+    key << "console.obj.*.*." << classKey.getPackageName() << "." << classKey.getClassName() << ".#";
+    Mutex::ScopedLock _lock(lock);
+    bindingList.push_back(pair<string, string>(string(), key.str()));
+    for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+         iter != brokerList.end(); iter++)
+        (*iter)->addBinding(QMF_EXCHANGE, key.str());
 }
 
-void ConsoleEngineImpl::bindClass(const char* /*packageName*/, const char* /*className*/)
+void ConsoleEngineImpl::bindClass(const char* packageName, const char* className)
 {
+    stringstream key;
+    key << "console.obj.*.*." << packageName << "." << className << ".#";
+    Mutex::ScopedLock _lock(lock);
+    bindingList.push_back(pair<string, string>(string(), key.str()));
+    for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+         iter != brokerList.end(); iter++)
+        (*iter)->addBinding(QMF_EXCHANGE, key.str());
 }
 
 uint32_t ConsoleEngineImpl::agentCount() const
 {
+    // TODO
     return 0;
 }
 
 const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const
 {
+    // TODO
     return 0;
 }
 
 void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/)
 {
+    // TODO
 }
 
 /*
@@ -437,7 +646,6 @@
 */
 
 
-
 //==================================================================
 // Wrappers
 //==================================================================

Added: qpid/trunk/qpid/cpp/src/qmf/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/Protocol.cpp?rev=809728&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/Protocol.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qmf/Protocol.cpp Mon Aug 31 20:18:48 2009
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/Protocol.h"
+#include "qpid/framing/Buffer.h"
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::framing;
+
+
+bool Protocol::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+    if (buf.available() < 8)
+        return false;
+
+    uint8_t h1 = buf.getOctet();
+    uint8_t h2 = buf.getOctet();
+    uint8_t h3 = buf.getOctet();
+
+    *opcode = buf.getOctet();
+    *seq    = buf.getLong();
+
+    return h1 == 'A' && h2 == 'M' && h3 == '3';
+}
+
+void Protocol::encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+    buf.putOctet('A');
+    buf.putOctet('M');
+    buf.putOctet('3');
+    buf.putOctet(opcode);
+    buf.putLong (seq);
+}
+
+

Added: qpid/trunk/qpid/cpp/src/qmf/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/Protocol.h?rev=809728&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/Protocol.h (added)
+++ qpid/trunk/qpid/cpp/src/qmf/Protocol.h Mon Aug 31 20:18:48 2009
@@ -0,0 +1,67 @@
+#ifndef _QmfProtocol_
+#define _QmfProtocol_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/sys/IntegerTypes.h>
+
+namespace qpid {
+    namespace framing {
+        class Buffer;
+    }
+}
+
+namespace qmf {
+
+    class Protocol {
+    public:
+        static bool checkHeader(qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+        static void encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+
+        const static uint8_t OP_ATTACH_REQUEST  = 'A';
+        const static uint8_t OP_ATTACH_RESPONSE = 'a';
+
+        const static uint8_t OP_BROKER_REQUEST  = 'B';
+        const static uint8_t OP_BROKER_RESPONSE = 'b';
+
+        const static uint8_t OP_CONSOLE_ADDED_INDICATION = 'x';
+        const static uint8_t OP_COMMAND_COMPLETE         = 'z';
+        const static uint8_t OP_HEARTBEAT_INDICATION     = 'h';
+
+        const static uint8_t OP_PACKAGE_REQUEST    = 'P';
+        const static uint8_t OP_PACKAGE_INDICATION = 'p';
+        const static uint8_t OP_CLASS_QUERY        = 'Q';
+        const static uint8_t OP_CLASS_INDICATION   = 'q';
+        const static uint8_t OP_SCHEMA_REQUEST     = 'S';
+        const static uint8_t OP_SCHEMA_RESPONSE    = 's';
+
+        const static uint8_t OP_METHOD_REQUEST       = 'M';
+        const static uint8_t OP_METHOD_RESPONSE      = 'm';
+        const static uint8_t OP_GET_QUERY            = 'G';
+        const static uint8_t OP_OBJECT_INDICATION    = 'g';
+        const static uint8_t OP_PROPERTY_INDICATION  = 'c';
+        const static uint8_t OP_STATISTIC_INDICATION = 'i';
+        const static uint8_t OP_EVENT_INDICATION     = 'e';
+    };
+
+}
+
+#endif
+

Modified: qpid/trunk/qpid/cpp/src/qmf/ResilientConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ResilientConnection.cpp?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ResilientConnection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ResilientConnection.cpp Mon Aug 31 20:18:48 2009
@@ -318,6 +318,7 @@
 
     while (true) {
         try {
+            QPID_LOG(trace, "Trying to open connection...");
             connection.open(settings.impl->getClientSettings());
             {
                 Mutex::ScopedLock _lock(lock);
@@ -326,6 +327,7 @@
 
                 while (connected)
                     cond.wait(lock);
+                delay = delayMin;
 
                 while (!sessions.empty()) {
                     set<RCSession::Ptr>::iterator iter = sessions.begin();
@@ -334,6 +336,11 @@
                     EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext);
                     Mutex::ScopedUnlock _u(lock);
                     sess->stop();
+
+                    // Nullify the intrusive pointer within the scoped unlock, otherwise,
+                    // the reference is held until overwritted above (under lock) which causes
+                    // the session destructor to be called with the lock held.
+                    sess = 0;
                 }
 
                 EnqueueEvent(ResilientConnectionEvent::DISCONNECTED);
@@ -341,7 +348,6 @@
                 if (shutdown)
                     return;
             }
-            delay = delayMin;
             connection.close();
         } catch (exception &e) {
             QPID_LOG(debug, "connection.open exception: " << e.what());

Added: qpid/trunk/qpid/cpp/src/qmf/SequenceManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/SequenceManager.cpp?rev=809728&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/SequenceManager.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qmf/SequenceManager.cpp Mon Aug 31 20:18:48 2009
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/SequenceManager.h"
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::sys;
+
+SequenceManager::SequenceManager() : nextSequence(1) {}
+
+uint32_t SequenceManager::reserve(SequenceContext* ctx)
+{
+    Mutex::ScopedLock _lock(lock);
+    uint32_t seq = nextSequence;
+    while (contextMap.find(seq) != contextMap.end())
+        seq = seq < 0xFFFFFFFF ? seq + 1 : 1;
+    nextSequence = seq < 0xFFFFFFFF ? seq + 1 : 1;
+    contextMap[seq] = ctx;
+    return seq;
+}
+
+void SequenceManager::release(uint32_t sequence)
+{
+    Mutex::ScopedLock _lock(lock);
+    map<uint32_t, SequenceContext*>::iterator iter = contextMap.find(sequence);
+    if (iter != contextMap.end()) {
+        if (iter->second != 0)
+            iter->second->complete();
+        contextMap.erase(iter);
+    }
+}
+
+

Added: qpid/trunk/qpid/cpp/src/qmf/SequenceManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/SequenceManager.h?rev=809728&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/SequenceManager.h (added)
+++ qpid/trunk/qpid/cpp/src/qmf/SequenceManager.h Mon Aug 31 20:18:48 2009
@@ -0,0 +1,52 @@
+#ifndef _QmfSequenceManager_
+#define _QmfSequenceManager_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <map>
+
+namespace qmf {
+
+    class SequenceContext {
+    public:
+        SequenceContext() {}
+        virtual ~SequenceContext() {}
+
+        virtual void complete() = 0;
+    };
+
+    class SequenceManager {
+    public:
+        SequenceManager();
+
+        uint32_t reserve(SequenceContext* ctx);
+        void release(uint32_t sequence);
+
+    private:
+        mutable qpid::sys::Mutex lock;
+        uint32_t nextSequence;
+        std::map<uint32_t, SequenceContext*> contextMap;
+    };
+
+}
+
+#endif
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org