You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/02/27 00:11:20 UTC

svn commit: r916854 - in /qpid/branches/qmf-devel0.7/qpid/cpp: ./ include/qmf/ include/qmf/engine/ include/qpid/messaging/ src/ src/qmf/ src/qmf/engine/ src/qpid/messaging/ src/tests/

Author: tross
Date: Fri Feb 26 23:11:19 2010
New Revision: 916854

URL: http://svn.apache.org/viewvc?rev=916854&view=rev
Log:
Checkpointing Agent engine code.

Added:
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Agent.cpp
Modified:
    qpid/branches/qmf-devel0.7/qpid/cpp/Makefile.am
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Agent.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qpid/messaging/ListContent.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qpid/messaging/MapContent.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qpid/messaging/ListContent.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qpid/messaging/MapContent.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/Makefile.am?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/Makefile.am (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/Makefile.am Fri Feb 26 23:11:19 2010
@@ -27,7 +27,7 @@
   xml/cluster.xml INSTALL-WINDOWS CMakeLists.txt BuildInstallSettings.cmake \
   packaging/NSIS
 
-SUBDIRS = managementgen etc src docs/api docs/man examples bindings/qmf
+SUBDIRS = managementgen etc src docs/api docs/man examples
 
 # Update libtool, if needed.
 libtool: $(LIBTOOL_DEPS)

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Agent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Agent.h?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Agent.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Agent.h Fri Feb 26 23:11:19 2010
@@ -21,25 +21,29 @@
  */
 
 #include "qmf/QmfImportExport.h"
+#include "qmf/Notifiable.h"
+#include "qpid/sys/Time.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Variant.h"
+#include <string>
 
 namespace qmf {
 
     class AgentImpl;
-    class Connection;
     class ObjectId;
     class AgentObject;
-    class Value;
     class Event;
-    class SchemaObjectClass;
+    class SchemaClass;
+    class Query;
 
     /**
-     * AgentListener is used by agents that select the internalStore=false option (see Agent
+     * AgentHandler is used by agents that select the internalStore=false option (see Agent
      * constructor) or by agents that wish to provide access control for queries and methods.
      *
      * \ingroup qmfapi
      */
-    class AgentListener {
-        QMF_EXTERN virtual ~AgentListener();
+    class AgentHandler {
+        QMF_EXTERN virtual ~AgentHandler();
 
         /**
          * allowQuery is called before a query operation is executed.  If true is returned
@@ -49,7 +53,7 @@
          * @param q The query being requested.
          * @param userId The authenticated identity of the user requesting the query.
          */
-        virtual bool allowQuery(const Query& q, const char* userId);
+        virtual bool allowQuery(const Query& q, const std::string& userId);
 
         /**
          * allowMethod is called before a method call is executed.  If true is returned
@@ -62,8 +66,11 @@
          * @param cls The Schema describing the object being called.
          * @param userId The authenticated identity of the requesting user.
          */
-        virtual bool allowMethod(const char* name, const Value& args, const ObjectId& oid,
-                                 const SchemaObjectClass& cls, const char* userId);
+        virtual bool allowMethod(const std::string& name,
+                                 const qpid::messaging::Variant::Map& args,
+                                 const ObjectId& oid,
+                                 const SchemaClass& cls,
+                                 const std::string& userId);
 
         /**
          * query is called when the agent receives a query request.  The handler must invoke
@@ -78,11 +85,11 @@
          * @param q The query requested by the console.
          * @param userId the authenticated identity of the user requesting the query.
          */
-        virtual void query(uint32_t context, const Query& q, const char* userId);
+        virtual void query(uint32_t context, const Query& q, const std::string& userId);
 
         /**
          * syncStart is called when a console requests a standing query.  This function must
-         * behave exactly like AgentListener::query (i.e. send zero or more responses followed
+         * behave exactly like AgentHandler::query (i.e. send zero or more responses followed
          * by a queryComplete) except it then remembers the context and the query and makes
          * subsequent queryResponse calls whenever appropriate according the the query.
          *
@@ -96,7 +103,7 @@
          * @param q The query requested by the console.
          * @param userId the authenticated identity of the user requesting the query.
          */
-        virtual void syncStart(uint32_t context, const Query& q, const char* userId);
+        virtual void syncStart(uint32_t context, const Query& q, const std::string& userId);
 
         /**
          * syncTouch is called when the console that requested a standing query refreshes its
@@ -109,7 +116,7 @@
          * @param context The context supplied in a previous call to syncStart.
          * @param userId The authenticated identity of the requesting user.
          */
-        virtual void syncTouch(uint32_t context, const char* userId);
+        virtual void syncTouch(uint32_t context, const std::string& userId);
 
         /**
          * syncStop is called when the console that requested a standing query no longer wishes to
@@ -121,7 +128,7 @@
          * @param context The context supplied in a previous call to syncStart.
          * @param userId The authenticated identity of the requesting user.
          */
-        virtual void syncStop(uint32_t context, const char* userId);
+        virtual void syncStop(uint32_t context, const std::string& userId);
 
         /**
          * methodCall is called when a console invokes a method on a QMF object.  The application
@@ -138,8 +145,8 @@
          * @param cls The Schema describing the object being called.
          * @param userId The authenticated identity of the requesting user.
          */
-        virtual void methodCall(uint32_t context, const char* name, Value& args,
-                                const ObjectId& oid, const SchemaObjectClass& cls, const char* userId);
+        virtual void methodCall(uint32_t context, const std::string& name, qpid::messaging::Variant::Map& args,
+                                const ObjectId& oid, const SchemaClass& cls, const std::string& userId);
     };
 
     /**
@@ -153,7 +160,16 @@
         /**
          * Create an instance of the Agent class.
          *
-         * @param label An optional string label that can be used to identify the agent.
+         * @param vendor A string identifying the vendor of the agent application.
+         *               This should follow the reverse-domain-name form (i.e. org.apache).
+         *
+         * @param product A string identifying the product provided by the vendor.
+         *
+         * @param instance A string that uniquely identifies this instance of the agent.
+         *                 If zero, the agent will generate a guid for the instance string.
+         *
+         * @param domain A string that defines the QMF domain that this agent should join.
+         *               If zero, the agent will join the default QMF domain.
          *
          * @param internalStore If true, objects shall be tracked internally by the agent.
          *                      If false, the user of the agent must track the objects.
@@ -162,13 +178,19 @@
          *        individual operations.  If the user is tracking the objects, the user code
          *        must implement queries and syncs (standing queries).
          *
-         * @param listener A pointer to a class that implements the AgentListener interface.
+         * @param handler A pointer to a class that implements the AgentHandler interface.
          *        This must be supplied if any of the following conditions are true:
          *          - The agent model contains methods
          *          - The user wishes to individually authorize query and sync operations.
          *          - internalStore = false
-         */
-        QMF_EXTERN Agent(char* label="qmfa", bool internalStore=true, AgentListener* listener=0);
+         *
+         * @param notifiable A pointer to a class that implements the Notifiable interface.
+         *        This argument is optional (may be supplied as 0).  If it is not supplied,
+         *        notification callbacks will not be invoked.
+         */
+        QMF_EXTERN Agent(const std::string& vendor, const std::string& product, const std::string& instance="",
+                         const std::string& domain="", bool internalStore=true,
+                         AgentHandler* handler=0, Notifiable* notifiable=0);
 
         /**
          * Destroy an instance of the Agent class.
@@ -176,20 +198,30 @@
         QMF_EXTERN ~Agent();
 
         /**
+         * Set an attribute for the agent.  Attributes are visible to consoles and can be used to find
+         * agents.
+         *
+         * @param name Name of the attribute to be set (or overwritten)
+         *
+         * @param value Value (of any variant type) of the attribute
+         */
+        QMF_EXTERN void setAttribute(const std::string& name, const qpid::messaging::Variant& value);
+
+        /**
          * Set the persistent store file.  This file, if specified, is used to store state information
          * about the Agent.  For example, if object-ids must be persistent across restarts of the Agent
          * program, this file path must be supplied.
          *
          * @param path Full path to a file that is both writable and readable by the Agent program.
          */
-        QMF_EXTERN void setStoreDir(const char* path);
+        QMF_EXTERN void setStoreDir(const std::string& path);
 
         /**
          * Provide a connection (to a Qpid broker) over which the agent can communicate.
          *
          * @param conn Pointer to a Connection object.
          */
-        QMF_EXTERN void setConnection(Connection* conn);
+        QMF_EXTERN void setConnection(qpid::messaging::Connection& conn);
 
         /**
          * Register a class schema (object or event) with the agent.  The agent must have a registered
@@ -198,30 +230,29 @@
          *
          * @param cls Pointer to the schema structure describing the class.
          */
-        QMF_EXTERN void registerClass(SchemaObjectClass* cls);
-        QMF_EXTERN void registerClass(SchemaEventClass* cls);
+        QMF_EXTERN void registerClass(SchemaClass* cls);
 
         /**
-         * Add an object to the agent (for internal storage mode only).
-         *
-         * @param obj Reference to the object to be managed by the agent.
+         * Invoke the handler (if supplied in the constructor) with events stored in the Agent's work
+         * queue.  This function call is a way of supplying the Agent with a thread on which to run the
+         * application's handler (the Agent will never invoke the handler on one of its internal threads).
          *
-         * @param persistent Iff true, the object ID assigned to the object shall indicate persistence
-         *                   (i.e. the object ID shall be the same across restarts of the agent program).
+         * @param limit The maximum number of handler callbacks to invoke during this call.  Zero means
+         *              there will be no limit on the number of invocations.
          *
-         * @param oid 64-bit value for the oid (if zero, the agent will assign the value).
+         * @param timeout The time this call will block if there are no handler events to process.
          *
-         * @param oidLo 32-bit value for the lower 32-bits of the oid.
-         *
-         * @param oidHi 32-bit value for the upper 32-bits of the oid.
+         * @return The number of handler events processed.  If the timeout expired, the return value will
+         *         be zero.
          */
-        QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent=false, uint64_t oid=0);
-        QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+        QMF_EXTERN uint32_t invokeHandler(uint32_t limit=0, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
 
         /**
-         * Allocate an object ID for an object (for external storage mode only).
+         * Add an object to the agent (for internal storage mode only).
+         *
+         * @param obj Reference to the object to be managed by the agent.
          *
-         * @param persistent Iff true, the object ID allocated shall indicate persistence
+         * @param persistent Iff true, the object ID assigned to the object shall indicate persistence
          *                   (i.e. the object ID shall be the same across restarts of the agent program).
          *
          * @param oid 64-bit value for the oid (if zero, the agent will assign the value).
@@ -230,8 +261,8 @@
          *
          * @param oidHi 32-bit value for the upper 32-bits of the oid.
          */
-        QMF_EXTERN const ObjectId* allocObjectId(bool persistent=false, uint64_t oid=0);
-        QMF_EXTERN const ObjectId* allocObjectId(bool persistent, uint32_t oidLo, uint32_t oidHi);
+        QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent=false, uint64_t oid=0);
+        QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
 
         /**
          * Raise a QMF event.
@@ -243,42 +274,44 @@
         /**
          * Provide a response to a query (for external storage mode only).
          *
-         * @param context The context value supplied in the query (via the AgentListener interface).
+         * @param context The context value supplied in the query (via the AgentHandler interface).
          *
          * @param object A reference to the agent that matched the query criteria.
-         *
-         * @param prop If true, transmit the property attributes of this object.
-         *
-         * @param stat If true, transmit the statistic attributes of this object.
          */
-        QMF_EXTERN void queryResponse(uint32_t context, AgentObject& object, bool prop = true, bool stat = true);
+        QMF_EXTERN void queryResponse(uint32_t context, AgentObject& object);
 
         /**
          * Indicate that a query (or the initial dump of a sync) is complete (for external storage mode only).
          *
-         * @param context The context value supplied in the query/sync (via the AgentListener interface).
+         * @param context The context value supplied in the query/sync (via the AgentHandler interface).
          */
         QMF_EXTERN void queryComplete(uint32_t context);
 
         /**
          * Provide the response to a method call.
          *
-         * @param context The context value supplied in the method request (via the AgentListener interface).
+         * @param context The context value supplied in the method request (via the AgentHandler interface).
          *
          * @param args The argument list from the method call.  Must include the output arguments (may include
          *             the input arguments).
          *
-         * @param status Numerical return status: zero indicates no error, non-zero indicates error.
-         *
-         * @param exception Pointer to an exception value.  If status is non-zero, the exception value is
+         * @param exception Pointer to an exception value.  If status is non-null, the exception value is
          *                  sent to the caller.  It is optional (i.e. leave the pointer as 0), or may be
          *                  set to any legal value.  A string may be supplied, but an unmanaged object of
          *                  any schema may also be passed.
          */
-        QMF_EXTERN void methodResponse(uint32_t context, const Value& args, uint32_t status=0,
-                                       const Value* exception=0);
+        QMF_EXTERN void methodResponse(uint32_t context,
+                                       const qpid::messaging::Variant::Map& args,
+                                       const qpid::messaging::Variant& exception=qpid::messaging::Variant());
 
     private:
+        /**
+         * Private copy constructor and assignment operator ensure that objects of this class cannot
+         * be copied.
+         */
+        Agent(const Agent&);
+        const Agent& operator=(const Agent&);
+
         AgentImpl* impl;
     };
 

Added: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h?rev=916854&view=auto
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h (added)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h Fri Feb 26 23:11:19 2010
@@ -0,0 +1,48 @@
+#ifndef _QmfNotifiable_
+#define _QmfNotifiable_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/QmfImportExport.h"
+
+namespace qmf {
+
+    /**
+     * The Notifiable class is an interface that may be used in the application.  It provides
+     * a single callback (notify) that is invoked when the agent or console has events to be
+     * handled by the application.
+     *
+     * This interface should only be used in an application that has more event drivers than
+     * just a single QMF interface.  For example, an application that already uses select or
+     * poll to control execution can use the notify callback to awaken the select/poll call.
+     *
+     * No QMF operations should be performed from the notify callback.  It should only be used
+     * to awaken an application thread that will then perform QMF operations.
+     *
+     * \ingroup qmfapi
+     */
+    class Notifiable {
+    public:
+        QMF_EXTERN virtual ~Notifiable();
+        virtual void notify() = 0;
+    };
+}
+
+#endif

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h Fri Feb 26 23:11:19 2010
@@ -57,6 +57,43 @@
         const static std::string SUBTYPE_SCHEMA_PROPERTY;
         const static std::string SUBTYPE_SCHEMA_METHOD;
 
+        /**
+         * Message Content Types
+         */
+        const static std::string AMQP_CONTENT_MAP;
+        const static std::string AMQP_CONTENT_LIST;
+
+        /**
+         * Application Header Keys
+         */
+        const static std::string APP_OPCODE;
+
+        /**
+         * QMF Op Codes
+         */
+        const static std::string OP_AGENT_LOCATE_REQUEST;
+        const static std::string OP_AGENT_LOCATE_RESPONSE;
+        const static std::string OP_AGENT_HEARTBEAT_INDICATION;
+        const static std::string OP_QUERY_REQUEST;
+        const static std::string OP_QUERY_RESPONSE;
+        const static std::string OP_SUBSCRIBE_REQUEST;
+        const static std::string OP_SUBSCRIBE_RESPONSE;
+        const static std::string OP_SUBSCRIBE_CANCEL_INDICATION;
+        const static std::string OP_SUBSCRIBE_REFRESH_REQUEST;
+        const static std::string OP_DATA_INDICATION;
+        const static std::string OP_METHOD_REQUEST;
+        const static std::string OP_METHOD_RESPONSE;
+
+        /**
+         * Content type definitions
+         */
+        const static std::string CONTENT_PACKAGE;
+        const static std::string CONTENT_SCHEMA_ID;
+        const static std::string CONTENT_SCHEMA_CLASS;
+        const static std::string CONTENT_OBJECT_ID;
+        const static std::string CONTENT_DATA;
+        const static std::string CONTENT_EVENT;
+
         /*
         const static uint8_t OP_ATTACH_REQUEST  = 'A';
         const static uint8_t OP_ATTACH_RESPONSE = 'a';

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h Fri Feb 26 23:11:19 2010
@@ -20,6 +20,7 @@
  * under the License.
  */
 
+#include <qmf/Notifiable.h>
 #include <qmf/engine/Schema.h>
 #include <qmf/engine/ObjectId.h>
 #include <qmf/engine/Object.h>
@@ -65,10 +66,28 @@
      */
     class Agent {
     public:
+        /**
+         * Declare a type for a notification callback.
+         */
+        typedef void (*notifyCb)();
+
         Agent(const char* vendor, const char* product, const char* name, const char* domain=0, bool internalStore=true);
         ~Agent();
 
         /**
+         * Provide the Agent with a notification callback that is invoked whenever there is new work
+         * placed on the event queue.
+         *
+         * There are two flavors of notification callback: C-style based on the
+         * type "notifyCb"; and the C++ style based on the Notifiable class.
+         * The C++ style can be used for C++ wrappers/applications and the
+         * C-style can be used for C wrappers/applications and also for
+         * Swig-based script wrappers.
+         */
+        void setNotifyCallback(notifyCb handler);
+        void setNotifyCallback(Notifiable* handler);
+
+        /**
          * Set an agent attribute that can be used to describe this agent to consoles.
          *@param key Null-terminated string that is the name of the attribute.
          *@param value Variant value (or any API type) of the attribute.

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h Fri Feb 26 23:11:19 2010
@@ -98,8 +98,6 @@
         Event*          event;          // (EVENT_RECEIVED)
         uint64_t        timestamp;      // (AGENT_HEARTBEAT)
         QueryResponse*  queryResponse;  // (QUERY_COMPLETE)
-        bool            hasProps;
-        bool            hasStats;
     };
 
     /**
@@ -107,21 +105,11 @@
      */
     struct BrokerEvent {
         enum EventKind {
-            BROKER_INFO     = 10,
-            DECLARE_QUEUE   = 11,
-            DELETE_QUEUE    = 12,
-            BIND            = 13,
-            UNBIND          = 14,
-            SETUP_COMPLETE  = 15,
-            STABLE          = 16,
             QUERY_COMPLETE  = 17,
             METHOD_RESPONSE = 18
         };
 
         EventKind kind;
-        char*           name;           // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
-        char*           exchange;       // ([UN]BIND)
-        char*           bindingKey;     // ([UN]BIND)
         void*           context;        // (QUERY_COMPLETE, METHOD_RESPONSE)
         QueryResponse*  queryResponse;  // (QUERY_COMPLETE)
         MethodResponse* methodResponse; // (METHOD_RESPONSE)
@@ -155,10 +143,6 @@
         BrokerProxy(Console& console);
         ~BrokerProxy();
 
-        void sessionOpened(SessionHandle& sh);
-        void sessionClosed();
-        void startProtocol();
-
         bool getEvent(BrokerEvent& event) const;
         void popEvent();
 

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h Fri Feb 26 23:11:19 2010
@@ -53,6 +53,7 @@
 
     private:
         friend struct QueryImpl;
+        friend struct BrokerProxyImpl;
         QueryImpl* impl;
     };
 }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qpid/messaging/ListContent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qpid/messaging/ListContent.h?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qpid/messaging/ListContent.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qpid/messaging/ListContent.h Fri Feb 26 23:11:19 2010
@@ -42,6 +42,7 @@
     typedef Variant::List::const_reverse_iterator const_reverse_iterator;
 
     QPID_CLIENT_EXTERN ListContent(Message&);
+    QPID_CLIENT_EXTERN ListContent(Message&, const Variant::List&);
     QPID_CLIENT_EXTERN ~ListContent();
 
     QPID_CLIENT_EXTERN const_iterator begin() const;

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qpid/messaging/MapContent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qpid/messaging/MapContent.h?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qpid/messaging/MapContent.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qpid/messaging/MapContent.h Fri Feb 26 23:11:19 2010
@@ -47,6 +47,7 @@
     typedef std::map<key_type, Variant>::reverse_iterator reverse_iterator;
 
     QPID_CLIENT_EXTERN MapContent(Message&);
+    QPID_CLIENT_EXTERN MapContent(Message&, const Variant::Map&);
     QPID_CLIENT_EXTERN ~MapContent();
 
     QPID_CLIENT_EXTERN const_iterator begin() const;

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk Fri Feb 26 23:11:19 2010
@@ -44,7 +44,9 @@
   ../include/qmf/engine/Object.h		\
   ../include/qmf/engine/QmfEngineImportExport.h	\
   ../include/qmf/engine/Query.h			\
-  ../include/qmf/engine/Schema.h
+  ../include/qmf/engine/Schema.h		\
+  ../include/qmf/Agent.h			\
+  ../include/qmf/Notifiable.h
 
 # ../include/qmf/engine/ObjectId.h
 
@@ -58,6 +60,8 @@
   qpid/agent/ManagementAgentImpl.cpp	\
   qpid/agent/ManagementAgentImpl.h
 
+#  qmf/Agent.cpp
+
 libqmfengine_la_SOURCES =			\
   $(QMF_ENGINE_API)				\
   qmf/engine/Agent.cpp				\

Added: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Agent.cpp?rev=916854&view=auto
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Agent.cpp (added)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Agent.cpp Fri Feb 26 23:11:19 2010
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/Agent.h"
+#include "qmf/engine/Agent.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::messaging;
+using qpid::sys::Duration;
+
+namespace qmf {
+    class AgentImpl {
+    public:
+        AgentImpl(const string& vendor, const string& product, const string& instance, const string& domain, bool internalStore,
+                  AgentHandler* handler, Notifiable* notifiable);
+        ~AgentImpl();
+        void setAttribute(const string& name, const Variant& value) { agentEngine.setAttr(name.c_str(), value); }
+        void setStoreDir(const string& path) { agentEngine.setStoreDir(path.c_str()); }
+        void setConnection(Connection& conn) { agentEngine.setConnection(conn); }
+        void registerClass(SchemaClass* cls) { agentEngine.registerClass(cls); }
+        uint32_t invokeHandler(uint32_t limit, Duration timeout);
+        //    const ObjectId* addObject(AgentObject& obj, bool persistent, uint64_t oid);
+        //    const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+        void raiseEvent(Event& event);
+        void queryResponse(uint32_t context, AgentObject& object);
+        void queryComplete(uint32_t context);
+        void methodResponse(uint32_t context, const Variant::Map& args, const Variant& exception);
+    private:
+        const string vendor;
+        const string product;
+        const string instance;
+        const string domain;
+        const bool internalStore;
+        AgentHandler* handler;
+        Notifiable* notifiable;
+        engine::Agent agentEngine;
+        qpid::sys::Mutex lock;
+        qpid::sys::Condition cond;
+    };
+}
+
+AgentImpl::AgentImpl(const string& _vendor, const string& _product, const string& _instance, const string& _domain,
+                     bool _internalStore, AgentHandler* _handler, Notifiable* _notifiable) :
+    vendor(_vendor), product(_product), instance(_instance.empty() ? "TODO" : _instance),
+    domain(_domain.empty() ? "default" : _domain), internalStore(_internalStore),
+    handler(_handler), notifiable(_notifiable),
+    agentEngine(vendor.c_str(), product.c_str(), instance.c_str(), domain.c_str(), internalStore)
+{
+}
+
+AgentImpl::~AgentImpl()
+{
+}
+
+void AgentImpl::registerClass(SchemaClass* /*cls*/)
+{
+}
+
+uint32_t AgentImpl::invokeHandler(uint32_t limit, Duration timeout)
+{
+    engine::AgentEvent event;
+    bool valid;
+    qpid::sys::AbsTime endTime(qpid::sys::now(), timeout);
+
+    {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        valid = agentEngine.getEvent(event);
+        while (!valid) {
+            if (!cond.wait(lock, endTime))
+                return 0;
+            valid = agentEngine.getEvent(event);
+        }
+    }
+
+    uint32_t count = 0;
+    while (valid) {
+        // TODO: Process event
+        count++;
+        if (limit > 0 && count == limit)
+            break;
+        agentEngine.popEvent();
+        valid = agentEngine.getEvent(event);
+    }
+
+    return count;
+}
+
+//    const ObjectId* AgentImpl::addObject(AgentObject& obj, bool persistent, uint64_t oid);
+//    const ObjectId* AgentImpl::addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+
+void AgentImpl::raiseEvent(Event& /*event*/)
+{
+}
+
+void AgentImpl::queryResponse(uint32_t /*context*/, AgentObject& /*object*/)
+{
+}
+
+void AgentImpl::queryComplete(uint32_t /*context*/)
+{
+}
+
+void AgentImpl::methodResponse(uint32_t /*context*/, const Variant::Map& /*args*/, const Variant& /*exception*/)
+{
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+Agent::Agent(const string& vendor, const string& product, const string& instance, const string& domain,
+             bool internalStore, AgentHandler* handler, Notifiable* notifiable) {
+    impl = new AgentImpl(vendor, product, instance, domain, internalStore, handler, notifiable); }
+Agent::~Agent() { delete impl; }
+void Agent::setAttribute(const string& name, const qpid::messaging::Variant& value) { impl->setAttribute(name, value); }
+void Agent::setStoreDir(const string& path) { impl->setStoreDir(path); }
+void Agent::setConnection(qpid::messaging::Connection& conn) { impl->setConnection(conn); }
+void Agent::registerClass(SchemaClass* cls) { impl->registerClass(cls); }
+uint32_t Agent::invokeHandler(uint32_t limit, qpid::sys::Duration timeout) { return impl->invokeHandler(limit, timeout); }
+//const ObjectId* Agent::addObject(AgentObject& obj, bool persistent, uint64_t oid);
+//const ObjectId* Agent::addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
+void Agent::queryResponse(uint32_t context, AgentObject& object) { impl->queryResponse(context, object); }
+void Agent::queryComplete(uint32_t context) { impl->queryComplete(context); }
+void Agent::methodResponse(uint32_t context, const Variant::Map& args, const Variant& exception) { impl->methodResponse(context, args, exception); }
+

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp Fri Feb 26 23:11:19 2010
@@ -43,6 +43,30 @@
 const string Protocol::SUBTYPE_SCHEMA_PROPERTY("qmfProperty");
 const string Protocol::SUBTYPE_SCHEMA_METHOD("qmfMethod");
 
+const string Protocol::AMQP_CONTENT_MAP("amqp/map");
+const string Protocol::AMQP_CONTENT_LIST("amqp/list");
+
+const string Protocol::APP_OPCODE("qmf.opcode");
+
+const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request");
+const string Protocol::OP_AGENT_LOCATE_RESPONSE("_agent_locate_response");
+const string Protocol::OP_AGENT_HEARTBEAT_INDICATION("_agent_heartbeat_indication");
+const string Protocol::OP_QUERY_REQUEST("_query_request");
+const string Protocol::OP_QUERY_RESPONSE("_query_response");
+const string Protocol::OP_SUBSCRIBE_REQUEST("_subscribe_request");
+const string Protocol::OP_SUBSCRIBE_RESPONSE("_subscribe_response");
+const string Protocol::OP_SUBSCRIBE_CANCEL_INDICATION("_subscribe_cancel_indication");
+const string Protocol::OP_SUBSCRIBE_REFRESH_REQUEST("_subscribe_refresh_request");
+const string Protocol::OP_DATA_INDICATION("_data_indication");
+const string Protocol::OP_METHOD_REQUEST("_method_request");
+const string Protocol::OP_METHOD_RESPONSE("_method_response");
+
+const string Protocol::CONTENT_PACKAGE("_schema_package");
+const string Protocol::CONTENT_SCHEMA_ID("_schema_id");
+const string Protocol::CONTENT_SCHEMA_CLASS("_schema_class");
+const string Protocol::CONTENT_OBJECT_ID("_object_id");
+const string Protocol::CONTENT_DATA("_data");
+const string Protocol::CONTENT_EVENT("_event");
 
 #if 0
 bool Protocol::checkHeader(const Message& /*msg*/, string& /*opcode*/, uint32_t* /*seq*/)

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp Fri Feb 26 23:11:19 2010
@@ -31,6 +31,7 @@
 #include <qpid/messaging/Receiver.h>
 #include <qpid/messaging/Sender.h>
 #include <qpid/messaging/Message.h>
+#include <qpid/messaging/MapView.h>
 #include <string>
 #include <deque>
 #include <map>
@@ -83,6 +84,8 @@
         AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore);
         ~AgentImpl();
 
+        void setNotifyCallback(Agent::notifyCb handler);
+        void setNotifyCallback(Notifiable* handler);
         void setAttr(const char* key, const Variant& value);
         void setStoreDir(const char* path);
         void setTransferDir(const char* path);
@@ -107,10 +110,12 @@
         const string    name;
         const string    domain;
         string directAddr;
-        map<string, Variant> attrMap;
+        Variant::Map attrMap;
         string    storeDir;
         string    transferDir;
         bool      internalStore;
+        Agent::notifyCb  notifyHandler;
+        Notifiable* notifiable;
         Uuid      systemId;
         uint16_t  bootSequence;
         uint32_t  nextContextNum;
@@ -156,14 +161,20 @@
         AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
                                         const string& key, boost::shared_ptr<Variant::Map> argMap,
                                         const SchemaClass* cls);
