You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/09/17 21:27:53 UTC

svn commit: r816345 [1/2] - in /qpid/trunk/qpid/cpp: bindings/qmf/ruby/ bindings/qmf/tests/ src/ src/qmf/

Author: tross
Date: Thu Sep 17 19:27:52 2009
New Revision: 816345

URL: http://svn.apache.org/viewvc?rev=816345&view=rev
Log:
QMF Console
  - Added implementation for method invocation
  - Added metaprogramming hooks in Ruby for attribute and method access
  - Refactored file structure

Added:
    qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
    qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.h
    qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.cpp
      - copied, changed from r815416, qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp
    qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.h
Removed:
    qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp
Modified:
    qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb
    qpid/trunk/qpid/cpp/bindings/qmf/tests/agent_ruby.rb
    qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb
    qpid/trunk/qpid/cpp/src/qmf.mk
    qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.h
    qpid/trunk/qpid/cpp/src/qmf/Object.h
    qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.cpp
    qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.h
    qpid/trunk/qpid/cpp/src/qmf/Value.h
    qpid/trunk/qpid/cpp/src/qmf/ValueImpl.cpp
    qpid/trunk/qpid/cpp/src/qmf/ValueImpl.h

Modified: qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb Thu Sep 17 19:27:52 2009
@@ -189,8 +189,16 @@
   ##==============================================================================
 
   class QmfObject
+    include MonitorMixin
     attr_reader :impl, :object_class
     def initialize(cls, kwargs={})
+      super()
+      @cv = new_cond
+      @sync_count = 0
+      @sync_result = nil
+      @allow_sets = :false
+      @broker = kwargs[:broker] if kwargs.include?(:broker)
+
       if cls:
         @object_class = cls
         @impl = Qmfengine::Object.new(@object_class.impl)
@@ -264,6 +272,103 @@
       set_attr(name, get_attr(name) - by)
     end
 
+    def method_missing(name_in, *args)
+      #
+      # Convert the name to a string and determine if it represents an
+      # attribute assignment (i.e. "attr=")
+      #
+      name = name_in.to_s
+      attr_set = (name[name.length - 1] == 61)
+      name = name[0..name.length - 2] if attr_set
+      raise "Sets not permitted on this object" if attr_set && !@allow_sets
+
+      #
+      # If the name matches a property name, set or return the value of the property.
+      #
+      @object_class.properties.each do |prop|
+        if prop.name == name
+          if attr_set
+            return set_attr(name, args[0])
+          else
+            return get_attr(name)
+          end
+        end
+      end
+
+      #
+      # Do the same for statistics
+      #
+      @object_class.statistics.each do |stat|
+        if stat.name == name
+          if attr_set
+            return set_attr(name, args[0])
+          else
+            return get_attr(name)
+          end
+        end
+      end
+
+      #
+      # If we still haven't found a match for the name, check to see if
+      # it matches a method name.  If so, marshall the arguments and invoke
+      # the method.
+      #
+      @object_class.methods.each do |method|
+        if method.name == name
+          raise "Sets not permitted on methods" if attr_set
+          timeout = 30
+          synchronize do
+            @sync_count = 1
+            @impl.invokeMethod(name, _marshall(method, args), self)
+            @broker.conn.kick if @broker
+            unless @cv.wait(timeout) { @sync_count == 0 }
+              raise "Timed out waiting for response"
+            end
+          end
+
+          return @sync_result
+        end
+      end
+
+      #
+      # This name means nothing to us, pass it up the line to the parent
+      # class's handler.
+      #
+      super.method_missing(name_in, args)
+    end
+
+    def _method_result(result)  # Stores the response of a pending method call and wakes the thread waiting in method_missing.
+      synchronize do
+        @sync_result = result
+        @sync_count -= 1  # the invoking thread waits for this counter to reach 0
+        @cv.signal
+      end
+    end
+
+    #
+    # Convert a Ruby array of arguments (positional) into a Value object of type "map".
+    #
+    private
+    def _marshall(schema, args)
+      map = Qmfengine::Value.new(TYPE_MAP)
+      schema.arguments.each do |arg|
+        if arg.direction == DIR_IN || arg.direction == DIR_IN_OUT
+          map.insert(arg.name, Qmfengine::Value.new(arg.typecode))
+        end
+      end
+
+      marshalled = Arguments.new(map)
+      idx = 0
+      schema.arguments.each do |arg|
+        if arg.direction == DIR_IN || arg.direction == DIR_IN_OUT
+          marshalled[arg.name] = args[idx] unless args[idx] == nil
+          idx += 1
+        end
+      end
+
+      return marshalled.map
+    end
+
     private
     def value(name)
       val = @impl.getValue(name.to_s)
@@ -277,6 +382,7 @@
   class AgentObject < QmfObject
     def initialize(cls, kwargs={})
       super(cls, kwargs)
+      # Permit "attr = value" assignments through method_missing on this
+      # object.  A real boolean is used rather than a symbol so the flag
+      # behaves correctly in boolean tests.
+      @allow_sets = true
     end
 
     def destroy
@@ -307,9 +413,6 @@
 
     def index()
     end
-
-    def method_missing(name, *args)
-    end
   end
 
   class ObjectId
@@ -407,6 +510,22 @@
     end
   end
 
+  class MethodResponse
+    def initialize(impl)
+      puts "start copying..."
+      @impl = Qmfengine::MethodResponse.new(impl)
+      puts "done copying..."
+    end
+
+    def status
+      @impl.getStatus
+    end
+
+    def exception
+      @impl.getException
+    end
+  end
+
   ##==============================================================================
   ## QUERY
   ##==============================================================================
@@ -470,6 +589,14 @@
     def name
       @impl.getName
     end
+
+    def direction
+      @impl.getDirection
+    end
+
+    def typecode
+      @impl.getType
+    end
   end
 
   class SchemaMethod
@@ -802,7 +929,7 @@
 
   class Broker < ConnectionHandler
     include MonitorMixin
-    attr_reader :impl
+    attr_reader :impl, :conn
 
     def initialize(console, conn)
       super()
@@ -871,9 +998,12 @@
         when Qmfengine::BrokerEvent::QUERY_COMPLETE
           result = []
           for idx in 0...@event.queryResponse.getObjectCount
-            result << ConsoleObject.new(nil, :impl => @event.queryResponse.getObject(idx))
+            result << ConsoleObject.new(nil, :impl => @event.queryResponse.getObject(idx), :broker => self)
           end
           @console._get_result(result, @event.context)
+        when Qmfengine::BrokerEvent::METHOD_RESPONSE
+          obj = @event.context
+          obj._method_result(MethodResponse.new(@event.methodResponse))
         end
         @impl.popEvent
         valid = @impl.getEvent(@event)

Modified: qpid/trunk/qpid/cpp/bindings/qmf/tests/agent_ruby.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/tests/agent_ruby.rb?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/tests/agent_ruby.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/tests/agent_ruby.rb Thu Sep 17 19:27:52 2009
@@ -92,37 +92,37 @@
       retText = "OK"
 
       if args['test'] == "big"
-        @parent.set_attr("uint64val", 0x9494949449494949)
-        @parent.set_attr("uint32val", 0xa5a55a5a)
-        @parent.set_attr("uint16val", 0xb66b)
-        @parent.set_attr("uint8val",  0xc7)
-
-        @parent.set_attr("int64val", 1000000000000000000)
-        @parent.set_attr("int32val", 1000000000)
-        @parent.set_attr("int16val", 10000)
-        @parent.set_attr("int8val",  100)
+        @parent.uint64val = 0x9494949449494949
+        @parent.uint32val = 0xa5a55a5a
+        @parent.uint16val = 0xb66b
+        @parent.uint8val  =  0xc7
+
+        @parent.int64val = 1000000000000000000
+        @parent.int32val = 1000000000
+        @parent.int16val = 10000
+        @parent.int8val  = 100
 
       elsif args['test'] == "small"
-        @parent.set_attr("uint64val", 4)
-        @parent.set_attr("uint32val", 5)
-        @parent.set_attr("uint16val", 6)
-        @parent.set_attr("uint8val",  7)
-
-        @parent.set_attr("int64val", 8)
-        @parent.set_attr("int32val", 9)
-        @parent.set_attr("int16val", 10)
-        @parent.set_attr("int8val",  11)
+        @parent.uint64val = 4
+        @parent.uint32val = 5
+        @parent.uint16val = 6
+        @parent.uint8val  = 7
+
+        @parent.int64val = 8
+        @parent.int32val = 9
+        @parent.int16val = 10
+        @parent.int8val  = 11
 
       elsif args['test'] == "negative"
-        @parent.set_attr("uint64val", 0)
-        @parent.set_attr("uint32val", 0)
-        @parent.set_attr("uint16val", 0)
-        @parent.set_attr("uint8val",  0)
-
-        @parent.set_attr("int64val", -10000000000)
-        @parent.set_attr("int32val", -100000)
-        @parent.set_attr("int16val", -1000)
-        @parent.set_attr("int8val",  -100)
+        @parent.uint64val = 0
+        @parent.uint32val = 0
+        @parent.uint16val = 0
+        @parent.uint8val  = 0
+
+        @parent.int64val = -10000000000
+        @parent.int32val = -100000
+        @parent.int16val = -1000
+        @parent.int8val  = -100
 
       else
         retCode = 1
