You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/09/29 05:21:51 UTC

svn commit: r819819 - in /qpid/trunk/qpid/cpp: bindings/qmf/ruby/ bindings/qmf/tests/ include/qmf/engine/ src/qmf/engine/ src/qpid/broker/

Author: tross
Date: Tue Sep 29 03:21:49 2009
New Revision: 819819

URL: http://svn.apache.org/viewvc?rev=819819&view=rev
Log:
QMF Engine updates:
  - Connected console handler callbacks
  - Added string representations for a number of object classes
  - Added a feature that completes query requests sent to disconnected agents

Modified:
    qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb
    qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb
    qpid/trunk/qpid/cpp/include/qmf/engine/Console.h
    qpid/trunk/qpid/cpp/include/qmf/engine/ObjectId.h
    qpid/trunk/qpid/cpp/include/qmf/engine/Schema.h
    qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
    qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
    qpid/trunk/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
    qpid/trunk/qpid/cpp/src/qmf/engine/ConsoleImpl.h
    qpid/trunk/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
    qpid/trunk/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
    qpid/trunk/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
    qpid/trunk/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
    qpid/trunk/qpid/cpp/src/qmf/engine/SchemaImpl.h
    qpid/trunk/qpid/cpp/src/qmf/engine/SequenceManager.cpp
    qpid/trunk/qpid/cpp/src/qmf/engine/SequenceManager.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp

Modified: qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb Tue Sep 29 03:21:49 2009
@@ -469,6 +469,10 @@
       return (@impl.getObjectNumHi == other.impl.getObjectNumHi) &&
         (@impl.getObjectNumLo == other.impl.getObjectNumLo)
     end
+
+    def to_s
+      @impl.str
+    end
   end
 
   class Arguments
@@ -734,12 +738,16 @@
       @impl = i
     end
 
-    def package_name()
-      @impl.getPackageName()
+    def package_name
+      @impl.getPackageName
+    end
+
+    def class_name
+      @impl.getClassName
     end
 
-    def class_name()
-      @impl.getClassName()
+    def to_s
+      @impl.asString
     end
   end
 
@@ -982,12 +990,18 @@
         count += 1
         case @event.kind
         when Qmfengine::ConsoleEvent::AGENT_ADDED
+          @handler.agent_added(AgentProxy.new(@event.agent, nil)) if @handler
         when Qmfengine::ConsoleEvent::AGENT_DELETED
+          @handler.agent_deleted(AgentProxy.new(@event.agent, nil)) if @handler
         when Qmfengine::ConsoleEvent::NEW_PACKAGE
+          @handler.new_package(@event.name) if @handler
         when Qmfengine::ConsoleEvent::NEW_CLASS
+          @handler.new_class(SchemaClassKey.new(@event.classKey)) if @handler
         when Qmfengine::ConsoleEvent::OBJECT_UPDATE
+          @handler.object_update(ConsoleObject.new(nil, :impl => @event.object), @event.hasProps, @event.hasStats) if @handler
         when Qmfengine::ConsoleEvent::EVENT_RECEIVED
         when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT
+          @handler.agent_heartbeat(AgentProxy.new(@event.agent, nil), @event.timestamp) if @handler
         when Qmfengine::ConsoleEvent::METHOD_RESPONSE
         end
         @impl.popEvent

Modified: qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb Tue Sep 29 03:21:49 2009
@@ -24,6 +24,36 @@
 
 class App < Qmf::ConsoleHandler
 
+  def agent_added(agent)
+    puts "AgentAdded: #{agent.label}"
+  end
+
+  def agent_deleted(agent)
+    puts "AgentDeleted: #{agent.label}"
+  end
+
+  def new_package(package)
+    puts "NewPackage: #{package}"
+  end
+
+  def new_class(class_key)
+    puts "NewClass: #{class_key}"
+  end
+
+  def object_update(object, hasProps, hasStats)
+    puts "ObjectUpdate: #{object.object_class.name} props=#{hasProps} stats=#{hasStats}"
+  end
+
+  def event_received(event); end
+
+  def agent_heartbeat(agent, timestamp)
+    puts "AgentHeartbeat: #{agent.label} time=#{timestamp/1000000000}"
+  end
+
+  def method_response(resp); end
+  def broker_info(broker); end
+
+
   def dump_schema
     packages = @qmfc.packages
     puts "----- Packages -----"