-        void handleRcvMessageLH(qpid::messaging::Message& message);
+        void notify();
+        void handleRcvMessageLH(const Message& message);
+        void handleAgentLocateLH(const Message& message);
+        void handleQueryRequestLH(const Message& message);
+        void handleSubscribeRequest(const Message& message);
+        void handleSubscribeCancel(const Message& message);
+        void handleSubscribeRefresh(const Message& message);
+        void handleMethodRequest(const Message& message);
 
         void sendPackageIndicationLH(const string& packageName);
         void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
         void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
                                    uint32_t code = 0, const string& text = "OK");
         void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
-        void handleAttachResponse(Message& msg);
         void handlePackageRequest(Message& msg);
         void handleClassQuery(Message& msg);
         void handleSchemaRequest(Message& msg, uint32_t sequence,
@@ -198,12 +209,16 @@
 
 AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
     vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
+    notifyHandler(0), notifiable(0),
     bootSequence(1), nextContextNum(1), running(true), thread(0)
 {
     directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
     if (_d == 0) {
         directAddr += " { create:always }";
     }
+    attrMap["vendor"] = vendor;
+    attrMap["product"] = product;
+    attrMap["name"] = name;
 }
 
 
@@ -211,9 +226,22 @@
 {
 }
 
+void AgentImpl::setNotifyCallback(Agent::notifyCb handler)
+{
+    Mutex::ScopedLock _lock(lock);
+    notifyHandler = handler;
+}
+
+void AgentImpl::setNotifyCallback(Notifiable* handler)
+{
+    Mutex::ScopedLock _lock(lock);
+    notifiable = handler;
+}
+
 void AgentImpl::setAttr(const char* key, const Variant& value)
 {
-    attrMap.insert(pair<string, Variant>(key, value));
+    Mutex::ScopedLock _lock(lock);
+    attrMap[key] = value;
 }
 
 void AgentImpl::setStoreDir(const char* path)
@@ -234,29 +262,6 @@
         transferDir.clear();
 }
 