@@ -135,7 +135,7 @@
       oid = @agent.alloc_object_id(2)
       args['child_ref'] = oid
       @child = Qmf::AgentObject.new(@model.child_class)
-      @child.set_attr("name", args.by_key("child_name"))
+      @child.name = args.by_key("child_name")
       @child.set_object_id(oid)
       @agent.method_response(context, 0, "OK", args)
 
@@ -161,18 +161,18 @@
     @agent.set_connection(@connection)
 
     @parent = Qmf::AgentObject.new(@model.parent_class)
-    @parent.set_attr("name", "Parent One")
-    @parent.set_attr("state", "OPERATIONAL")
+    @parent.name  = "Parent One"
+    @parent.state = "OPERATIONAL"
 
-    @parent.set_attr("uint64val", 0)
-    @parent.set_attr("uint32val", 0)
-    @parent.set_attr("uint16val", 0)
-    @parent.set_attr("uint8val",  0)
-
-    @parent.set_attr("int64val", 0)
-    @parent.set_attr("int32val", 0)
-    @parent.set_attr("int16val", 0)
-    @parent.set_attr("int8val",  0)
+    @parent.uint64val = 0
+    @parent.uint32val = 0
+    @parent.uint16val = 0
+    @parent.uint8val  = 0
+
+    @parent.int64val = 0
+    @parent.int32val = 0
+    @parent.int16val = 0
+    @parent.int8val  = 0
 
     @parent_oid = @agent.alloc_object_id(1)
     @parent.set_object_id(@parent_oid)

Modified: qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb Thu Sep 17 19:27:52 2009
@@ -96,9 +96,24 @@
       puts "---- Brokers ----"
       blist.each do |b|
         puts "    ---- Broker ----"
-        puts "    systemRef: #{b.get_attr('systemRef')}"
-        puts "    port     : #{b.get_attr('port')}"
-        puts "    uptime   : #{b.get_attr('uptime') / 1000000000}"
+        puts "    systemRef: #{b.systemRef}"
+        puts "    port     : #{b.port}"
+        puts "    uptime   : #{b.uptime / 1000000000}"
+
+        for rep in 0...0
+          puts "    Pinging..."
+          ret = b.echo(45, 'text string')
+          puts "        ret=#{ret}"
+        end
+      end
+      puts "----"
+
+      qlist = @qmfc.get_objects(Qmf::Query.new(:package => "org.apache.qpid.broker",
+                                               :class => "queue"))
+      puts "---- Queues ----"
+      qlist.each do |q|
+        puts "    ---- Queue ----"
+        puts "    name     : #{q.name}"
       end
       puts "----"
       sleep(5)

Modified: qpid/trunk/qpid/cpp/src/qmf.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf.mk?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf.mk (original)
+++ qpid/trunk/qpid/cpp/src/qmf.mk Thu Sep 17 19:27:52 2009
@@ -35,9 +35,12 @@
   ../include/qmf/AgentObject.h
 
 libqmfcommon_la_SOURCES =			\
+  qmf/BrokerProxyImpl.cpp			\
+  qmf/BrokerProxyImpl.h				\
   qmf/ConnectionSettingsImpl.cpp		\
   qmf/ConnectionSettingsImpl.h			\
-  qmf/ConsoleEngine.cpp				\
+  qmf/ConsoleEngineImpl.cpp			\
+  qmf/ConsoleEngineImpl.h			\
   qmf/ConsoleEngine.h				\
   qmf/Event.h					\
   qmf/Message.h					\