@@ -77,12 +107,12 @@
     @settings.host = ARGV[0] if ARGV.size > 0
     @settings.port = ARGV[1].to_i if ARGV.size > 1
     @connection = Qmf::Connection.new(@settings)
-    @qmfc = Qmf::Console.new
+    @qmfc = Qmf::Console.new(self)
 
     @broker = @qmfc.add_connection(@connection)
     @broker.waitForStable
 
-    dump_schema
+    ##dump_schema
 
     agents = @qmfc.agents()
     puts "---- Agents ----"

Modified: qpid/trunk/qpid/cpp/include/qmf/engine/Console.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/engine/Console.h?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/engine/Console.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/engine/Console.h Tue Sep 29 03:21:49 2009
@@ -94,12 +94,14 @@
         EventKind       kind;
         AgentProxy*     agent;          // (AGENT_[ADDED|DELETED|HEARTBEAT])
         char*           name;           // (NEW_PACKAGE)
-        SchemaClassKey* classKey;       // (NEW_CLASS)
+        const SchemaClassKey* classKey; // (NEW_CLASS)
         Object*         object;         // (OBJECT_UPDATE)
         void*           context;        // (OBJECT_UPDATE)
         Event*          event;          // (EVENT_RECEIVED)
         uint64_t        timestamp;      // (AGENT_HEARTBEAT)
         QueryResponse*  queryResponse;  // (QUERY_COMPLETE)