-/*
-void AgentImpl::handleRcvMessage(Message& message)
-{
-    Buffer   inBuffer(message.body, message.length);
-    uint8_t  opcode;
-    uint32_t sequence;
-    string   replyToExchange(message.replyExchange ? message.replyExchange : "");
-    string   replyToKey(message.replyKey ? message.replyKey : "");
-    string   userId(message.userId ? message.userId : "");
-
-    while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
-        if      (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
-        else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
-        else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
-        else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
-        else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
-        else {
-            QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode);
-            break;
-        }
-    }
-}
-*/
 
 bool AgentImpl::getEvent(AgentEvent& event) const
 {
@@ -277,9 +282,18 @@
 void AgentImpl::setConnection(Connection& conn)
 {
     Mutex::ScopedLock _lock(lock);
+
+    //
+    // Don't permit the overwriting of an existing connection
+    // TODO: return an error or throw an exception if an overwrite is attempted.
+    //
     if (connection == 0)
         return;
     connection = conn;
+
+    //
+    // Start the Agent thread now that we have a connection to work with.
+    //
     thread = new qpid::sys::Thread(*this);
 }
 
@@ -384,6 +398,61 @@
     running = false;
 }
 
+void AgentImpl::handleRcvMessageLH(const Message& message)
+{
+    Variant::Map headers(message.getHeaders());
+    cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl;
+
+    if (message.getContentType() != Protocol::AMQP_CONTENT_MAP)
+        return;
+
+    Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE);
+    if (iter == headers.end())
+        return;
+    string opcode = iter->second.asString();
+
+    if (opcode == Protocol::OP_AGENT_LOCATE_REQUEST)        handleAgentLocateLH(message);
+    if (opcode == Protocol::OP_QUERY_REQUEST)               handleQueryRequestLH(message);
+    if (opcode == Protocol::OP_SUBSCRIBE_REQUEST)           handleSubscribeRequest(message);
+    if (opcode == Protocol::OP_SUBSCRIBE_CANCEL_INDICATION) handleSubscribeCancel(message);
+    if (opcode == Protocol::OP_SUBSCRIBE_REFRESH_REQUEST)   handleSubscribeRefresh(message);
+    if (opcode == Protocol::OP_METHOD_REQUEST)              handleMethodRequest(message);
+}
+
+void AgentImpl::handleAgentLocateLH(const Message& message)
+{
+    const MapView predicate(message);
+
+    //if (predicateMatches(predicate, attrMap)) {
+    //    sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap);
+    //}
+}
+
+void AgentImpl::handleQueryRequestLH(const Message& message)
+{
+    const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeRequest(const Message& message)
+{
+    const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeCancel(const Message& message)
+{
+    const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeRefresh(const Message& message)
+{
+    const MapView map(message);
+}
+
+void AgentImpl::handleMethodRequest(const Message& message)
+{
+    const MapView map(message);
+}
+
 AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
 {
     AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
@@ -407,8 +476,12 @@
     return event;
 }
 
-void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/)
+void AgentImpl::notify()
 {
+    if (notifyHandler != 0)
+        notifyHandler();
+    if (notifiable != 0)
+        notifiable->notify();
 }
 
 void AgentImpl::sendPackageIndicationLH(const string& packageName)
@@ -471,6 +544,8 @@
 
 Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); }
 Agent::~Agent() { delete impl; }