Added: qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.cpp?rev=816345&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.cpp Thu Sep 17 19:27:52 2009
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/BrokerProxyImpl.h"
+#include "qmf/ConsoleEngineImpl.h"
+#include "qmf/Protocol.h"
+#include "qpid/Address.h"
+#include "qpid/sys/SystemInfo.h"
+#include <qpid/log/Statement.h>
+#include <string.h>
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace {
+    const char* QMF_EXCHANGE     = "qpid.management";
+    const char* DIR_EXCHANGE     = "amq.direct";
+    const char* BROKER_KEY       = "broker";
+    const char* BROKER_PACKAGE   = "org.apache.qpid.broker";
+    const char* AGENT_CLASS      = "agent";
+    const char* BROKER_AGENT_KEY = "agent.1.0";
+}
+
+// Returns the envelope of the idx-th object in the query results, or null
+// when idx is out of range (this includes any index into an empty result
+// set, which previously dereferenced the end iterator).
+const Object* QueryResponseImpl::getObject(uint32_t idx) const
+{
+    if (idx >= results.size())
+        return 0;
+
+    return results[idx]->envelope;
+}
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+BrokerEvent BrokerEventImpl::copy()
+{
+    BrokerEvent item;
+
+    ::memset(&item, 0, sizeof(BrokerEvent));
+    item.kind = kind;
+
+    STRING_REF(name);
+    STRING_REF(exchange);
+    STRING_REF(bindingKey);
+    item.context = context;
+    item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
+    item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0;
+
+    return item;
+}
+
+BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
+    envelope(e), console(_console.impl)
+{
+    stringstream qn;
+    qpid::TcpAddress addr;
+
+    SystemInfo::getLocalHostname(addr);
+    qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
+    queueName = qn.str();
+
+    seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
+}
+
+void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
+{
+    Mutex::ScopedLock _lock(lock);
+    agentList.clear();
+    eventQueue.clear();
+    xmtQueue.clear();
+    eventQueue.push_back(eventDeclareQueue(queueName));
+    eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
+    eventQueue.push_back(eventSetupComplete());
+
+    // TODO: Store session handle
+}
+
+void BrokerProxyImpl::sessionClosed()
+{
+    Mutex::ScopedLock _lock(lock);
+    agentList.clear();
+    eventQueue.clear();
+    xmtQueue.clear();
+}
+
+void BrokerProxyImpl::startProtocol()
+{
+    Mutex::ScopedLock _lock(lock);
+    char rawbuffer[512];
+    Buffer buffer(rawbuffer, 512);
+
+    agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
+
+    requestsOutstanding = 1;
+    topicBound = false;
+    uint32_t sequence(seqMgr.reserve());
+    Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
+    sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+    QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+}
+
+void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+    uint32_t length = buf.getPosition();
+    MessageImpl::Ptr message(new MessageImpl);
+
+    buf.reset();
+    buf.getRawData(message->body, length);
+    message->destination   = destination;
+    message->routingKey    = routingKey;
+    message->replyExchange = DIR_EXCHANGE;
+    message->replyKey      = queueName;
+
+    xmtQueue.push_back(message);
+}
+
+void BrokerProxyImpl::handleRcvMessage(Message& message)
+{
+    Buffer inBuffer(message.body, message.length);
+    uint8_t opcode;
+    uint32_t sequence;
+
+    while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
+        seqMgr.dispatch(opcode, sequence, inBuffer);
+}
+
+bool BrokerProxyImpl::getXmtMessage(Message& item) const
+{
+    Mutex::ScopedLock _lock(lock);
+    if (xmtQueue.empty())
+        return false;
+    item =  xmtQueue.front()->copy();
+    return true;
+}
+
+void BrokerProxyImpl::popXmt()
+{
+    Mutex::ScopedLock _lock(lock);
+    if (!xmtQueue.empty())
+        xmtQueue.pop_front();
+}
+
+bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
+{
+    Mutex::ScopedLock _lock(lock);
+    if (eventQueue.empty())
+        return false;
+    event = eventQueue.front()->copy();
+    return true;
+}
+
+void BrokerProxyImpl::popEvent()
+{
+    Mutex::ScopedLock _lock(lock);
+    if (!eventQueue.empty())
+        eventQueue.pop_front();
+}
+
+uint32_t BrokerProxyImpl::agentCount() const
+{
+    Mutex::ScopedLock _lock(lock);
+    return agentList.size();
+}
+
+// Returns the envelope of the idx-th known agent, or null if idx is past
+// the end of the agent list.  The list is read under the broker lock.
+const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
+{
+    Mutex::ScopedLock _lock(lock);
+    if (idx >= agentList.size())
+        return 0;
+    return agentList[idx]->envelope;
+}
+
+void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
+{
+    SequenceContext::Ptr queryContext(new QueryContext(*this, context));
+    Mutex::ScopedLock _lock(lock);
+    if (agent != 0) {
+        sendGetRequestLH(queryContext, query, agent->impl);
+    } else {
+        // TODO (optimization) only send queries to agents that have the requested class+package
+        for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+             iter != agentList.end(); iter++) {
+            sendGetRequestLH(queryContext, query, (*iter).get());
+        }
+    }
+}
+
+void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
+{
+    stringstream key;
+    Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+    uint32_t sequence(seqMgr.reserve(queryContext));
+
+    Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+    query.impl->encode(outBuffer);
+    key << "agent.1." << agent->agentBank;
+    sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+    QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
+}
+
+string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer)
+{
+    int argCount = schema->getArgumentCount();
+
+    if (argmap == 0 || !argmap->isMap())
+        return string("Arguments must be in a map value");
+
+    for (int aIdx = 0; aIdx < argCount; aIdx++) {
+        const SchemaArgument* arg(schema->getArgument(aIdx));
+        if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) {
+            if (argmap->keyInMap(arg->getName())) {
+                const Value* argVal(argmap->byKey(arg->getName()));
+                if (argVal->getType() != arg->getType())
+                    return string("Argument is the wrong type: ") + arg->getName();
+                argVal->impl->encode(buffer);
+            } else {
+                Value defaultValue(arg->getType());
+                defaultValue.impl->encode(buffer);
+            }
+        }
+    }
+
+    return string();
+}
+
+void BrokerProxyImpl::sendMethodRequest(ObjectIdImpl* oid, const SchemaObjectClass* cls,
+                                        const string& methodName, const Value* args, void* userContext)
+{   // Encodes and queues a method request for methodName on the object oid; failures are reported as queued METHOD_RESPONSE events.
+    int methodCount = cls->getMethodCount();
+    int idx;
+    for (idx = 0; idx < methodCount; idx++) {  // look the method up by name in the class schema
+        const SchemaMethod* method = cls->getMethod(idx);
+        if (string(method->getName()) == methodName) {
+            Mutex::ScopedLock _lock(lock);
+            SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method->impl));
+            stringstream key;
+            Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+            uint32_t sequence(seqMgr.reserve(methodContext));  // NOTE(review): on the argument-error path below this reserved sequence is not visibly released -- confirm
+
+            Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence);
+            oid->encode(outBuffer);
+            cls->getClassKey()->impl->encode(outBuffer);
+            outBuffer.putShortString(methodName);
+
+            string argErrorString = encodeMethodArguments(method, args, outBuffer);  // empty string means the arguments encoded successfully
+            if (argErrorString.empty()) {
+                key << "agent.1." << oid->getAgentBank();
+                sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+                QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str());
+            } else {  // bad arguments: nothing is sent; a status-1 response carrying the error text is queued instead
+                MethodResponseImpl::Ptr argError(new MethodResponseImpl(1, argErrorString));
+                eventQueue.push_back(eventMethodResponse(userContext, argError));
+            }
+            return;
+        }
+    }
+
+    MethodResponseImpl::Ptr error(new MethodResponseImpl(1, string("Unknown method: ") + methodName));  // no schema method matched methodName
+    Mutex::ScopedLock _lock(lock);
+    eventQueue.push_back(eventMethodResponse(userContext, error));
+}
+
+void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
+{
+    Mutex::ScopedLock _lock(lock);
+    eventQueue.push_back(eventBind(exchange, queueName, key));
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
+{
+    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
+    event->name = queueName;
+    return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
+{
+    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
+    event->name       = queue;
+    event->exchange   = exchange;
+    event->bindingKey = key;
+
+    return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
+{
+    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
+    return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
+{
+    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
+    return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
+{
+    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
+    event->context = context;
+    event->queryResponse = response;
+    return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponseImpl::Ptr response)
+{
+    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE));
+    event->context = context;
+    event->methodResponse = response;
+    return event;
+}
+
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
+    brokerId.decode(inBuffer);
+    QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
+    Mutex::ScopedLock _lock(lock);
+    Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+    uint32_t sequence(seqMgr.reserve());
+    incOutstandingLH();
+    Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
+    sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+    QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
+}
+
+void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
+{
+    string package;
+
+    inBuffer.getShortString(package);
+    QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
+    console->learnPackage(package);
+
+    Mutex::ScopedLock _lock(lock);
+    Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+    uint32_t sequence(seqMgr.reserve());
+    incOutstandingLH();
+    Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
+    outBuffer.putShortString(package);
+    sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+    QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
+}
+
+void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
+{
+    string text;
+    uint32_t code = inBuffer.getLong();
+    inBuffer.getShortString(text);
+    QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
+}
+
+void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
+{
+    uint8_t kind = inBuffer.getOctet();
+    SchemaClassKeyImpl classKey(inBuffer);
+
+    QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
+
+    if (!console->haveClass(classKey)) {
+        Mutex::ScopedLock _lock(lock);
+        incOutstandingLH();
+        Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+        uint32_t sequence(seqMgr.reserve());
+        Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
+        classKey.encode(outBuffer);
+        sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+        QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str());
+    }
+}
+
+MethodResponseImpl::Ptr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, SchemaMethodImpl* schema)
+{
+    MethodResponseImpl::Ptr response(new MethodResponseImpl(inBuffer, schema));
+
+    QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" <<
+             response->getException()->asString());
+
+    return response;
+}
+
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+    // TODO
+}
+
+void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+    // TODO
+}
+
+void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
+{
+    SchemaObjectClassImpl::Ptr oClassPtr;
+    SchemaEventClassImpl::Ptr eClassPtr;
+    uint8_t kind = inBuffer.getOctet();
+    const SchemaClassKeyImpl* key;
+    if (kind == CLASS_OBJECT) {
+        oClassPtr.reset(new SchemaObjectClassImpl(inBuffer));
+        console->learnClass(oClassPtr);
+        key = oClassPtr->getClassKey()->impl;
+        QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
+
+        //
+        // If we have just learned about the org.apache.qpid.broker:agent class, send a get
+        // request for the current list of agents so we can have it on-hand before we declare
+        // this session "stable".
+        //
+        if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
+            Mutex::ScopedLock _lock(lock);
+            incOutstandingLH();
+            Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+            uint32_t sequence(seqMgr.reserve());
+            Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+            FieldTable ft;
+            ft.setString("_class", AGENT_CLASS);
+            ft.setString("_package", BROKER_PACKAGE);
+            ft.encode(outBuffer);
+            sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
+            QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
+        }
+    } else if (kind == CLASS_EVENT) {
+        eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
+        console->learnClass(eClassPtr);
+        key = eClassPtr->getClassKey()->impl;
+        QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str());
+    }
+    else {
+        QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
+    }
+}
+
+ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
+{
+    SchemaClassKeyImpl classKey(inBuffer);
+    QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
+
+    SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
+    if (schema.get() == 0) {
+        QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
+        return ObjectImpl::Ptr();
+    }
+
+    return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, this, inBuffer, prop, stat, true));
+}
+
+void BrokerProxyImpl::incOutstandingLH()
+{
+    requestsOutstanding++;
+}
+
+void BrokerProxyImpl::decOutstanding()
+{   // Marks one outstanding request complete; when the count reaches zero, queues the console's bindings followed by a STABLE event.
+    Mutex::ScopedLock _lock(lock);
+    requestsOutstanding--;
+    if (requestsOutstanding == 0 && !topicBound) {  // topicBound keeps the bindings/STABLE from being queued again until it is reset
+        topicBound = true;
+        for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
+             iter != console->bindingList.end(); iter++) {
+            string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);  // an empty exchange name falls back to the QMF exchange
+            string key(iter->second);
+            eventQueue.push_back(eventBind(exchange, queueName, key));
+        }
+        eventQueue.push_back(eventStable());
+    }
+}
+
+MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) :
+    envelope(from.envelope), // !!!! TODO !!!! this is not right
+    status(from.status), schema(from.schema),
+    exception(from.exception.get() ? new Value(*from.exception) : 0),
+    arguments(from.arguments.get() ? new Value(*from.arguments) : 0)
+{
+}
+
+MethodResponseImpl::MethodResponseImpl(Buffer& buf, SchemaMethodImpl* s) :
+    envelope(new MethodResponse(this)), schema(s)
+{   // Decodes a method response from buf: status code, exception text, then (only when status is 0) the output arguments.
+    string text;
+
+    status = buf.getLong();
+    buf.getMediumString(text);
+    exception.reset(new Value(TYPE_LSTR));
+    exception->setString(text.c_str());
+
+    if (status != 0)  // non-zero status: no output arguments are decoded
+        return;
+
+    arguments.reset(new Value(TYPE_MAP));
+    int argCount(schema->getArgumentCount());
+    for (int idx = 0; idx < argCount; idx++) {
+        const SchemaArgument* arg = schema->getArgument(idx);
+        if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) {  // only output-direction arguments are read from the buffer
+            ValueImpl* value(new ValueImpl(arg->getType(), buf));  // NOTE(review): ownership of this ValueImpl after insert() is not visible here -- confirm it is not leaked
+            arguments->insert(arg->getName(), value->envelope);
+        }
+    }
+}
+
+MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) :
+    envelope(new MethodResponse(this)), status(s), schema(0)
+{
+    exception.reset(new Value(TYPE_LSTR)); // the text is stored as a TYPE_LSTR Value
+    exception->setString(text.c_str());
+}
+
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+    // Responses: each one is passed to the broker proxy and then reported as complete (true).
+    if (opcode == Protocol::OP_BROKER_RESPONSE) {
+        broker.handleBrokerResponse(buffer, sequence);
+        return true;
+    }
+    if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+        broker.handleCommandComplete(buffer, sequence);
+        return true;
+    }
+    if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
+        broker.handleSchemaResponse(buffer, sequence);
+        return true;
+    }
+    bool unknownOpcode = false; // indications below return false; only an unrecognized opcode returns true
+    if      (opcode == Protocol::OP_PACKAGE_INDICATION)
+        broker.handlePackageIndication(buffer, sequence);
+    else if (opcode == Protocol::OP_CLASS_INDICATION)
+        broker.handleClassIndication(buffer, sequence);
+    else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
+        broker.handleHeartbeatIndication(buffer, sequence);
+    else if (opcode == Protocol::OP_EVENT_INDICATION)
+        broker.handleEventIndication(buffer, sequence);
+    else if (opcode == Protocol::OP_PROPERTY_INDICATION)
+        broker.handleObjectIndication(buffer, sequence, true,  false);
+    else if (opcode == Protocol::OP_STATISTIC_INDICATION)
+        broker.handleObjectIndication(buffer, sequence, false, true);
+    else if (opcode == Protocol::OP_OBJECT_INDICATION)
+        broker.handleObjectIndication(buffer, sequence, true,  true);
+    else {
+        QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
+        unknownOpcode = true;
+    }
+    return unknownOpcode;
+}
+
+void QueryContext::reserve() // counts one more outstanding request against this query
+{
+    Mutex::ScopedLock _lock(lock); // QueryContext's own mutex, not broker.lock
+    requestsOutstanding++;
+}
+
+void QueryContext::release()
+{
+    uint32_t remaining;
+    {
+        Mutex::ScopedLock _lock(lock); // held only for the counter update, dropped before broker.lock is taken
+        remaining = --requestsOutstanding;
+    }
+    if (remaining > 0) return; // other requests for this query are still outstanding
+    Mutex::ScopedLock _block(broker.lock);
+    broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
+}
+
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+    // COMMAND_COMPLETE: hand it to the broker proxy and return true.
+    if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+        broker.handleCommandComplete(buffer, sequence);
+        return true;
+    }
+
+    // OBJECT_INDICATION: append the decoded object (if any) to the results and return false.
+    if (opcode == Protocol::OP_OBJECT_INDICATION) {
+        ObjectImpl::Ptr found =
+            broker.handleObjectIndication(buffer, sequence, true,  true);
+        if (found.get() != 0)
+            queryResponse->results.push_back(found);
+        return false;
+    }
+
+    // Any other opcode is logged and also yields true.
+    QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
+    return true;
+}
+
+void MethodContext::release() // queues a method-response event carrying userContext
+{
+    Mutex::ScopedLock _block(broker.lock); // broker.eventQueue is modified under the broker's lock
+    broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse)); // methodResponse is whatever handleMessage() stored (unset if none arrived)
+}
+
+bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+    if (opcode == Protocol::OP_METHOD_RESPONSE) // the only opcode this context decodes
+        methodResponse = broker.handleMethodResponse(buffer, sequence, schema);
+    else // log under this class's own name (the message previously said "QueryContext")
+        QPID_LOG(trace, "MethodContext::handleMessage invalid opcode: " << opcode);
+
+    return true; // always true, whether or not the opcode was recognized
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {} // stores the impl pointer; the destructor below deletes it
+AgentProxy::~AgentProxy() { delete impl; }
+const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); } // points into the impl's label string, so it is only valid while impl lives
+
+BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {} // creates the impl, passing itself as the envelope
+BrokerProxy::~BrokerProxy() { delete impl; } // the wrapper owns the impl it created
+void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } // this and every method below forward unchanged to the impl
+void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
+void BrokerProxy::startProtocol() { impl->startProtocol(); }
+void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void BrokerProxy::popXmt() { impl->popXmt(); }
+bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
+void BrokerProxy::popEvent() { impl->popEvent(); }
+uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
+const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
+void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
+
+MethodResponse::MethodResponse(const MethodResponse& from) : impl(new MethodResponseImpl(*(from.impl))) {} // NOTE(review): allocates a new impl, yet ~MethodResponse() is empty — looks like a leak; confirm ownership
+MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
+MethodResponse::~MethodResponse() {} // does not delete impl
+uint32_t MethodResponse::getStatus() const { return impl->getStatus(); } // the accessors forward to the impl
+const Value* MethodResponse::getException() const { return impl->getException(); }
+const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
+
+QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
+QueryResponse::~QueryResponse() {} // does not delete impl
+uint32_t QueryResponse::getStatus() const { return impl->getStatus(); } // the accessors forward to the impl
+const Value* QueryResponse::getException() const { return impl->getException(); }
+uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
+const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
+