+        bool            hasProps;
+        bool            hasStats;
     };
 
     /**
@@ -136,6 +138,8 @@
         const char* getLabel() const;
 
     private:
+        friend struct StaticContext;
+        friend struct QueryContext;
         friend struct AgentProxyImpl;
         friend class BrokerProxyImpl;
         AgentProxy(AgentProxyImpl* impl);
@@ -219,6 +223,7 @@
     private:
         friend class BrokerProxyImpl;
         friend struct AgentProxyImpl;
+        friend struct StaticContext;
         ConsoleImpl* impl;
     };
 }

Modified: qpid/trunk/qpid/cpp/include/qmf/engine/ObjectId.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/engine/ObjectId.h?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/engine/ObjectId.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/engine/ObjectId.h Tue Sep 29 03:21:49 2009
@@ -38,6 +38,7 @@
         uint32_t getObjectNumHi() const;
         uint32_t getObjectNumLo() const;
         bool isDurable() const;
+        const char* str() const;
 
         bool operator==(const ObjectId& other) const;
         bool operator<(const ObjectId& other) const;

Modified: qpid/trunk/qpid/cpp/include/qmf/engine/Schema.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/engine/Schema.h?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/engine/Schema.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/engine/Schema.h Tue Sep 29 03:21:49 2009
@@ -142,6 +142,7 @@
         const char* getPackageName() const;
         const char* getClassName() const;
         const uint8_t* getHash() const;
+        const char* asString() const;
 
         bool operator==(const SchemaClassKey& other) const;
         bool operator<(const SchemaClassKey& other) const;

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp Tue Sep 29 03:21:49 2009
@@ -23,6 +23,7 @@
 #include "qpid/Address.h"
 #include "qpid/sys/SystemInfo.h"
 #include <qpid/log/Statement.h>
+#include <qpid/StringUtils.h>
 #include <string.h>
 #include <iostream>
 #include <fstream>
@@ -109,18 +110,23 @@
 
 void BrokerProxyImpl::startProtocol()
 {
-    Mutex::ScopedLock _lock(lock);
-    char rawbuffer[512];
-    Buffer buffer(rawbuffer, 512);
+    AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
+    {
+        Mutex::ScopedLock _lock(lock);
+        char rawbuffer[512];
+        Buffer buffer(rawbuffer, 512);
 
-    agentList[0] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
+        agentList[0] = agent;
 
-    requestsOutstanding = 1;
-    topicBound = false;
-    uint32_t sequence(seqMgr.reserve());
-    Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
-    sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
-    QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+        requestsOutstanding = 1;
+        topicBound = false;
+        uint32_t sequence(seqMgr.reserve());
+        Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
+        sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+        QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+    }
+
+    console.impl->eventAgentAdded(agent);
 }
 
 void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
@@ -145,7 +151,7 @@
     uint32_t sequence;
 
     while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
-        seqMgr.dispatch(opcode, sequence, inBuffer);
+        seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer);
 }
 
 bool BrokerProxyImpl::getXmtMessage(Message& item) const
@@ -216,6 +222,7 @@
     stringstream key;
     Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
     uint32_t sequence(seqMgr.reserve(queryContext));
+    agent->impl->addSequence(sequence);
 
     Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
     query.impl->encode(outBuffer);
@@ -406,9 +413,23 @@
     return response;
 }
 
-void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey)
 {
-    // TODO
+    vector<string> tokens = qpid::split(routingKey, ".");
+    uint32_t agentBank;
+    uint64_t timestamp;
+
+    if (routingKey.empty() || tokens.size() != 4)
+        agentBank = 0;
+    else
+        agentBank = ::atoi(tokens[3].c_str());
+
+    timestamp = inBuffer.getLongLong();
+    map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
+    if (iter != agentList.end()) {
+        console.impl->eventAgentHeartbeat(iter->second, timestamp);
+    }
+    QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank);
 }
 
 void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
@@ -481,11 +502,24 @@
 void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
 {
     Value* value = obj->getValue("agentBank");
+    Mutex::ScopedLock _lock(lock);
     if (value != 0 && value->isUint()) {
         uint32_t agentBank = value->asUint();
         if (obj->isDeleted()) {
-            agentList.erase(agentBank);
-            QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
+            map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank);
+            if (iter != agentList.end()) {
+                AgentProxyPtr agent(iter->second);
+                console.impl->eventAgentDeleted(agent);
+                agentList.erase(agentBank);
+                QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
+
+                //
+                //  Release all sequence numbers for requests in-flight to this agent.
+                //  Since the agent is no longer connected, these requests would not
+                //  otherwise complete.
+                //
+                agent->impl->releaseInFlight(seqMgr);
+            }
         } else {
             Value* str = obj->getValue("label");
             string label;
@@ -493,7 +527,9 @@
                 label = str->asString();
             map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
             if (iter == agentList.end()) {
-                agentList[agentBank] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, agentBank, label));
+                AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label));
+                agentList[agentBank] = agent;
+                console.impl->eventAgentAdded(agent);
                 QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank);
             }
         }
@@ -572,9 +608,11 @@
     return new MethodResponse(impl);
 }
 
-bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer)
 {
+    ObjectPtr object;
     bool completeContext = false;
+
     if      (opcode == Protocol::OP_BROKER_RESPONSE) {
         broker.handleBrokerResponse(buffer, sequence);
         completeContext = true;
@@ -592,15 +630,21 @@
     else if (opcode == Protocol::OP_CLASS_INDICATION)
         broker.handleClassIndication(buffer, sequence);
     else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
-        broker.handleHeartbeatIndication(buffer, sequence);
+        broker.handleHeartbeatIndication(buffer, sequence, routingKey);
     else if (opcode == Protocol::OP_EVENT_INDICATION)
         broker.handleEventIndication(buffer, sequence);
-    else if (opcode == Protocol::OP_PROPERTY_INDICATION)
-        broker.handleObjectIndication(buffer, sequence, true,  false);
-    else if (opcode == Protocol::OP_STATISTIC_INDICATION)
-        broker.handleObjectIndication(buffer, sequence, false, true);
-    else if (opcode == Protocol::OP_OBJECT_INDICATION)
-        broker.handleObjectIndication(buffer, sequence, true,  true);
+    else if (opcode == Protocol::OP_PROPERTY_INDICATION) {
+        object = broker.handleObjectIndication(buffer, sequence, true,  false);
+        broker.console.impl->eventObjectUpdate(object, true, false);
+    }
+    else if (opcode == Protocol::OP_STATISTIC_INDICATION) {
+        object = broker.handleObjectIndication(buffer, sequence, false, true);
+        broker.console.impl->eventObjectUpdate(object, false, true);
+    }
+    else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+        object = broker.handleObjectIndication(buffer, sequence, true,  true);
+        broker.console.impl->eventObjectUpdate(object, true, true);
+    }
     else {
         QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
         completeContext = true;
@@ -627,7 +671,7 @@
     broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
 }
 
-bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
 {
     bool completeContext = false;
     ObjectPtr object;
@@ -635,6 +679,19 @@
     if      (opcode == Protocol::OP_COMMAND_COMPLETE) {
         broker.handleCommandComplete(buffer, sequence);
         completeContext = true;
+
+        //
+        //  Visit each agent and remove the sequence from that agent's in-flight list.
+        //  This could be made more efficient because only one agent will have this sequence
+        //  in its list.
+        //
+        map<uint32_t, AgentProxyPtr> copy;
+        {
+            Mutex::ScopedLock _block(broker.lock);
+            copy = broker.agentList;
+        }
+        for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++)
+            iter->second->impl->delSequence(sequence);
     }
     else if (opcode == Protocol::OP_OBJECT_INDICATION) {
         object = broker.handleObjectIndication(buffer, sequence, true,  true);
@@ -655,7 +712,7 @@
     broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse));
 }
 
-bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
 {
     if (opcode == Protocol::OP_METHOD_RESPONSE)
         methodResponse = broker.handleMethodResponse(buffer, sequence, schema);

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h Tue Sep 29 03:21:49 2009
@@ -36,6 +36,7 @@
 #include <string>
 #include <deque>
 #include <map>
+#include <set>
 #include <vector>
 
 namespace qmf {
@@ -98,6 +99,7 @@
         BrokerProxy& broker;
         uint32_t agentBank;
         std::string label;
+        std::set<uint32_t> inFlightSequences;
 
         AgentProxyImpl(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) : console(c), broker(b), agentBank(ab), label(l) {}
         static AgentProxy* factory(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) {
@@ -106,6 +108,13 @@
         }
         ~AgentProxyImpl() {}
         const std::string& getLabel() const { return label; }
+        void addSequence(uint32_t seq) { inFlightSequences.insert(seq); }
+        void delSequence(uint32_t seq) { inFlightSequences.erase(seq); }
+        void releaseInFlight(SequenceManager& seqMgr) {
+            for (std::set<uint32_t>::iterator iter = inFlightSequences.begin(); iter != inFlightSequences.end(); iter++)
+                seqMgr.release(*iter);
+            inFlightSequences.clear();
+        }
     };
 
     class BrokerProxyImpl : public boost::noncopyable {
@@ -166,7 +175,7 @@
         void handleCommandComplete(qpid::framing::Buffer& inBuffer, uint32_t seq);
         void handleClassIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
         MethodResponsePtr handleMethodResponse(qpid::framing::Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema);
-        void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+        void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, const std::string& routingKey);
         void handleEventIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
         void handleSchemaResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
         ObjectPtr handleObjectIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
@@ -186,7 +195,7 @@
         virtual ~StaticContext() {}
         void reserve() {}
         void release() { broker.staticRelease(); }
-        bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+        bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
         BrokerProxyImpl& broker;
     };
 
@@ -199,7 +208,7 @@
         virtual ~QueryContext() {}
         void reserve();
         void release();
-        bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+        bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
 
         mutable qpid::sys::Mutex lock;
         BrokerProxyImpl& broker;
@@ -213,7 +222,7 @@
         virtual ~MethodContext() {}
         void reserve() {}
         void release();
-        bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+        bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
 
         BrokerProxyImpl& broker;
         void* userContext;

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp Tue Sep 29 03:21:49 2009
@@ -57,11 +57,13 @@
     ::memset(&item, 0, sizeof(ConsoleEvent));
     item.kind      = kind;
     item.agent     = agent.get();
-    item.classKey  = classKey.get();
-    item.object    = object;
+    item.classKey  = classKey;
+    item.object    = object.get();
     item.context   = context;
     item.event     = event;
     item.timestamp = timestamp;
+    item.hasProps  = hasProps;
+    item.hasStats  = hasStats;
 
     STRING_REF(name);
 
@@ -274,9 +276,11 @@
 void ConsoleImpl::learnPackage(const string& packageName)
 {
     Mutex::ScopedLock _lock(lock);
-    if (packages.find(packageName) == packages.end())
+    if (packages.find(packageName) == packages.end()) {
         packages.insert(pair<string, pair<ObjectClassList, EventClassList> >
                         (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList())));
+        eventNewPackage(packageName);
+    }
 }
 
 void ConsoleImpl::learnClass(SchemaObjectClass* cls)
@@ -288,8 +292,10 @@
         return;
 
     ObjectClassList& list = pIter->second.first;
-    if (list.find(key) == list.end())
+    if (list.find(key) == list.end()) {
         list[key] = cls;
+        eventNewClass(key);
+    }
 }
 
 void ConsoleImpl::learnClass(SchemaEventClass* cls)
@@ -301,8 +307,10 @@
         return;
 
     EventClassList& list = pIter->second.second;
-    if (list.find(key) == list.end())
+    if (list.find(key) == list.end()) {
         list[key] = cls;
+        eventNewClass(key);
+    }
 }
 
 bool ConsoleImpl::haveClass(const SchemaClassKey* key) const
@@ -333,6 +341,57 @@
     return iter->second;
 }
 
+void ConsoleImpl::eventAgentAdded(boost::shared_ptr<AgentProxy> agent)
+{
+    ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_ADDED));
+    event->agent = agent;
+    Mutex::ScopedLock _lock(lock);
+    eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventAgentDeleted(boost::shared_ptr<AgentProxy> agent)
+{
+    ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_DELETED));
+    event->agent = agent;
+    Mutex::ScopedLock _lock(lock);
+    eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventNewPackage(const string& packageName)
+{
+    ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_PACKAGE));
+    event->name = packageName;
+    Mutex::ScopedLock _lock(lock);
+    eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventNewClass(const SchemaClassKey* key)
+{
+    ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_CLASS));
+    event->classKey = key;
+    Mutex::ScopedLock _lock(lock);
+    eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventObjectUpdate(ObjectPtr object, bool prop, bool stat)
+{
+    ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::OBJECT_UPDATE));
+    event->object = object;
+    event->hasProps = prop;
+    event->hasStats = stat;
+    Mutex::ScopedLock _lock(lock);
+    eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp)
+{
+    ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_HEARTBEAT));
+    event->agent = agent;
+    event->timestamp = timestamp;
+    Mutex::ScopedLock _lock(lock);
+    eventQueue.push_back(event);
+}
+
 //==================================================================
 // Wrappers
 //==================================================================

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/ConsoleImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/ConsoleImpl.h?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/ConsoleImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/ConsoleImpl.h Tue Sep 29 03:21:49 2009
@@ -56,14 +56,16 @@
         ConsoleEvent::EventKind kind;
         boost::shared_ptr<AgentProxy> agent;
         std::string name;
-        boost::shared_ptr<SchemaClassKey> classKey;
-        Object* object;
+        const SchemaClassKey* classKey;
+        boost::shared_ptr<Object> object;
         void* context;
         Event* event;
         uint64_t timestamp;
+        bool hasProps;
+        bool hasStats;
 
         ConsoleEventImpl(ConsoleEvent::EventKind k) :
-            kind(k), object(0), context(0), event(0), timestamp(0) {}
+            kind(k), classKey(0), context(0), event(0), timestamp(0) {}
         ~ConsoleEventImpl() {}
         ConsoleEvent copy();
     };
@@ -101,6 +103,7 @@
 
     private:
         friend class BrokerProxyImpl;
+        friend struct StaticContext;
         const ConsoleSettings& settings;
         mutable qpid::sys::Mutex lock;
         std::deque<ConsoleEventImpl::Ptr> eventQueue;
@@ -127,6 +130,13 @@
         void learnClass(SchemaEventClass* cls);
         bool haveClass(const SchemaClassKey* key) const;
         SchemaObjectClass* getSchema(const SchemaClassKey* key) const;
+
+        void eventAgentAdded(boost::shared_ptr<AgentProxy> agent);
+        void eventAgentDeleted(boost::shared_ptr<AgentProxy> agent);
+        void eventNewPackage(const std::string& packageName);
+        void eventNewClass(const SchemaClassKey* key);
+        void eventObjectUpdate(ObjectPtr object, bool prop, bool stat);
+        void eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp);
     };
 }
 }

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp Tue Sep 29 03:21:49 2009
@@ -111,13 +111,14 @@
     agent = 0;
 }
 
-std::string ObjectIdImpl::asString() const
+const string& ObjectIdImpl::asString() const
 {
     stringstream val;
 
     val << (int) getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" <<
         getAgentBank() << "-" << getObjectNum();
-    return val.str();
+    repr = val.str();
+    return repr;
 }
 
 bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const
@@ -154,6 +155,7 @@
 uint32_t ObjectId::getObjectNumHi() const { return impl->getObjectNumHi(); }
 uint32_t ObjectId::getObjectNumLo() const { return impl->getObjectNumLo(); }
 bool ObjectId::isDurable() const { return impl->isDurable(); }
+const char* ObjectId::str() const { return impl->asString().c_str(); }
 bool ObjectId::operator==(const ObjectId& other) const { return *impl == *other.impl; }
 bool ObjectId::operator<(const ObjectId& other) const { return *impl < *other.impl; }
 bool ObjectId::operator>(const ObjectId& other) const { return *impl > *other.impl; }

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/ObjectIdImpl.h?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/ObjectIdImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/ObjectIdImpl.h Tue Sep 29 03:21:49 2009
@@ -38,6 +38,7 @@
         AgentAttachment* agent;
         uint64_t first;
         uint64_t second;
+        mutable std::string repr;
 
         ObjectIdImpl() : agent(0), first(0), second(0) {}
         ObjectIdImpl(qpid::framing::Buffer& buffer);
@@ -49,7 +50,7 @@
         void decode(qpid::framing::Buffer& buffer);
         void encode(qpid::framing::Buffer& buffer) const;
         void fromString(const std::string& repr);
-        std::string asString() const;
+        const std::string& asString() const;
         uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; }
         uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; }
         uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; }

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/ResilientConnection.cpp?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/ResilientConnection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/ResilientConnection.cpp Tue Sep 29 03:21:49 2009
@@ -171,15 +171,20 @@
     MessageImpl qmsg;
     qmsg.body = msg.getData();
 
-    qpid::framing::MessageProperties p = msg.getMessageProperties();
-    if (p.hasReplyTo()) {
-        const qpid::framing::ReplyTo& rt = p.getReplyTo();
+    qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties();
+    if (dp.hasRoutingKey()) {
+        qmsg.routingKey = dp.getRoutingKey();
+    }
+
+    qpid::framing::MessageProperties mp = msg.getMessageProperties();
+    if (mp.hasReplyTo()) {
+        const qpid::framing::ReplyTo& rt = mp.getReplyTo();
         qmsg.replyExchange = rt.getExchange();
         qmsg.replyKey = rt.getRoutingKey();
     }
 
-    if (p.hasUserId()) {
-        qmsg.userId = p.getUserId();
+    if (mp.hasUserId()) {
+        qmsg.userId = mp.getUserId();
     }
 
     connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/SchemaImpl.cpp?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/SchemaImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/SchemaImpl.cpp Tue Sep 29 03:21:49 2009
@@ -326,12 +326,13 @@
     return hash < other.hash;
 }
 
-string SchemaClassKeyImpl::str() const
+const string& SchemaClassKeyImpl::str() const
 {
     Uuid printableHash(hash.get());
     stringstream str;
     str << package << ":" << name << "(" << printableHash << ")";
-    return str.str();
+    repr = str.str();
+    return repr;
 }
 
 SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
@@ -579,6 +580,7 @@
 const char* SchemaClassKey::getPackageName() const { return impl->getPackageName().c_str(); }
 const char* SchemaClassKey::getClassName() const { return impl->getClassName().c_str(); }
 const uint8_t* SchemaClassKey::getHash() const { return impl->getHash(); }
+const char* SchemaClassKey::asString() const { return impl->str().c_str(); }
 bool SchemaClassKey::operator==(const SchemaClassKey& other) const { return *impl == *(other.impl); }
 bool SchemaClassKey::operator<(const SchemaClassKey& other) const { return *impl < *(other.impl); }
 

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/SchemaImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/SchemaImpl.h?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/SchemaImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/SchemaImpl.h Tue Sep 29 03:21:49 2009
@@ -142,6 +142,7 @@
         const std::string& package;
         const std::string& name;
         const SchemaHash& hash;
+        mutable std::string repr;
 
         // The *Container elements are only used if there isn't an external place to
         // store these values.
@@ -161,7 +162,7 @@
         void encode(qpid::framing::Buffer& buffer) const;
         bool operator==(const SchemaClassKeyImpl& other) const;
         bool operator<(const SchemaClassKeyImpl& other) const;
-        std::string str() const;
+        const std::string& str() const;
     };
 
     struct SchemaObjectClassImpl {

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/SequenceManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/SequenceManager.cpp?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/SequenceManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/SequenceManager.cpp Tue Sep 29 03:21:49 2009
@@ -68,14 +68,14 @@
     contextMap.clear();
 }
 
-void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer)
+void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, const string& routingKey, qpid::framing::Buffer& buffer)
 {
     Mutex::ScopedLock _lock(lock);
     bool done;
 
     if (sequence == 0) {
         if (unsolicitedContext.get() != 0) {
-            done = unsolicitedContext->handleMessage(opcode, sequence, buffer);
+            done = unsolicitedContext->handleMessage(opcode, sequence, routingKey, buffer);
             if (done)
                 unsolicitedContext->release();
         }
@@ -85,7 +85,7 @@
     map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
     if (iter != contextMap.end()) {
         if (iter->second != 0) {
-            done = iter->second->handleMessage(opcode, sequence, buffer);
+            done = iter->second->handleMessage(opcode, sequence, routingKey, buffer);
             if (done) {
                 iter->second->release();
                 contextMap.erase(iter);

Modified: qpid/trunk/qpid/cpp/src/qmf/engine/SequenceManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/SequenceManager.h?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/SequenceManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/SequenceManager.h Tue Sep 29 03:21:49 2009
@@ -40,7 +40,7 @@
         virtual ~SequenceContext() {}
 
         virtual void reserve() = 0;
-        virtual bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) = 0;
+        virtual bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer) = 0;
         virtual void release() = 0;
     };
 
@@ -52,7 +52,7 @@
         uint32_t reserve(SequenceContext::Ptr ctx = SequenceContext::Ptr());
         void release(uint32_t sequence);
         void releaseAll();
-        void dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+        void dispatch(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
 
     private:
         mutable qpid::sys::Mutex lock;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=819819&r1=819818&r2=819819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Sep 29 03:21:49 2009
@@ -65,7 +65,7 @@
       tagGenerator("sgen"),
       dtxSelected(false),
       authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
-      userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
+      userID(getSession().getConnection().getUserId())
 {
     acl = getSession().getBroker().getAcl();
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org