+void Agent::setNotifyCallback(notifyCb handler) { impl->setNotifyCallback(handler); }
+void Agent::setNotifyCallback(Notifiable* handler) { impl->setNotifyCallback(handler); }
 void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); }
 void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
 void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp Fri Feb 26 23:11:19 2010
@@ -19,7 +19,7 @@
 
 #include "qmf/engine/BrokerProxyImpl.h"
 #include "qmf/engine/ConsoleImpl.h"
-#include "qmf/engine/Protocol.h"
+#include "qmf/Protocol.h"
 #include "qpid/Address.h"
 #include "qpid/sys/SystemInfo.h"
 #include <qpid/log/Statement.h>
@@ -30,7 +30,7 @@
 
 using namespace std;
 using namespace qmf::engine;
-using namespace qpid::framing;
+using namespace qpid::messaging;
 using namespace qpid::sys;
 
 namespace {
@@ -64,10 +64,6 @@
 
     ::memset(&item, 0, sizeof(BrokerEvent));
     item.kind = kind;
-
-    STRING_REF(name);
-    STRING_REF(exchange);
-    STRING_REF(bindingKey);
     item.context = context;
     item.queryResponse = queryResponse.get();
     item.methodResponse = methodResponse.get();
@@ -87,63 +83,12 @@
     seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
 }
 