Added: qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.h?rev=816345&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.h Thu Sep 17 19:27:52 2009
@@ -0,0 +1,222 @@
+#ifndef _QmfBrokerProxyImpl_
+#define _QmfBrokerProxyImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/ConsoleEngine.h"
+#include "qmf/ObjectImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/ValueImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/SequenceManager.h"
+#include "qmf/MessageImpl.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "boost/shared_ptr.hpp"
+#include <memory>
+#include <string>
+#include <deque>
+#include <map>
+#include <vector>
+
+namespace qmf {
+
+    struct MethodResponseImpl { // state behind the public MethodResponse wrapper
+        typedef boost::shared_ptr<MethodResponseImpl> Ptr;
+        MethodResponse* envelope; // public wrapper object; deleted by the destructor below
+        uint32_t status;
+        SchemaMethodImpl* schema; // raw pointer; not deleted by this struct
+        boost::shared_ptr<Value> exception;
+        boost::shared_ptr<Value> arguments;
+
+        MethodResponseImpl(const MethodResponseImpl& from);
+        MethodResponseImpl(qpid::framing::Buffer& buf, SchemaMethodImpl* schema);
+        MethodResponseImpl(uint32_t status, const std::string& text);
+        ~MethodResponseImpl() { delete envelope; }
+        uint32_t getStatus() const { return status; }
+        const Value* getException() const { return exception.get(); } // null when exception is unset
+        const Value* getArgs() const { return arguments.get(); } // null when arguments is unset
+    };
+
+    struct QueryResponseImpl { // state behind the public QueryResponse wrapper
+        typedef boost::shared_ptr<QueryResponseImpl> Ptr;
+        QueryResponse *envelope; // created in the constructor, deleted in the destructor
+        uint32_t status;
+        std::auto_ptr<Value> exception; // default-constructed (null) by the constructor below
+        std::vector<ObjectImpl::Ptr> results;
+
+        QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {}
+        ~QueryResponseImpl() { delete envelope; }
+        uint32_t getStatus() const { return status; }
+        const Value* getException() const { return exception.get(); }
+        uint32_t getObjectCount() const { return results.size(); } // size_t narrowed to uint32_t
+        const Object* getObject(uint32_t idx) const;
+    };
+
+    struct BrokerEventImpl { // internal event record; copy() produces the public BrokerEvent
+        typedef boost::shared_ptr<BrokerEventImpl> Ptr;
+        BrokerEvent::EventKind kind;
+        std::string name;
+        std::string exchange;
+        std::string bindingKey;
+        void* context; // opaque pointer; initialized to 0 by the constructor
+        QueryResponseImpl::Ptr queryResponse;
+        MethodResponseImpl::Ptr methodResponse;
+
+        BrokerEventImpl(BrokerEvent::EventKind k) : kind(k), context(0) {}
+        ~BrokerEventImpl() {}
+        BrokerEvent copy();
+    };
+
+    struct AgentProxyImpl { // state behind the public AgentProxy wrapper
+        typedef boost::shared_ptr<AgentProxyImpl> Ptr;
+        AgentProxy* envelope; // allocated in the constructor; not deleted by the (empty) destructor below
+        ConsoleEngineImpl* console;
+        BrokerProxyImpl* broker;
+        uint32_t agentBank;
+        std::string label;
+
+        AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const std::string& l) :
+            envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {}
+        ~AgentProxyImpl() {}
+        const std::string& getLabel() const { return label; } // reference to the member, not a copy
+    };
+
+    class BrokerProxyImpl { // implementation behind the public BrokerProxy wrapper
+    public:
+        typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
+
+        BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
+        ~BrokerProxyImpl() {}
+
+        void sessionOpened(SessionHandle& sh);
+        void sessionClosed();
+        void startProtocol();
+
+        void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey); // "LH" presumably: caller holds `lock` — TODO confirm
+        void handleRcvMessage(Message& message);
+        bool getXmtMessage(Message& item) const;
+        void popXmt();
+
+        bool getEvent(BrokerEvent& event) const;
+        void popEvent();
+
+        uint32_t agentCount() const;
+        const AgentProxy* getAgent(uint32_t idx) const;
+        void sendQuery(const Query& query, void* context, const AgentProxy* agent);
+        void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent); // "LH" presumably: caller holds `lock` — TODO confirm
+        std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer);
+        void sendMethodRequest(ObjectIdImpl* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context);
+
+        void addBinding(const std::string& exchange, const std::string& key);
+        void staticRelease() { decOutstanding(); } // public entry point used by StaticContext::release()
+
+    private:
+        friend class StaticContext; // the context types reach into lock, eventQueue and the handle* methods
+        friend class QueryContext;
+        friend class MethodContext;
+        mutable qpid::sys::Mutex lock; // mutable, presumably so the const accessors can lock it — TODO confirm
+        BrokerProxy* envelope;
+        ConsoleEngineImpl* console;
+        std::string queueName;
+        qpid::framing::Uuid brokerId;
+        SequenceManager seqMgr;
+        uint32_t requestsOutstanding; // raised by incOutstandingLH(), lowered by decOutstanding()
+        bool topicBound; // set by decOutstanding() once it has queued the console's bindings
+        std::vector<AgentProxyImpl::Ptr> agentList;
+        std::deque<MessageImpl::Ptr> xmtQueue;
+        std::deque<BrokerEventImpl::Ptr> eventQueue;
+
+#       define MA_BUFFER_SIZE 65536
+        char outputBuffer[MA_BUFFER_SIZE]; // fixed MA_BUFFER_SIZE-byte member buffer
+
+        BrokerEventImpl::Ptr eventDeclareQueue(const std::string& queueName);
+        BrokerEventImpl::Ptr eventBind(const std::string& exchange, const std::string& queue, const std::string& key);
+        BrokerEventImpl::Ptr eventSetupComplete();
+        BrokerEventImpl::Ptr eventStable();
+        BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response);
+        BrokerEventImpl::Ptr eventMethodResponse(void* context, MethodResponseImpl::Ptr response);
+
+        void handleBrokerResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
+        void handlePackageIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+        void handleCommandComplete(qpid::framing::Buffer& inBuffer, uint32_t seq);
+        void handleClassIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+        MethodResponseImpl::Ptr handleMethodResponse(qpid::framing::Buffer& inBuffer, uint32_t seq, SchemaMethodImpl* schema);
+        void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+        void handleEventIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+        void handleSchemaResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
+        ObjectImpl::Ptr handleObjectIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, bool prop, bool stat); // callers treat a null Ptr as "no object"
+        void incOutstandingLH();
+        void decOutstanding(); // takes `lock` itself
+    };
+
+    //
+    // StaticContext is used to handle:
+    //
+    //  1) Responses to console-level requests (for schema info, etc.)
+    //  2) Unsolicited messages from agents (events, published updates, etc.)
+    //
+    struct StaticContext : public SequenceContext {
+        StaticContext(BrokerProxyImpl& b) : broker(b) {}
+        virtual ~StaticContext() {}
+        void reserve() {} // intentionally a no-op: nothing is counted on reserve
+        void release() { broker.staticRelease(); } // forwards to BrokerProxyImpl::decOutstanding() via staticRelease()
+        bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+        BrokerProxyImpl& broker; // reference member: the broker proxy must outlive this context
+    };
+
+    //
+    // QueryContext is used to track and handle responses associated with a single Get Query
+    //
+    struct QueryContext : public SequenceContext {
+        QueryContext(BrokerProxyImpl& b, void* u) :
+            broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {} // starts with an empty response and a zero count
+        virtual ~QueryContext() {}
+        void reserve(); // increments requestsOutstanding under `lock`
+        void release(); // decrements it; at zero, queues the query-complete event on the broker
+        bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+
+        mutable qpid::sys::Mutex lock; // guards requestsOutstanding; separate from broker.lock
+        BrokerProxyImpl& broker;
+        void* userContext; // opaque caller pointer, passed through to eventQueryComplete()
+        uint32_t requestsOutstanding;
+        QueryResponseImpl::Ptr queryResponse;
+    };
+
+    struct MethodContext : public SequenceContext { // tracks the response to one method request
+        MethodContext(BrokerProxyImpl& b, void* u, SchemaMethodImpl* s) : broker(b), userContext(u), schema(s) {}
+        virtual ~MethodContext() {}
+        void reserve() {} // no-op
+        void release(); // queues the method-response event on the broker
+        bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+
+        BrokerProxyImpl& broker;
+        void* userContext; // opaque caller pointer, passed through to eventMethodResponse()
+        SchemaMethodImpl* schema; // passed to handleMethodResponse() when the response is decoded
+        MethodResponseImpl::Ptr methodResponse; // default (null) until handleMessage() stores a response
+    };
+
+
+
+}
+
+#endif
+

Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.h?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.h Thu Sep 17 19:27:52 2009
@@ -46,6 +46,7 @@
     class MethodResponse {
     public:
         MethodResponse(MethodResponseImpl* impl);
+        MethodResponse(const MethodResponse& from);
         ~MethodResponse();
         uint32_t getStatus() const;
         const Value* getException() const;
@@ -84,8 +85,7 @@
             NEW_CLASS       = 4,
             OBJECT_UPDATE   = 5,
             EVENT_RECEIVED  = 7,
-            AGENT_HEARTBEAT = 8,
-            METHOD_RESPONSE = 9
+            AGENT_HEARTBEAT = 8
         };
 
         EventKind       kind;