-void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
-{
-    Mutex::ScopedLock _lock(lock);
-    agentList.clear();
-    eventQueue.clear();
-    xmtQueue.clear();
-    eventQueue.push_back(eventDeclareQueue(queueName));
-    eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
-    eventQueue.push_back(eventSetupComplete());
-
-    // TODO: Store session handle
-}
-
-void BrokerProxyImpl::sessionClosed()
-{
-    Mutex::ScopedLock _lock(lock);
-    agentList.clear();
-    eventQueue.clear();
-    xmtQueue.clear();
-}
-
-void BrokerProxyImpl::startProtocol()
-{
-    AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
-    {
-        Mutex::ScopedLock _lock(lock);
-        char rawbuffer[512];
-        Buffer buffer(rawbuffer, 512);
-
-        agentList[0] = agent;
-
-        requestsOutstanding = 1;
-        topicBound = false;
-        uint32_t sequence(seqMgr.reserve());
-        Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
-        sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
-        QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
-    }
-
-    console.impl->eventAgentAdded(agent);
-}
-
-void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+void BrokerProxyImpl::sendBufferLH(Buffer&, const string&, const string&)
 {
-    uint32_t length = buf.getPosition();
-    MessageImpl::Ptr message(new MessageImpl);
-
-    buf.reset();
-    buf.getRawData(message->body, length);
-    message->destination   = destination;
-    message->routingKey    = routingKey;
-    message->replyExchange = DIR_EXCHANGE;
-    message->replyKey      = queueName;
-
-    xmtQueue.push_back(message);
+    // TODO
 }
 
+/*
 void BrokerProxyImpl::handleRcvMessage(Message& message)
 {
     Buffer inBuffer(message.body, message.length);
@@ -153,22 +98,7 @@
     while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
         seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer);
 }
-
-bool BrokerProxyImpl::getXmtMessage(Message& item) const
-{
-    Mutex::ScopedLock _lock(lock);
-    if (xmtQueue.empty())
-        return false;
-    item =  xmtQueue.front()->copy();
-    return true;
-}
-
-void BrokerProxyImpl::popXmt()
-{
-    Mutex::ScopedLock _lock(lock);
-    if (!xmtQueue.empty())
-        xmtQueue.pop_front();
-}
+*/
 
 bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
 {
@@ -227,10 +157,6 @@
 
 bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
 {
-    if (query.impl->singleAgent()) {
-        if (query.impl->agentBank() != agent->getAgentBank())
-            return false;
-    }
     stringstream key;
     Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
     uint32_t sequence(seqMgr.reserve(queryContext));
@@ -269,7 +195,7 @@
     return string();
 }
 
-void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls,
+void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaClass* cls,
                                         const string& methodName, const Value* args, void* userContext)
 {
     int methodCount = cls->getMethodCount();
@@ -452,43 +378,13 @@
 
 void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
 {
-    SchemaObjectClass* oClassPtr;
-    SchemaEventClass* eClassPtr;
+    SchemaClass* classPtr;
     uint8_t kind = inBuffer.getOctet();
     const SchemaClassKey* key;
-    if (kind == CLASS_OBJECT) {
-        oClassPtr = SchemaObjectClassImpl::factory(inBuffer);
-        console.impl->learnClass(oClassPtr);
-        key = oClassPtr->getClassKey();
-        QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
-
-        //
-        // If we have just learned about the org.apache.qpid.broker:agent class, send a get
-        // request for the current list of agents so we can have it on-hand before we declare
-        // this session "stable".
-        //
-        if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) {
-            Mutex::ScopedLock _lock(lock);
-            incOutstandingLH();
-            Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
-            uint32_t sequence(seqMgr.reserve());
-            Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
-            FieldTable ft;
-            ft.setString("_class", AGENT_CLASS);
-            ft.setString("_package", BROKER_PACKAGE);
-            ft.encode(outBuffer);
-            sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
-            QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
-        }
-    } else if (kind == CLASS_EVENT) {
-        eClassPtr = SchemaEventClassImpl::factory(inBuffer);
-        console.impl->learnClass(eClassPtr);
-        key = eClassPtr->getClassKey();
-        QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str());
-    }
-    else {
-        QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
-    }
+    classPtr = SchemaClassImpl::factory(inBuffer);
+    console.impl->learnClass(classPtr);
+    key = classPtr->getClassKey();
+    QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
 }
 
 ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