@@ -96,8 +96,6 @@
         void*           context;        // (OBJECT_UPDATE)
         Event*          event;          // (EVENT_RECEIVED)
         uint64_t        timestamp;      // (AGENT_HEARTBEAT)
-        uint32_t        methodHandle;   // (METHOD_RESPONSE)
-        MethodResponse* methodResponse; // (METHOD_RESPONSE)
         QueryResponse*  queryResponse;  // (QUERY_COMPLETE)
     };
 
@@ -113,15 +111,17 @@
             UNBIND          = 14,
             SETUP_COMPLETE  = 15,
             STABLE          = 16,
-            QUERY_COMPLETE  = 17
+            QUERY_COMPLETE  = 17,
+            METHOD_RESPONSE = 18
         };
 
         EventKind kind;
-        char*          name;          // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
-        char*          exchange;      // ([UN]BIND)
-        char*          bindingKey;    // ([UN]BIND)
-        void*          context;       // (QUERY_COMPLETE)
-        QueryResponse* queryResponse; // (QUERY_COMPLETE)
+        char*           name;           // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
+        char*           exchange;       // ([UN]BIND)
+        char*           bindingKey;     // ([UN]BIND)
+        void*           context;        // (QUERY_COMPLETE, METHOD_RESPONSE)
+        QueryResponse*  queryResponse;  // (QUERY_COMPLETE)
+        MethodResponse* methodResponse; // (METHOD_RESPONSE)
     };
 
     /**

Copied: qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.cpp (from r815416, qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.cpp?p2=qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.cpp&p1=qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp&r1=815416&r2=816345&rev=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.cpp Thu Sep 17 19:27:52 2009
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-#include "qmf/ConsoleEngine.h"
+#include "qmf/ConsoleEngineImpl.h"
 #include "qmf/MessageImpl.h"
 #include "qmf/SchemaImpl.h"
 #include "qmf/Typecode.h"
@@ -27,278 +27,25 @@
 #include "qmf/ValueImpl.h"
 #include "qmf/Protocol.h"
 #include "qmf/SequenceManager.h"
+#include "qmf/BrokerProxyImpl.h"
 #include <qpid/framing/Buffer.h>
 #include <qpid/framing/Uuid.h>
 #include <qpid/framing/FieldTable.h>
 #include <qpid/framing/FieldValue.h>
-#include <qpid/sys/Mutex.h>
 #include <qpid/log/Statement.h>
 #include <qpid/sys/Time.h>
 #include <qpid/sys/SystemInfo.h>
 #include <string.h>
-#include <string>
-#include <deque>
-#include <map>
-#include <vector>
 #include <iostream>
 #include <fstream>
-#include <boost/shared_ptr.hpp>
 
 using namespace std;
 using namespace qmf;
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-namespace qmf {
-
-    struct MethodResponseImpl {
-        typedef boost::shared_ptr<MethodResponseImpl> Ptr;
-        MethodResponse* envelope;
-        uint32_t status;
-        auto_ptr<Value> exception;
-        auto_ptr<Value> arguments;
-
-        MethodResponseImpl(Buffer& buf);
-        ~MethodResponseImpl() { delete envelope; }
-        uint32_t getStatus() const { return status; }
-        const Value* getException() const { return exception.get(); }
-        const Value* getArgs() const { return arguments.get(); }
-    };
-
-    struct QueryResponseImpl {
-        typedef boost::shared_ptr<QueryResponseImpl> Ptr;
-        QueryResponse *envelope;
-        uint32_t status;
-        auto_ptr<Value> exception;
-        vector<ObjectImpl::Ptr> results;
-
-        QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {}
-        ~QueryResponseImpl() { delete envelope; }
-        uint32_t getStatus() const { return status; }
-        const Value* getException() const { return exception.get(); }
-        uint32_t getObjectCount() const { return results.size(); }
-        const Object* getObject(uint32_t idx) const;
-    };
-
-    struct ConsoleEventImpl {
-        typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
-        ConsoleEvent::EventKind kind;
-        boost::shared_ptr<AgentProxyImpl> agent;
-        string name;
-        boost::shared_ptr<SchemaClassKey> classKey;
-        Object* object;
-        void* context;
-        Event* event;
-        uint64_t timestamp;
-        uint32_t methodHandle;
-        MethodResponseImpl::Ptr methodResponse;
-
-        ConsoleEventImpl(ConsoleEvent::EventKind k) :
-            kind(k), object(0), context(0), event(0), timestamp(0), methodHandle(0) {}
-        ~ConsoleEventImpl() {}
-        ConsoleEvent copy();
-    };
-
-    struct BrokerEventImpl {
-        typedef boost::shared_ptr<BrokerEventImpl> Ptr;
-        BrokerEvent::EventKind kind;
-        string name;
-        string exchange;
-        string bindingKey;
-        void* context;
-        QueryResponseImpl::Ptr queryResponse;
-
-        BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {}
-        ~BrokerEventImpl() {}
-        BrokerEvent copy();
-    };
-
-    struct AgentProxyImpl {
-        typedef boost::shared_ptr<AgentProxyImpl> Ptr;
-        AgentProxy* envelope;
-        ConsoleEngineImpl* console;
-        BrokerProxyImpl* broker;
-        uint32_t agentBank;
-        string label;
-
-        AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) :
-            envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {}
-        ~AgentProxyImpl() {}
-        const string& getLabel() const { return label; }
-    };
-
-    class BrokerProxyImpl {
-    public:
-        typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
-
-        BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
-        ~BrokerProxyImpl() {}
-
-        void sessionOpened(SessionHandle& sh);
-        void sessionClosed();
-        void startProtocol();
-
-        void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
-        void handleRcvMessage(Message& message);
-        bool getXmtMessage(Message& item) const;
-        void popXmt();
-
-        bool getEvent(BrokerEvent& event) const;
-        void popEvent();
-
-        uint32_t agentCount() const;
-        const AgentProxy* getAgent(uint32_t idx) const;
-        void sendQuery(const Query& query, void* context, const AgentProxy* agent);
-        void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent);
-
-        void addBinding(const string& exchange, const string& key);
-        void staticRelease() { decOutstanding(); }
-
-    private:
-        friend class StaticContext;
-        friend class QueryContext;
-        mutable Mutex lock;
-        BrokerProxy* envelope;
-        ConsoleEngineImpl* console;
-        string queueName;
-        Uuid brokerId;
-        SequenceManager seqMgr;
-        uint32_t requestsOutstanding;
-        bool topicBound;
-        vector<AgentProxyImpl::Ptr> agentList;
-        deque<MessageImpl::Ptr> xmtQueue;
-        deque<BrokerEventImpl::Ptr> eventQueue;
-
-#       define MA_BUFFER_SIZE 65536
-        char outputBuffer[MA_BUFFER_SIZE];
-
-        BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName);
-        BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
-        BrokerEventImpl::Ptr eventSetupComplete();
-        BrokerEventImpl::Ptr eventStable();
-        BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response);
-
-        void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
-        void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
-        void handleCommandComplete(Buffer& inBuffer, uint32_t seq);
-        void handleClassIndication(Buffer& inBuffer, uint32_t seq);
-        void handleMethodResponse(Buffer& inBuffer, uint32_t seq);
-        void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq);
-        void handleEventIndication(Buffer& inBuffer, uint32_t seq);
-        void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
-        ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
-        void incOutstandingLH();
-        void decOutstanding();
-    };
-
-    struct StaticContext : public SequenceContext {
-        StaticContext(BrokerProxyImpl& b) : broker(b) {}
-        ~StaticContext() {}
-        void reserve() {}
-        void release() { broker.staticRelease(); }
-        bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
-        BrokerProxyImpl& broker;
-    };
-
-    struct QueryContext : public SequenceContext {
-        QueryContext(BrokerProxyImpl& b, void* u) :
-            broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {}
-        ~QueryContext() {}
-        void reserve();
-        void release();
-        bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
-
-        mutable Mutex lock;
-        BrokerProxyImpl& broker;
-        void* userContext;
-        uint32_t requestsOutstanding;
-        QueryResponseImpl::Ptr queryResponse;
-    };
-
-    class ConsoleEngineImpl {
-    public:
-        ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings());
-        ~ConsoleEngineImpl();
-
-        bool getEvent(ConsoleEvent& event) const;
-        void popEvent();
-
-        void addConnection(BrokerProxy& broker, void* context);
-        void delConnection(BrokerProxy& broker);
-
-        uint32_t packageCount() const;
-        const string& getPackageName(uint32_t idx) const;
-
-        uint32_t classCount(const char* packageName) const;
-        const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
-
-        ClassKind getClassKind(const SchemaClassKey* key) const;
-        const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
-        const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
-
-        void bindPackage(const char* packageName);
-        void bindClass(const SchemaClassKey* key);
-        void bindClass(const char* packageName, const char* className);
-
-        /*
-        void startSync(const Query& query, void* context, SyncQuery& sync);
-        void touchSync(SyncQuery& sync);
-        void endSync(SyncQuery& sync);
-        */
-
-    private:
-        friend class BrokerProxyImpl;
-        ConsoleEngine* envelope;
-        const ConsoleSettings& settings;
-        mutable Mutex lock;
-        deque<ConsoleEventImpl::Ptr> eventQueue;
-        vector<BrokerProxyImpl*> brokerList;
-        vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
-
-        // Declare a compare class for the class maps that compares the dereferenced
-        // class key pointers.  The default behavior would be to compare the pointer
-        // addresses themselves.
-        struct KeyCompare {
-            bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const {
-                return *left < *right;
-            }
-        };
-
-        typedef map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList;
-        typedef map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList;
-        typedef map<string, pair<ObjectClassList, EventClassList> > PackageList;
-
-        PackageList packages;
-
-        void learnPackage(const string& packageName);
-        void learnClass(SchemaObjectClassImpl::Ptr cls);
-        void learnClass(SchemaEventClassImpl::Ptr cls);
-        bool haveClass(const SchemaClassKeyImpl& key) const;
-        SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const;
-    };
-}
-
 namespace {
-    const char* QMF_EXCHANGE     = "qpid.management";
-    const char* DIR_EXCHANGE     = "amq.direct";
-    const char* BROKER_KEY       = "broker";
-    const char* BROKER_PACKAGE   = "org.apache.qpid.broker";
-    const char* AGENT_CLASS      = "agent";
-    const char* BROKER_AGENT_KEY = "agent.1.0";
-}
-
-const Object* QueryResponseImpl::getObject(uint32_t idx) const
-{
-    vector<ObjectImpl::Ptr>::const_iterator iter = results.begin();
-
-    while (idx > 0) {
-        if (iter == results.end())
-            return 0;
-        iter++;
-        idx--;
-    }
-
-    return (*iter)->envelope;
+    const char* QMF_EXCHANGE = "qpid.management";
 }
 
 #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