@@ -496,7 +392,7 @@
     auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
     QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str());
 
-    SchemaObjectClass* schema = console.impl->getSchema(classKey.get());
+    SchemaClass* schema = console.impl->getSchema(classKey.get());
     if (schema == 0) {
         QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str());
         return ObjectPtr();
@@ -749,12 +645,6 @@
 
 BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {}
 BrokerProxy::~BrokerProxy() { delete impl; }
-void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
-void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
-void BrokerProxy::startProtocol() { impl->startProtocol(); }
-void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void BrokerProxy::popXmt() { impl->popXmt(); }
 bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
 void BrokerProxy::popEvent() { impl->popEvent(); }
 uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h Fri Feb 26 23:11:19 2010
@@ -25,8 +25,6 @@
 #include "qmf/engine/SchemaImpl.h"
 #include "qmf/engine/QueryImpl.h"
 #include "qmf/engine/SequenceManager.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qpid/framing/Uuid.h"
 #include "qpid/messaging/Variant.h"
 #include "qpid/sys/Mutex.h"
 #include "boost/shared_ptr.hpp"
@@ -80,9 +78,6 @@
     struct BrokerEventImpl {
         typedef boost::shared_ptr<BrokerEventImpl> Ptr;
         BrokerEvent::EventKind kind;
-        std::string name;
-        std::string exchange;
-        std::string bindingKey;
         void* context;
         QueryResponsePtr queryResponse;
         MethodResponsePtr methodResponse;
@@ -123,14 +118,7 @@
         BrokerProxyImpl(BrokerProxy& pub, Console& _console);
         ~BrokerProxyImpl() {}
 
-        void sessionOpened(SessionHandle& sh);
-        void sessionClosed();
-        void startProtocol();
-
         void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey);
-        void handleRcvMessage(Message& message);
-        bool getXmtMessage(Message& item) const;
-        void popXmt();
 
         bool getEvent(BrokerEvent& event) const;
         void popEvent();