@@ -308,465 +55,19 @@
     ConsoleEvent item;
 
     ::memset(&item, 0, sizeof(ConsoleEvent));
-    item.kind           = kind;
-    item.agent          = agent.get() ? agent->envelope : 0;
-    item.classKey       = classKey.get();
-    item.object         = object;
-    item.context        = context;
-    item.event          = event;
-    item.timestamp      = timestamp;
-    item.methodHandle   = methodHandle;
-    item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0;
-
-    STRING_REF(name);
-
-    return item;
-}
-
-BrokerEvent BrokerEventImpl::copy()
-{
-    BrokerEvent item;
-
-    ::memset(&item, 0, sizeof(BrokerEvent));
-    item.kind = kind;
+    item.kind      = kind;
+    item.agent     = agent.get() ? agent->envelope : 0;
+    item.classKey  = classKey.get();
+    item.object    = object;
+    item.context   = context;
+    item.event     = event;
+    item.timestamp = timestamp;
 
     STRING_REF(name);
-    STRING_REF(exchange);
-    STRING_REF(bindingKey);
-    item.context = context;
-    item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
 
     return item;
 }
 
-BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
-    envelope(e), console(_console.impl)
-{
-    stringstream qn;
-    qpid::TcpAddress addr;
-
-    SystemInfo::getLocalHostname(addr);
-    qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
-    queueName = qn.str();
-
-    seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
-}
-
-void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
-{
-    Mutex::ScopedLock _lock(lock);
-    agentList.clear();
-    eventQueue.clear();
-    xmtQueue.clear();
-    eventQueue.push_back(eventDeclareQueue(queueName));
-    eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
-    eventQueue.push_back(eventSetupComplete());
-
-    // TODO: Store session handle
-}
-
-void BrokerProxyImpl::sessionClosed()
-{
-    Mutex::ScopedLock _lock(lock);
-    agentList.clear();
-    eventQueue.clear();
-    xmtQueue.clear();
-}
-
-void BrokerProxyImpl::startProtocol()
-{
-    Mutex::ScopedLock _lock(lock);
-    char rawbuffer[512];
-    Buffer buffer(rawbuffer, 512);
-
-    agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
-
-    requestsOutstanding = 1;
-    topicBound = false;
-    uint32_t sequence(seqMgr.reserve());
-    Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
-    sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
-    QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
-}
-
-void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
-{
-    uint32_t length = buf.getPosition();
-    MessageImpl::Ptr message(new MessageImpl);
-
-    buf.reset();
-    buf.getRawData(message->body, length);
-    message->destination   = destination;
-    message->routingKey    = routingKey;
-    message->replyExchange = DIR_EXCHANGE;
-    message->replyKey      = queueName;
-
-    xmtQueue.push_back(message);
-}
-
-void BrokerProxyImpl::handleRcvMessage(Message& message)
-{
-    Buffer inBuffer(message.body, message.length);
-    uint8_t opcode;
-    uint32_t sequence;
-
-    while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
-        seqMgr.dispatch(opcode, sequence, inBuffer);
-}
-
-bool BrokerProxyImpl::getXmtMessage(Message& item) const
-{
-    Mutex::ScopedLock _lock(lock);
-    if (xmtQueue.empty())
-        return false;
-    item =  xmtQueue.front()->copy();
-    return true;
-}
-
-void BrokerProxyImpl::popXmt()
-{
-    Mutex::ScopedLock _lock(lock);
-    if (!xmtQueue.empty())
-        xmtQueue.pop_front();
-}
-
-bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
-{
-    Mutex::ScopedLock _lock(lock);
-    if (eventQueue.empty())
-        return false;
-    event = eventQueue.front()->copy();
-    return true;
-}
-
-void BrokerProxyImpl::popEvent()
-{
-    Mutex::ScopedLock _lock(lock);
-    if (!eventQueue.empty())
-        eventQueue.pop_front();
-}
-
-uint32_t BrokerProxyImpl::agentCount() const
-{
-    Mutex::ScopedLock _lock(lock);
-    return agentList.size();
-}
-
-const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
-{
-    Mutex::ScopedLock _lock(lock);
-    for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
-         iter != agentList.end(); iter++)
-        if (idx-- == 0)
-            return (*iter)->envelope;
-    return 0;
-}
-
-void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
-{
-    SequenceContext::Ptr queryContext(new QueryContext(*this, context));
-    Mutex::ScopedLock _lock(lock);
-    if (agent != 0) {
-        sendGetRequestLH(queryContext, query, agent->impl);
-    } else {
-        // TODO (optimization) only send queries to agents that have the requested class+package
-        for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
-             iter != agentList.end(); iter++) {
-            sendGetRequestLH(queryContext, query, (*iter).get());
-        }
-    }
-}
-
-void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
-{
-    stringstream key;
-    Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
-    uint32_t sequence(seqMgr.reserve(queryContext));
-
-    Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
-    query.impl->encode(outBuffer);
-    key << "agent.1." << agent->agentBank;
-    sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
-    QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
-}
-
-void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
-{
-    eventQueue.push_back(eventBind(exchange, queueName, key));
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
-{
-    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
-    event->name = queueName;
-    return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
-{
-    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
-    event->name       = queue;
-    event->exchange   = exchange;
-    event->bindingKey = key;
-
-    return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
-{
-    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
-    return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
-{
-    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
-    return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
-{
-    BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
-    event->context = context;
-    event->queryResponse = response;
-    return event;
-}
-
-void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
-{
-    brokerId.decode(inBuffer);
-    QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
-    Mutex::ScopedLock _lock(lock);
-    Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
-    uint32_t sequence(seqMgr.reserve());
-    incOutstandingLH();
-    Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
-    sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
-    QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
-}
-
-void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
-{
-    string package;
-
-    inBuffer.getShortString(package);
-    QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
-    console->learnPackage(package);
-
-    Mutex::ScopedLock _lock(lock);
-    Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
-    uint32_t sequence(seqMgr.reserve());
-    incOutstandingLH();
-    Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
-    outBuffer.putShortString(package);
-    sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
-    QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
-}
-
-void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
-{
-    string text;
-    uint32_t code = inBuffer.getLong();
-    inBuffer.getShortString(text);
-    QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
-}
-
-void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
-{
-    uint8_t kind = inBuffer.getOctet();
-    SchemaClassKeyImpl classKey(inBuffer);
-
-    QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
-
-    if (!console->haveClass(classKey)) {
-        Mutex::ScopedLock _lock(lock);
-        incOutstandingLH();
-        Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
-        uint32_t sequence(seqMgr.reserve());
-        Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
-        classKey.encode(outBuffer);
-        sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
-        QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str());
-    }
-}
-
-void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
-    // TODO
-}
-
-void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
-    // TODO
-}
-
-void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
-    // TODO
-}
-
-void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
-{
-    SchemaObjectClassImpl::Ptr oClassPtr;
-    SchemaEventClassImpl::Ptr eClassPtr;
-    uint8_t kind = inBuffer.getOctet();
-    const SchemaClassKeyImpl* key;
-    if (kind == CLASS_OBJECT) {
-        oClassPtr.reset(new SchemaObjectClassImpl(inBuffer));
-        console->learnClass(oClassPtr);
-        key = oClassPtr->getClassKey()->impl;
-        QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
-
-        //
-        // If we have just learned about the org.apache.qpid.broker:agent class, send a get
-        // request for the current list of agents so we can have it on-hand before we declare
-        // this session "stable".
-        //
-        if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
-            Mutex::ScopedLock _lock(lock);
-            incOutstandingLH();
-            Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
-            uint32_t sequence(seqMgr.reserve());
-            Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
-            FieldTable ft;
-            ft.setString("_class", AGENT_CLASS);
-            ft.setString("_package", BROKER_PACKAGE);
-            ft.encode(outBuffer);
-            sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
-            QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
-        }
-    } else if (kind == CLASS_EVENT) {
-        eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
-        console->learnClass(eClassPtr);
-        key = eClassPtr->getClassKey()->impl;
-        QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str());
-    }
-    else {
-        QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
-    }
-}
-
-ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
-{
-    SchemaClassKeyImpl classKey(inBuffer);
-    QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
-
-    SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
-    if (schema.get() == 0) {
-        QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
-        return ObjectImpl::Ptr();
-    }
-
-    return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true));
-}
-
-void BrokerProxyImpl::incOutstandingLH()
-{
-    requestsOutstanding++;
-}
-
-void BrokerProxyImpl::decOutstanding()
-{
-    Mutex::ScopedLock _lock(lock);
-    requestsOutstanding--;
-    if (requestsOutstanding == 0 && !topicBound) {
-        topicBound = true;
-        for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
-             iter != console->bindingList.end(); iter++) {
-            string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
-            string key(iter->second);
-            eventQueue.push_back(eventBind(exchange, queueName, key));
-        }
-        eventQueue.push_back(eventStable());
-    }
-}
-
-MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this))
-{
-    string text;
-
-    status = buf.getLong();
-    buf.getMediumString(text);
-    exception.reset(new Value(TYPE_LSTR));
-    exception->setString(text.c_str());
-
-    // TODO: Parse schema-specific output arguments.
-    arguments.reset(new Value(TYPE_MAP));
-}
-
-bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
-{
-    bool completeContext = false;
-    if      (opcode == Protocol::OP_BROKER_RESPONSE) {
-        broker.handleBrokerResponse(buffer, sequence);
-        completeContext = true;
-    }
-    else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
-        broker.handleCommandComplete(buffer, sequence);
-        completeContext = true;
-    }
-    else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
-        broker.handleSchemaResponse(buffer, sequence);
-        completeContext = true;
-    }
-    else if (opcode == Protocol::OP_PACKAGE_INDICATION)
-        broker.handlePackageIndication(buffer, sequence);
-    else if (opcode == Protocol::OP_CLASS_INDICATION)
-        broker.handleClassIndication(buffer, sequence);
-    else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
-        broker.handleHeartbeatIndication(buffer, sequence);
-    else if (opcode == Protocol::OP_EVENT_INDICATION)
-        broker.handleEventIndication(buffer, sequence);
-    else if (opcode == Protocol::OP_PROPERTY_INDICATION)
-        broker.handleObjectIndication(buffer, sequence, true,  false);
-    else if (opcode == Protocol::OP_STATISTIC_INDICATION)
-        broker.handleObjectIndication(buffer, sequence, false, true);
-    else if (opcode == Protocol::OP_OBJECT_INDICATION)
-        broker.handleObjectIndication(buffer, sequence, true,  true);
-    else {
-        QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
-        completeContext = true;
-    }
-
-    return completeContext;
-}
-
-void QueryContext::reserve()
-{
-    Mutex::ScopedLock _lock(lock);
-    requestsOutstanding++;
-}
-
-void QueryContext::release()
-{
-    Mutex::ScopedLock _lock(lock);
-    if (--requestsOutstanding == 0) {
-        broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
-    }
-}
-
-bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
-{
-    bool completeContext = false;
-    ObjectImpl::Ptr object;
-
-    if      (opcode == Protocol::OP_COMMAND_COMPLETE) {
-        broker.handleCommandComplete(buffer, sequence);
-        completeContext = true;
-    }
-    else if (opcode == Protocol::OP_OBJECT_INDICATION) {
-        object = broker.handleObjectIndication(buffer, sequence, true,  true);
-        if (object.get() != 0)
-            queryResponse->results.push_back(object);
-    }
-    else {
-        QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
-        completeContext = true;
-    }
-
-    return completeContext;
-}
-
 ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
     envelope(e), settings(s)
 {
@@ -1037,37 +338,6 @@
 // Wrappers
 //==================================================================
 
-AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
-AgentProxy::~AgentProxy() { delete impl; }
-const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
-
-BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
-BrokerProxy::~BrokerProxy() { delete impl; }
-void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
-void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
-void BrokerProxy::startProtocol() { impl->startProtocol(); }
-void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void BrokerProxy::popXmt() { impl->popXmt(); }
-bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
-void BrokerProxy::popEvent() { impl->popEvent(); }
-uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
-const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
-void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
-
-MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
-MethodResponse::~MethodResponse() {}
-uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
-const Value* MethodResponse::getException() const { return impl->getException(); }
-const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
-
-QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
-QueryResponse::~QueryResponse() {}
-uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
-const Value* QueryResponse::getException() const { return impl->getException(); }
-uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
-const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
-
 ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {}
 ConsoleEngine::~ConsoleEngine() { delete impl; }
 bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }

Added: qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.h?rev=816345&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.h Thu Sep 17 19:27:52 2009
@@ -0,0 +1,133 @@
+#ifndef _QmfConsoleEngineImpl_
+#define _QmfConsoleEngineImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/ConsoleEngine.h"
+#include "qmf/MessageImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/Typecode.h"
+#include "qmf/ObjectImpl.h"
+#include "qmf/ObjectIdImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
+#include "qmf/SequenceManager.h"
+#include "qmf/BrokerProxyImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
+#include <string.h>
+#include <string>
+#include <deque>
+#include <map>
+#include <vector>
+#include <iostream>
+#include <fstream>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+
+    struct ConsoleEventImpl {  // internal, queueable form of a console event; copy() flattens it into the public ConsoleEvent
+        typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
+        ConsoleEvent::EventKind kind;
+        boost::shared_ptr<AgentProxyImpl> agent;  // may be empty; copy() then reports a null agent
+        std::string name;  // copy() exposes c_str() of this string, or leaves the pointer null when it is empty
+        boost::shared_ptr<SchemaClassKey> classKey;  // copy() hands out the raw pointer without transferring ownership
+        Object* object;
+        void* context;
+        Event* event;
+        uint64_t timestamp;
+
+        ConsoleEventImpl(ConsoleEvent::EventKind k) :  // every pointer and the timestamp start out as zero
+            kind(k), object(0), context(0), event(0), timestamp(0) {}
+        ~ConsoleEventImpl() {}
+        ConsoleEvent copy();  // the result borrows name/classKey storage, so it is only valid while this object is alive
+    };
+
+    class ConsoleEngineImpl {  // private implementation that the public ConsoleEngine wrapper creates, forwards to and deletes
+    public:
+        ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings());  // e: the wrapper; settings are copied
+        ~ConsoleEngineImpl();
+
+        bool getEvent(ConsoleEvent& event) const;  // paired with popEvent(): inspect first, then discard
+        void popEvent();
+
+        void addConnection(BrokerProxy& broker, void* context);
+        void delConnection(BrokerProxy& broker);
+
+        uint32_t packageCount() const;
+        const std::string& getPackageName(uint32_t idx) const;
+
+        uint32_t classCount(const char* packageName) const;
+        const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
+
+        ClassKind getClassKind(const SchemaClassKey* key) const;
+        const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
+        const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
+
+        void bindPackage(const char* packageName);
+        void bindClass(const SchemaClassKey* key);
+        void bindClass(const char* packageName, const char* className);
+
+        /*
+        void startSync(const Query& query, void* context, SyncQuery& sync);
+        void touchSync(SyncQuery& sync);
+        void endSync(SyncQuery& sync);
+        */
+
+    private:
+        friend class BrokerProxyImpl;  // the broker proxy needs the private schema helpers and bindingList declared below
+        ConsoleEngine* envelope;  // the public wrapper object passed to the constructor
+        const ConsoleSettings settings;  // held by value: a reference member would dangle once the default-argument temporary dies
+        mutable qpid::sys::Mutex lock;  // mutable so that const member functions can still take the lock
+        std::deque<ConsoleEventImpl::Ptr> eventQueue;
+        std::vector<BrokerProxyImpl*> brokerList;
+        std::vector<std::pair<std::string, std::string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
+
+        // Declare a compare class for the class maps that compares the dereferenced
+        // class key pointers.  The default behavior would be to compare the pointer
+        // addresses themselves.
+        struct KeyCompare {
+            bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const {
+                return *left < *right;
+            }
+        };
+
+        typedef std::map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList;
+        typedef std::map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList;
+        typedef std::map<std::string, std::pair<ObjectClassList, EventClassList> > PackageList;  // package name -> (object classes, event classes)
+
+        PackageList packages;
+
+        void learnPackage(const std::string& packageName);
+        void learnClass(SchemaObjectClassImpl::Ptr cls);
+        void learnClass(SchemaEventClassImpl::Ptr cls);
+        bool haveClass(const SchemaClassKeyImpl& key) const;
+        SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const;
+    };
+}
+
+#endif
+