@@ -140,7 +128,7 @@
         void sendQuery(const Query& query, void* context, const AgentProxy* agent);
         bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
         std::string encodeMethodArguments(const SchemaMethod* schema, const qpid::messaging::Variant::Map* args, qpid::framing::Buffer& buffer);
-        void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context);
+        void sendMethodRequest(ObjectId* oid, const SchemaClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context);
 
         void addBinding(const std::string& exchange, const std::string& key);
         void staticRelease() { decOutstanding(); }
@@ -153,12 +141,11 @@
         mutable qpid::sys::Mutex lock;
         Console& console;
         std::string queueName;
-        qpid::framing::Uuid brokerId;
+        qpid::messaging::Uuid brokerId;
         SequenceManager seqMgr;
         uint32_t requestsOutstanding;
         bool topicBound;
         std::map<uint32_t, AgentProxyPtr> agentList;
-        std::deque<MessageImpl::Ptr> xmtQueue;
         std::deque<BrokerEventImpl::Ptr> eventQueue;
 
 #       define MA_BUFFER_SIZE 65536

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp Fri Feb 26 23:11:19 2010
@@ -18,20 +18,6 @@
  */
 
 #include "qmf/engine/ConsoleImpl.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
-#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
-#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/Protocol.h"
-#include "qmf/engine/SequenceManager.h"
-#include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
 #include <qpid/log/Statement.h>
 #include <qpid/sys/Time.h>
 #include <qpid/sys/SystemInfo.h>

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h Fri Feb 26 23:11:19 2010
@@ -21,15 +21,13 @@
  */
 
 #include "qmf/engine/Console.h"
-#include "qmf/engine/MessageImpl.h"
 #include "qmf/engine/SchemaImpl.h"
 #include "qmf/engine/ObjectImpl.h"
 #include "qmf/engine/ObjectIdImpl.h"
 #include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/Protocol.h"
+#include "qmf/Protocol.h"
 #include "qmf/engine/SequenceManager.h"
 #include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Uuid.h>
 #include <qpid/sys/Mutex.h>
 #include <qpid/sys/Time.h>
 #include <qpid/sys/SystemInfo.h>

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qpid/messaging/ListContent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qpid/messaging/ListContent.cpp?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qpid/messaging/ListContent.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qpid/messaging/ListContent.cpp Fri Feb 26 23:11:19 2010
@@ -37,6 +37,11 @@
         }
     }
 
+    ListContentImpl(Message& m, const Variant::List& i) : Variant(i), msg(&m)
+    {
+        msg->getContent().clear();
+    }
+
     void encode()
     {
         qpid::client::amqp0_10::ListCodec codec;
@@ -45,6 +50,7 @@
 };
 
 ListContent::ListContent(Message& m) : impl(new ListContentImpl(m)) {}
+ListContent::ListContent(Message& m, const Variant::List& i) : impl(new ListContentImpl(m, i)) {}
 ListContent::~ListContent() { delete impl; }
 ListContent& ListContent::operator=(const ListContent& l) { *impl = *l.impl; return *this; }
 

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qpid/messaging/MapContent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qpid/messaging/MapContent.cpp?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qpid/messaging/MapContent.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qpid/messaging/MapContent.cpp Fri Feb 26 23:11:19 2010
@@ -37,6 +37,11 @@
         }
     }
 
+    MapContentImpl(Message& m, const Variant::Map& i) : Variant(i), msg(&m)
+    {
+        msg->getContent().clear();
+    }
+
     void encode()
     {
         qpid::client::amqp0_10::MapCodec codec;
@@ -46,6 +51,7 @@
 };
 
 MapContent::MapContent(Message& m) : impl(new MapContentImpl(m)) {}
+MapContent::MapContent(Message& m, const Variant::Map& i) : impl(new MapContentImpl(m, i)) {}
 MapContent::~MapContent() { delete impl; }
 MapContent& MapContent::operator=(const MapContent& m) { *impl = *m.impl; return *this; }
 

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=916854&r1=916853&r2=916854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/tests/MessagingSessionTests.cpp Fri Feb 26 23:11:19 2010
@@ -346,6 +346,25 @@
     fix.session.acknowledge();
 }
 
+QPID_AUTO_TEST_CASE(testMapMessageWithInitial)
+{
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    Message out;
+    Variant::Map imap;
+    imap["abc"] = "def";
+    imap["pi"] = 3.14f;
+    MapContent content(out, imap);
+    content.encode();
+    sender.send(out);
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+    Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+    MapView view(in);
+    BOOST_CHECK_EQUAL(view["abc"].asString(), "def");
+    BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f);
+    fix.session.acknowledge();
+}
+
 QPID_AUTO_TEST_CASE(testListMessage)
 {
     QueueFixture fix;
@@ -379,6 +398,40 @@
     fix.session.acknowledge();
 }
 
+QPID_AUTO_TEST_CASE(testListMessageWithInitial)
+{
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    Message out;
+    Variant::List ilist;
+    ilist.push_back(Variant("abc"));
+    ilist.push_back(Variant(1234));
+    ilist.push_back(Variant("def"));
+    ilist.push_back(Variant(56.789));
+    ListContent content(out, ilist);
+    content.encode();
+    sender.send(out);
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+    Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+    ListView view(in);
+    BOOST_CHECK_EQUAL(view.size(), content.size());
+    BOOST_CHECK_EQUAL(view.front().asString(), "abc");
+    BOOST_CHECK_EQUAL(view.back().asDouble(), 56.789);
+
+    ListView::const_iterator i = view.begin();
+    BOOST_CHECK(i != view.end());
+    BOOST_CHECK_EQUAL(i->asString(), "abc");
+    BOOST_CHECK(++i != view.end());
+    BOOST_CHECK_EQUAL(i->asInt64(), 1234);
+    BOOST_CHECK(++i != view.end());
+    BOOST_CHECK_EQUAL(i->asString(), "def");
+    BOOST_CHECK(++i != view.end());
+    BOOST_CHECK_EQUAL(i->asDouble(), 56.789);
+    BOOST_CHECK(++i == view.end());
+
+    fix.session.acknowledge();
+}
+
 QPID_AUTO_TEST_CASE(testReject)
 {
     QueueFixture fix;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org