Modified: qpid/trunk/qpid/cpp/src/qmf/Object.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/Object.h?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/Object.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/Object.h Thu Sep 17 19:27:52 2009
@@ -39,6 +39,7 @@
         void setObjectId(ObjectId* oid);
         const SchemaObjectClass* getClass() const;
         Value* getValue(char* key) const;
+        void invokeMethod(const char* methodName, const Value* inArgs, void* context) const;
 
         ObjectImpl* impl;
     };

Modified: qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.cpp?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.cpp Thu Sep 17 19:27:52 2009
@@ -19,6 +19,7 @@
 
 #include "qmf/ObjectImpl.h"
 #include "qmf/ValueImpl.h"
+#include "qmf/BrokerProxyImpl.h"
 #include <qpid/sys/Time.h>
 
 using namespace std;
@@ -27,7 +28,7 @@
 using qpid::framing::Buffer;
 
 ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) :
-    envelope(e), objectClass(type), createTime(uint64_t(Duration(now()))),
+    envelope(e), objectClass(type), broker(0), createTime(uint64_t(Duration(now()))),
     destroyTime(0), lastUpdatedTime(createTime)
 {
     int propCount = objectClass->getPropertyCount();
@@ -45,8 +46,8 @@
     }
 }
 
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, bool stat, bool managed) :
-    envelope(new Object(this)), objectClass(type), createTime(0), destroyTime(0), lastUpdatedTime(0)
+ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) :
+    envelope(new Object(this)), objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0)
 {
     int idx;
 
@@ -107,6 +108,12 @@
     return 0;
 }
 
+void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const
+{
+    if (broker == 0 || objectId.get() == 0) return;  // without a broker or an object id the request is silently dropped
+    broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context);  // the broker proxy sends the request
+}
+
 void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
 {
     int propCount = objectClass->getPropertyCount();
@@ -205,4 +212,5 @@
 void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
 const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
 Value* Object::getValue(char* key) const { return impl->getValue(key); }
+void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); }  // thin forwarder; m becomes a std::string for the impl call. NOTE(review): a null m would be undefined behaviour in that conversion - confirm callers never pass one
 

Modified: qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.h?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.h Thu Sep 17 19:27:52 2009
@@ -31,11 +31,14 @@
 
 namespace qmf {
 
+    class BrokerProxyImpl;
+
     struct ObjectImpl {
         typedef boost::shared_ptr<ObjectImpl> Ptr;
         typedef boost::shared_ptr<Value> ValuePtr;
         Object* envelope;
         const SchemaObjectClass* objectClass;
+        BrokerProxyImpl* broker;
         boost::shared_ptr<ObjectIdImpl> objectId;
         uint64_t createTime;
         uint64_t destroyTime;
@@ -44,7 +47,8 @@
         mutable std::map<std::string, ValuePtr> statistics;
 
         ObjectImpl(Object* e, const SchemaObjectClass* type);
-        ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer, bool prop, bool stat, bool managed);
+        ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
+                   bool prop, bool stat, bool managed);
         ~ObjectImpl();
 
         void destroy();
@@ -52,6 +56,7 @@
         void setObjectId(ObjectId* oid) { objectId.reset(oid->impl); }
         const SchemaObjectClass* getClass() const { return objectClass; }
         Value* getValue(const std::string& key) const;
+        void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const;
 
         void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
         void encodeSchemaKey(qpid::framing::Buffer& buffer) const;

Modified: qpid/trunk/qpid/cpp/src/qmf/Value.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/Value.h?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/Value.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/Value.h Thu Sep 17 19:27:52 2009
@@ -31,6 +31,7 @@
     class Value {
     public:
         //        Value();
+        Value(const Value& from);
         Value(Typecode t, Typecode arrayType = TYPE_UINT8);
         Value(ValueImpl* impl);
         ~Value();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org