You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/02/19 17:06:23 UTC

svn commit: r911854 [1/2] - in /qpid/branches/qmf-devel0.7/qpid/cpp: include/qmf/ include/qmf/engine/ src/ src/qmf/ src/qmf/engine/

Author: tross
Date: Fri Feb 19 16:06:22 2010
New Revision: 911854

URL: http://svn.apache.org/viewvc?rev=911854&view=rev
Log:
Checking in work-in-progress for the qmf branch.
This branch will not compile until further changes are checked in.

Added:
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
Removed:
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Connection.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/ConnectionSettings.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/ConnectionSettings.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Message.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/ResilientConnection.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Typecode.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Value.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/MessageImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/MessageImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Protocol.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Protocol.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ValueImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ValueImpl.h
Modified:
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Schema.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h

Added: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h?rev=911854&view=auto
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h (added)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h Fri Feb 19 16:06:22 2010
@@ -0,0 +1,88 @@
+#ifndef _QmfProtocol_
+#define _QmfProtocol_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/sys/IntegerTypes.h>
+#include <string>
+
+namespace qpid {
+    namespace messaging {
+        class Message;
+    }
+}
+
+namespace qmf {
+
+    class Protocol {
+    public:
+        //static bool checkHeader(const qpid::messaging::Message& msg, std::string& opcode, uint32_t *seq);
+        //static void encodeHeader(qpid::messaging::Message& msg, const std::string& opcode, uint32_t seq = 0);
+
+        const static std::string SCHEMA_ELT_NAME;
+        const static std::string SCHEMA_ELT_TYPE;
+        const static std::string SCHEMA_ELT_DIR;
+        const static std::string SCHEMA_ELT_UNIT;
+        const static std::string SCHEMA_ELT_DESC;
+        const static std::string SCHEMA_ELT_ACCESS;
+        const static std::string SCHEMA_ELT_OPTIONAL;
+        const static std::string SCHEMA_ARGS;
+        const static std::string SCHEMA_PACKAGE;
+        const static std::string SCHEMA_CLASS_KIND;
+        const static std::string SCHEMA_CLASS_KIND_DATA;
+        const static std::string SCHEMA_CLASS_KIND_EVENT;
+        const static std::string SCHEMA_CLASS;
+        const static std::string SCHEMA_HASH;
+        const static std::string AGENT_NAME;
+        const static std::string OBJECT_NAME;
+        const static std::string SCHEMA_ID;
+
+        /*
+        const static uint8_t OP_ATTACH_REQUEST  = 'A';
+        const static uint8_t OP_ATTACH_RESPONSE = 'a';
+
+        const static uint8_t OP_BROKER_REQUEST  = 'B';
+        const static uint8_t OP_BROKER_RESPONSE = 'b';
+
+        const static uint8_t OP_CONSOLE_ADDED_INDICATION = 'x';
+        const static uint8_t OP_COMMAND_COMPLETE         = 'z';
+        const static uint8_t OP_HEARTBEAT_INDICATION     = 'h';
+
+        const static uint8_t OP_PACKAGE_REQUEST    = 'P';
+        const static uint8_t OP_PACKAGE_INDICATION = 'p';
+        const static uint8_t OP_CLASS_QUERY        = 'Q';
+        const static uint8_t OP_CLASS_INDICATION   = 'q';
+        const static uint8_t OP_SCHEMA_REQUEST     = 'S';
+        const static uint8_t OP_SCHEMA_RESPONSE    = 's';
+
+        const static uint8_t OP_METHOD_REQUEST       = 'M';
+        const static uint8_t OP_METHOD_RESPONSE      = 'm';
+        const static uint8_t OP_GET_QUERY            = 'G';
+        const static uint8_t OP_OBJECT_INDICATION    = 'g';
+        const static uint8_t OP_PROPERTY_INDICATION  = 'c';
+        const static uint8_t OP_STATISTIC_INDICATION = 'i';
+        const static uint8_t OP_EVENT_INDICATION     = 'e';
+        */
+    };
+
+}
+
+#endif
+

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h Fri Feb 19 16:06:22 2010
@@ -25,8 +25,8 @@
 #include <qmf/engine/Object.h>
 #include <qmf/engine/Event.h>
 #include <qmf/engine/Query.h>
-#include <qmf/engine/Value.h>
-#include <qmf/engine/Message.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Variant.h>
 
 namespace qmf {
 namespace engine {
@@ -42,12 +42,7 @@
             GET_QUERY      = 1,
             START_SYNC     = 2,
             END_SYNC       = 3,
-            METHOD_CALL    = 4,
-            DECLARE_QUEUE  = 5,
-            DELETE_QUEUE   = 6,
-            BIND           = 7,
-            UNBIND         = 8,
-            SETUP_COMPLETE = 9
+            METHOD_CALL    = 4
         };
 
         EventKind    kind;
@@ -55,13 +50,11 @@
         char*        authUserId;  // Authenticated user ID (for all kinds)
         char*        authToken;   // Authentication token if issued (for all kinds)
         char*        name;        // Name of the method/sync query
-                                  //    (METHOD_CALL, START_SYNC, END_SYNC, DECLARE_QUEUE, BIND, UNBIND)
+                                  //    (METHOD_CALL, START_SYNC, END_SYNC)
         Object*      object;      // Object involved in method call (METHOD_CALL)
-        ObjectId*    objectId;    // ObjectId for method call (METHOD_CALL)
+        char*        objectKey;   // Object key for method call (METHOD_CALL)
         Query*       query;       // Query parameters (GET_QUERY, START_SYNC)
-        Value*       arguments;   // Method parameters (METHOD_CALL)
-        char*        exchange;    // Exchange for bind (BIND, UNBIND)
-        char*        bindingKey;  // Key for bind (BIND, UNBIND)
+        qpid::messaging::Variant::Map*  arguments;   // Method parameters (METHOD_CALL)
         const SchemaObjectClass* objectClass; // (METHOD_CALL)
     };
 
@@ -72,10 +65,17 @@
      */
     class Agent {
     public:
-        Agent(char* label, bool internalStore=true);
+        Agent(const char* vendor, const char* product, const char* name, const char* domain=0, bool internalStore=true);
         ~Agent();
 
         /**
+         * Set an agent attribute that can be used to describe this agent to consoles.
+         *@param key Null-terminated string that is the name of the attribute.
+         *@param value Variant value (or any API type) of the attribute.
+         */
+        void setAttr(const char* key, const qpid::messaging::Variant& value);
+
+        /**
          * Configure the directory path for storing persistent data.
          *@param path Null-terminated string containing a directory path where files can be
          *            created, written, and read.  If NULL, no persistent storage will be
@@ -92,24 +92,6 @@
         void setTransferDir(const char* path);
 
         /**
-         * Pass messages received from the AMQP session to the Agent engine.
-         *@param message AMQP messages received on the agent session.
-         */
-        void handleRcvMessage(Message& message);
-
-        /**
-         * Get the next message to be sent to the AMQP network.
-         *@param item The Message structure describing the message to be produced.
-         *@return true if the Message is valid, false if there are no messages to send.
-         */
-        bool getXmtMessage(Message& item) const;
-
-        /**
-         * Remove and discard one message from the head of the transmit queue.
-         */
-        void popXmt();
-
-        /**
          * Get the next application event from the agent engine.
          *@param event The event iff the return value is true
          *@return true if event is valid, false if there are no events to process
@@ -122,20 +104,9 @@
         void popEvent();
 
         /**
-         * A new AMQP session has been established for Agent communication.
-         */
-        void newSession();
-
-        /**
-         * Start the QMF Agent protocol.  This should be invoked after a SETUP_COMPLETE event
-         * is received from the Agent engine.
-         */
-        void startProtocol();
-
-        /**
-         * This method is called periodically so the agent can supply a heartbeat.
+         * Provide the AMQP connection to be used for this agent.
          */
-        void heartbeat();
+        void setConnection(qpid::messaging::Connection& conn);
 
         /**
          * Respond to a method request.
@@ -144,7 +115,7 @@
          *@param text      Status text ("OK" or an error message)
          *@param arguments The list of output arguments from the method call.
          */
-        void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
+        void methodResponse(uint32_t sequence, uint32_t status, char* text, const qpid::messaging::Variant::Map& arguments);
 
         /**
          * Send a content indication to the QMF bus.  This is only needed for objects that are
@@ -152,10 +123,8 @@
          * (inserted using addObject).
          *@param sequence The sequence number of the GET request or the SYNC_START request.
          *@param object   The object (annotated with "changed" flags) for publication.
-         *@param prop     If true, changed object properties are transmitted.
-         *@param stat     If true, changed object statistics are transmitted.
          */
-        void queryResponse(uint32_t sequence, Object& object, bool prop = true, bool stat = true);
+        void queryResponse(uint32_t sequence, Object& object);
 
         /**
          * Indicate the completion of a query.  This is not used for SYNC_START requests.
@@ -178,20 +147,12 @@
         /**
          * Give an object to the Agent for storage and management.  Once added, the agent takes
          * responsibility for the life cycle of the object.
-         *@param obj       The object to be managed by the Agent.
-         *@param persistId A unique non-zero value if the object-id is to be persistent.
-         *@return The objectId of the managed object.
-         */
-        const ObjectId* addObject(Object& obj, uint64_t persistId);
-        //        const ObjectId* addObject(Object& obj, uint32_t persistIdLo, uint32_t persistIdHi);
-
-        /**
-         * Allocate an object-id for an object that will be managed by the application.
-         *@param persistId A unique non-zero value if the object-id is to be persistent.
-         *@return The objectId structure for the allocated ID.
+         *@param obj The object to be managed by the Agent.
+         *@param key A unique name (a primary key) to be used to address this object. If
+         *           left null, the agent will create a unique name for the object.
+         *@return The key for the managed object.
          */
-        const ObjectId* allocObjectId(uint64_t persistId);
-        const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
+        const char* addObject(Object& obj, const char* key=0);
 
         /**
          * Raise an event into the QMF network..

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h Fri Feb 19 16:06:22 2010
@@ -20,14 +20,12 @@
  * under the License.
  */
 
-#include <qmf/engine/ResilientConnection.h>
 #include <qmf/engine/Schema.h>
 #include <qmf/engine/ObjectId.h>
 #include <qmf/engine/Object.h>
 #include <qmf/engine/Event.h>
 #include <qmf/engine/Query.h>
-#include <qmf/engine/Value.h>
-#include <qmf/engine/Message.h>
+#include <qpid/messaging/Variant.h>
 
 namespace qmf {
 namespace engine {
@@ -49,8 +47,8 @@
         MethodResponse(const MethodResponse& from);
         ~MethodResponse();
         uint32_t getStatus() const;
-        const Value* getException() const;
-        const Value* getArgs() const;
+        const qpid::messaging::Variant* getException() const;
+        const qpid::messaging::Variant::Map* getArgs() const;
 
     private:
         friend struct MethodResponseImpl;
@@ -66,7 +64,7 @@
     public:
         ~QueryResponse();
         uint32_t getStatus() const;
-        const Value* getException() const;
+        const qpid::messaging::Variant* getException() const;
         uint32_t getObjectCount() const;
         const Object* getObject(uint32_t idx) const;
 
@@ -161,10 +159,6 @@
         void sessionClosed();
         void startProtocol();
 
-        void handleRcvMessage(Message& message);
-        bool getXmtMessage(Message& item) const;
-        void popXmt();
-
         bool getEvent(BrokerEvent& event) const;
         void popEvent();
 

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h Fri Feb 19 16:06:22 2010
@@ -22,7 +22,7 @@
 
 #include <qmf/engine/Schema.h>
 #include <qmf/engine/ObjectId.h>
-#include <qmf/engine/Value.h>
+#include <qpid/messaging/Variant.h>
 
 namespace qmf {
 namespace engine {
@@ -30,23 +30,26 @@
     struct ObjectImpl;
     class Object {
     public:
-        Object(const SchemaObjectClass* type);
+        Object();
+        Object(SchemaObjectClass* type);
         Object(const Object& from);
         virtual ~Object();
 
+        const qpid::messaging::Variant::Map& getValues() const;
+        qpid::messaging::Variant::Map& getValues();
+
+        const SchemaObjectClass* getSchema() const;
+        void setSchema(SchemaObjectClass* schema);
+
+        const char* getKey() const;
+        void setKey(const char* key);
+
+        void touch();
         void destroy();
-        const ObjectId* getObjectId() const;
-        void setObjectId(ObjectId* oid);
-        const SchemaObjectClass* getClass() const;
-        Value* getValue(const char* key) const;
-        void invokeMethod(const char* methodName, const Value* inArgs, void* context) const;
-        bool isDeleted() const;
-        void merge(const Object& from);
 
     private:
         friend struct ObjectImpl;
         friend class  AgentImpl;
-        Object(ObjectImpl* impl);
         ObjectImpl* impl;
     };
 }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h Fri Feb 19 16:06:22 2010
@@ -20,89 +20,39 @@
  * under the License.
  */
 
-#include <qmf/engine/ObjectId.h>
-#include <qmf/engine/Value.h>
+#include <qpid/messaging/Variant.h>
 
 namespace qmf {
 namespace engine {
 
     class Object;
-    struct QueryElementImpl;
-    struct QueryImpl;
-    struct QueryExpressionImpl;
-    class  SchemaClassKey;
-
-    enum ValueOper {
-        O_EQ = 1,
-        O_NE = 2,
-        O_LT = 3,
-        O_LE = 4,
-        O_GT = 5,
-        O_GE = 6,
-        O_RE_MATCH = 7,
-        O_RE_NOMATCH = 8,
-        O_PRESENT = 9,
-        O_NOT_PRESENT = 10
-    };
-
-    struct QueryOperand {
-        virtual ~QueryOperand() {}
-        virtual bool evaluate(const Object* object) const = 0;
-    };
-
-    struct QueryElement : public QueryOperand {
-        QueryElement(const char* attrName, const Value* value, ValueOper oper);
-        QueryElement(QueryElementImpl* impl);
-        virtual ~QueryElement();
-        bool evaluate(const Object* object) const;
-
-        QueryElementImpl* impl;
-    };
-
-    enum ExprOper {
-        E_NOT = 1,
-        E_AND = 2,
-        E_OR  = 3,
-        E_XOR = 4
-    };
-
-    struct QueryExpression : public QueryOperand {
-        QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2);
-        QueryExpression(QueryExpressionImpl* impl);
-        virtual ~QueryExpression();
-        bool evaluate(const Object* object) const;
-        
-        QueryExpressionImpl* impl;
-    };
+    class QueryImpl;
 
     class Query {
     public:
-        Query(const char* className, const char* packageName);
-        Query(const SchemaClassKey* key);
-        Query(const ObjectId* oid);
+        Query(const char* target);
+        Query(const char* target, const qpid::messaging::Variant::List& predicate);
+        Query(const char* target, const char* expression);
         Query(const Query& from);
         ~Query();
 
-        void setSelect(const QueryOperand* criterion);
-        void setLimit(uint32_t maxResults);
-        void setOrderBy(const char* attrName, bool decreasing);
-
-        const char* getPackage() const;
-        const char* getClass() const;
-        const ObjectId* getObjectId() const;
+        void where(const qpid::messaging::Variant::List& predicate);
+        void where(const char* expression);
+        void limit(uint32_t maxResults);
+        void orderBy(const char* attrName, bool decreasing);
 
-        bool haveSelect() const;
+        bool havePredicate() const;
         bool haveLimit() const;
         bool haveOrderBy() const;
-        const QueryOperand* getSelect() const;
+        const qpid::messaging::Variant::List& getPredicate() const;
         uint32_t getLimit() const;
         const char* getOrderBy() const;
         bool getDecreasing() const;
 
+        bool matches(const Object& object) const;
+
     private:
         friend struct QueryImpl;
-        friend class BrokerProxyImpl;
-        Query(QueryImpl* impl);
         QueryImpl* impl;
     };
 }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Schema.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Schema.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Schema.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Schema.h Fri Feb 19 16:06:22 2010
@@ -20,8 +20,8 @@
  * under the License.
  */
 
-#include <qmf/engine/Typecode.h>
 #include <qpid/sys/IntegerTypes.h>
+#include <qpid/messaging/Variant.h>
 
 namespace qmf {
 namespace engine {
@@ -29,12 +29,10 @@
     enum Access { ACCESS_READ_CREATE = 1, ACCESS_READ_WRITE = 2, ACCESS_READ_ONLY = 3 };
     enum Direction { DIR_IN = 1, DIR_OUT = 2, DIR_IN_OUT = 3 };
     enum ClassKind { CLASS_OBJECT = 1, CLASS_EVENT = 2 };
-    enum Severity { SEV_EMERG = 0, SEV_ALERT = 1, SEV_CRIT = 2, SEV_ERROR = 3, SEV_WARN = 4, SEV_NOTICE = 5, SEV_INFORM = 6, SEV_DEBUG = 7 };
 
     struct SchemaArgumentImpl;
     struct SchemaMethodImpl;
     struct SchemaPropertyImpl;
-    struct SchemaStatisticImpl;
     struct SchemaObjectClassImpl;
     struct SchemaEventClassImpl;
     struct SchemaClassKeyImpl;
@@ -43,14 +41,14 @@
      */
     class SchemaArgument {
     public:
-        SchemaArgument(const char* name, Typecode typecode);
+        SchemaArgument(const char* name, qpid::messaging::VariantType typecode);
         SchemaArgument(const SchemaArgument& from);
         ~SchemaArgument();
         void setDirection(Direction dir);
         void setUnit(const char* val);
         void setDesc(const char* desc);
         const char* getName() const;
-        Typecode getType() const;
+        qpid::messaging::VariantType getType() const;
         Direction getDirection() const;
         const char* getUnit() const;
         const char* getDesc() const;
@@ -89,7 +87,7 @@
      */
     class SchemaProperty {
     public:
-        SchemaProperty(const char* name, Typecode typecode);
+        SchemaProperty(const char* name, qpid::messaging::VariantType typecode);
         SchemaProperty(const SchemaProperty& from);
         ~SchemaProperty();
         void setAccess(Access access);
@@ -98,7 +96,7 @@
         void setUnit(const char* val);
         void setDesc(const char* desc);
         const char* getName() const;
-        Typecode getType() const;
+        qpid::messaging::VariantType getType() const;
         Access getAccess() const;
         bool isIndex() const;
         bool isOptional() const;
@@ -114,27 +112,6 @@
 
     /**
      */
-    class SchemaStatistic {
-    public:
-        SchemaStatistic(const char* name, Typecode typecode);
-        SchemaStatistic(const SchemaStatistic& from);
-        ~SchemaStatistic();
-        void setUnit(const char* val);
-        void setDesc(const char* desc);
-        const char* getName() const;
-        Typecode getType() const;
-        const char* getUnit() const;
-        const char* getDesc() const;
-
-    private:
-        friend struct SchemaStatisticImpl;
-        friend struct SchemaObjectClassImpl;
-        SchemaStatistic(SchemaStatisticImpl* impl);
-        SchemaStatisticImpl* impl;
-    };
-
-    /**
-     */
     class SchemaClassKey {
     public:
         SchemaClassKey(const SchemaClassKey& from);
@@ -164,15 +141,12 @@
         SchemaObjectClass(const SchemaObjectClass& from);
         ~SchemaObjectClass();
         void addProperty(const SchemaProperty* property);
-        void addStatistic(const SchemaStatistic* statistic);
         void addMethod(const SchemaMethod* method);
 
         const SchemaClassKey* getClassKey() const;
         int getPropertyCount() const;
-        int getStatisticCount() const;
         int getMethodCount() const;
         const SchemaProperty* getProperty(int idx) const;
-        const SchemaStatistic* getStatistic(int idx) const;
         const SchemaMethod* getMethod(int idx) const;
 
     private:
@@ -187,14 +161,13 @@
      */
     class SchemaEventClass {
     public:
-        SchemaEventClass(const char* package, const char* name, Severity severity);
+        SchemaEventClass(const char* package, const char* name);
         SchemaEventClass(const SchemaEventClass& from);
         ~SchemaEventClass();
         void addArgument(const SchemaArgument* argument);
         void setDesc(const char* desc);
 
         const SchemaClassKey* getClassKey() const;
-        Severity getSeverity() const;
         int getArgumentCount() const;
         const SchemaArgument* getArgument(int idx) const;
 

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk Fri Feb 19 16:06:22 2010
@@ -31,9 +31,7 @@
   ../include/qpid/agent/ManagementAgent.h	\
   ../include/qpid/agent/QmfAgentImportExport.h	\
   ../include/qmf/Agent.h			\
-  ../include/qmf/Connection.h			\
   ../include/qmf/QmfImportExport.h		\
-  ../include/qmf/ConnectionSettings.h		\
   ../include/qmf/AgentObject.h
 
 #
@@ -41,18 +39,14 @@
 #
 QMF_ENGINE_API =				\
   ../include/qmf/engine/Agent.h			\
-  ../include/qmf/engine/ConnectionSettings.h	\
   ../include/qmf/engine/Console.h		\
   ../include/qmf/engine/Event.h			\
-  ../include/qmf/engine/Message.h		\
   ../include/qmf/engine/Object.h		\
-  ../include/qmf/engine/ObjectId.h		\
   ../include/qmf/engine/QmfEngineImportExport.h	\
   ../include/qmf/engine/Query.h			\
-  ../include/qmf/engine/ResilientConnection.h	\
-  ../include/qmf/engine/Schema.h		\
-  ../include/qmf/engine/Typecode.h		\
-  ../include/qmf/engine/Value.h
+  ../include/qmf/engine/Schema.h
+
+# ../include/qmf/engine/ObjectId.h
 
 # Public header files
 nobase_include_HEADERS +=	\
@@ -67,31 +61,23 @@
 libqmfengine_la_SOURCES =			\
   $(QMF_ENGINE_API)				\
   qmf/engine/Agent.cpp				\
-  qmf/engine/BrokerProxyImpl.cpp		\
-  qmf/engine/BrokerProxyImpl.h			\
-  qmf/engine/ConnectionSettingsImpl.cpp		\
-  qmf/engine/ConnectionSettingsImpl.h		\
-  qmf/engine/ConsoleImpl.cpp			\
-  qmf/engine/ConsoleImpl.h			\
-  qmf/engine/EventImpl.cpp			\
-  qmf/engine/EventImpl.h			\
-  qmf/engine/MessageImpl.cpp			\
-  qmf/engine/MessageImpl.h			\
-  qmf/engine/ObjectIdImpl.cpp			\
-  qmf/engine/ObjectIdImpl.h			\
   qmf/engine/ObjectImpl.cpp			\
   qmf/engine/ObjectImpl.h			\
-  qmf/engine/Protocol.cpp			\
-  qmf/engine/Protocol.h				\
+  qmf/Protocol.cpp				\
+  qmf/Protocol.h				\
   qmf/engine/QueryImpl.cpp			\
   qmf/engine/QueryImpl.h			\
-  qmf/engine/ResilientConnection.cpp		\
-  qmf/engine/SequenceManager.cpp		\
-  qmf/engine/SequenceManager.h			\
   qmf/engine/SchemaImpl.cpp			\
-  qmf/engine/SchemaImpl.h			\
-  qmf/engine/ValueImpl.cpp			\
-  qmf/engine/ValueImpl.h
+  qmf/engine/SchemaImpl.h
+
+# qmf/engine/BrokerProxyImpl.cpp
+# qmf/engine/BrokerProxyImpl.h
+# qmf/engine/ConsoleImpl.cpp
+# qmf/engine/ConsoleImpl.h
+# qmf/engine/ObjectIdImpl.cpp
+# qmf/engine/ObjectIdImpl.h
+# qmf/engine/SequenceManager.cpp
+# qmf/engine/SequenceManager.h
 
 libqmf_la_LIBADD = libqmfengine.la
 libqmfengine_la_LIBADD = libqpidclient.la

Added: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp?rev=911854&view=auto
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp (added)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp Fri Feb 19 16:06:22 2010
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/Protocol.h"
+
+using namespace std;
+using namespace qmf;
+
+const string Protocol::SCHEMA_ELT_NAME("name");
+const string Protocol::SCHEMA_ELT_TYPE("type");
+const string Protocol::SCHEMA_ELT_DIR("dir");
+const string Protocol::SCHEMA_ELT_UNIT("unit");
+const string Protocol::SCHEMA_ELT_DESC("desc");
+const string Protocol::SCHEMA_ELT_ACCESS("access");
+const string Protocol::SCHEMA_ELT_OPTIONAL("optional");
+const string Protocol::SCHEMA_ARGS("args");
+const string Protocol::SCHEMA_PACKAGE("_package_name");
+const string Protocol::SCHEMA_CLASS_KIND("_type");
+const string Protocol::SCHEMA_CLASS_KIND_DATA("_data");
+const string Protocol::SCHEMA_CLASS_KIND_EVENT("_event");
+const string Protocol::SCHEMA_CLASS("_class_name");
+const string Protocol::SCHEMA_HASH("_hash_str");
+const string Protocol::AGENT_NAME("_agent_name");
+const string Protocol::OBJECT_NAME("_object_name");
+const string Protocol::SCHEMA_ID("_schema_id");
+
+#if 0
+bool Protocol::checkHeader(const Message& /*msg*/, string& /*opcode*/, uint32_t* /*seq*/)
+{
+    // TODO
+    return true;
+}
+
+void Protocol::encodeHeader(Message& /*msg*/, const string& /*opcode*/, uint32_t /*seq*/)
+{
+    // TODO
+}
+#endif

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp Fri Feb 19 16:06:22 2010
@@ -18,23 +18,19 @@
  */
 
 #include "qmf/engine/Agent.h"
-#include "qmf/engine/MessageImpl.h"
 #include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
-#include "qmf/engine/EventImpl.h"
 #include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
 #include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/Protocol.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
+#include "qmf/Protocol.h"
 #include <qpid/sys/Mutex.h>
 #include <qpid/log/Statement.h>
 #include <qpid/sys/Time.h>
-#include <string.h>
+#include <qpid/sys/Thread.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Message.h>
 #include <string>
 #include <deque>
 #include <map>
@@ -45,8 +41,8 @@
 
 using namespace std;
 using namespace qmf::engine;
-using namespace qpid::framing;
 using namespace qpid::sys;
+using namespace qpid::messaging;
 
 namespace qmf {
 namespace engine {
@@ -59,11 +55,9 @@
         string      authToken;
         string      name;
         Object*     object;
-        boost::shared_ptr<ObjectId> objectId;
+        string      objectKey;
         boost::shared_ptr<Query> query;
-        boost::shared_ptr<Value> arguments;
-        string      exchange;
-        string      bindingKey;
+        boost::shared_ptr<Variant::Map> arguments;
         const SchemaObjectClass* objectClass;
 
         AgentEventImpl(AgentEvent::EventKind k) :
@@ -72,71 +66,64 @@
         AgentEvent copy();
     };
 
+    /**
+     * AgentQueryContext is used to track asynchronous requests (Query, Sync, or Method)
+     * sent up to the application.
+     */
     struct AgentQueryContext {
         typedef boost::shared_ptr<AgentQueryContext> Ptr;
         uint32_t sequence;
-        string   exchange;
-        string   key;
+        string   consoleAddr;
         const SchemaMethod* schemaMethod;
         AgentQueryContext() : schemaMethod(0) {}
     };
 
-    class AgentImpl : public boost::noncopyable {
+    class AgentImpl : public boost::noncopyable, public qpid::sys::Runnable {
     public:
-        AgentImpl(char* label, bool internalStore);
+        AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore);
         ~AgentImpl();
 
+        void setAttr(const char* key, const Variant& value);
         void setStoreDir(const char* path);
         void setTransferDir(const char* path);
-        void handleRcvMessage(Message& message);
-        bool getXmtMessage(Message& item) const;
-        void popXmt();
         bool getEvent(AgentEvent& event) const;
         void popEvent();
-        void newSession();
-        void startProtocol();
-        void heartbeat();
-        void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
-        void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat);
+        void setConnection(Connection& conn);
+        void methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments);
+        void queryResponse(uint32_t sequence, Object& object);
         void queryComplete(uint32_t sequence);
         void registerClass(SchemaObjectClass* cls);
         void registerClass(SchemaEventClass* cls);
-        const ObjectId* addObject(Object& obj, uint64_t persistId);
-        const ObjectId* allocObjectId(uint64_t persistId);
-        const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
+        const char* addObject(Object& obj, const char* key);
         void raiseEvent(Event& event);
 
+        void run();
+        void stop();
+
     private:
         mutable Mutex lock;
         Mutex     addLock;
-        string    label;
-        string    queueName;
+        const string    vendor;
+        const string    product;
+        const string    name;
+        const string    domain;
+        string directAddr;
+        map<string, Variant> attrMap;
         string    storeDir;
         string    transferDir;
         bool      internalStore;
-        uint64_t  nextTransientId;
         Uuid      systemId;
-        uint32_t  requestedBrokerBank;
-        uint32_t  requestedAgentBank;
-        uint32_t  assignedBrokerBank;
-        uint32_t  assignedAgentBank;
-        AgentAttachment attachment;
         uint16_t  bootSequence;
-        uint64_t  nextObjectId;
         uint32_t  nextContextNum;
+        bool      running;
         deque<AgentEventImpl::Ptr> eventQueue;
-        deque<MessageImpl::Ptr> xmtQueue;
         map<uint32_t, AgentQueryContext::Ptr> contextMap;
-
-        static const char* QMF_EXCHANGE;
-        static const char* DIR_EXCHANGE;
-        static const char* BROKER_KEY;
-        static const uint32_t MERR_UNKNOWN_METHOD = 2;
-        static const uint32_t MERR_UNKNOWN_PACKAGE = 8;
-        static const uint32_t MERR_UNKNOWN_CLASS = 9;
-        static const uint32_t MERR_INTERNAL_ERROR = 10;
-#       define MA_BUFFER_SIZE 65536
-        char outputBuffer[MA_BUFFER_SIZE];
+        Connection connection;
+        Session session;
+        Receiver directReceiver;
+        Receiver topicReceiver;
+        Sender sender;
+        qpid::sys::Thread* thread;
 
         struct AgentClassKey {
             string name;
@@ -144,10 +131,6 @@
             AgentClassKey(const string& n, const uint8_t* h) : name(n) {
                 memcpy(hash, h, 16);
             }
-            AgentClassKey(Buffer& buffer) {
-                buffer.getShortString(name);
-                buffer.getBin128(hash);
-            }
             string repr() {
                 return name;
             }
@@ -176,37 +159,29 @@
 
         map<string, ClassMaps> packages;
 
-        AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
-        AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
-        AgentEventImpl::Ptr eventSetupComplete();
         AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls,
-                                       boost::shared_ptr<ObjectId> oid);
+                                       const string& key);
         AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
-                                        boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
+                                        const string& key, boost::shared_ptr<Variant::Map> argMap,
                                         const SchemaObjectClass* objectClass);
-        void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
+        void handleRcvMessageLH(qpid::messaging::Message& message);
 
         void sendPackageIndicationLH(const string& packageName);
         void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
         void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
                                    uint32_t code = 0, const string& text = "OK");
         void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
-        void handleAttachResponse(Buffer& inBuffer);
-        void handlePackageRequest(Buffer& inBuffer);
-        void handleClassQuery(Buffer& inBuffer);
-        void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
+        void handleAttachResponse(Message& msg);
+        void handlePackageRequest(Message& msg);
+        void handleClassQuery(Message& msg);
+        void handleSchemaRequest(Message& msg, uint32_t sequence,
                                  const string& replyToExchange, const string& replyToKey);
-        void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
-        void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
-        void handleConsoleAddedIndication();
+        void handleGetQuery(Message& msg, uint32_t sequence, const string& replyTo, const string& userId);
+        void handleMethodRequest(Message& msg, uint32_t sequence, const string& replyTo, const string& userId);
     };
 }
 }
 
-const char* AgentImpl::QMF_EXCHANGE = "qpid.management";
-const char* AgentImpl::DIR_EXCHANGE = "amq.direct";
-const char* AgentImpl::BROKER_KEY   = "broker";
-
 #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
 
 AgentEvent AgentEventImpl::copy()
@@ -217,33 +192,38 @@
     item.kind      = kind;
     item.sequence  = sequence;
     item.object    = object;
-    item.objectId  = objectId.get();
     item.query     = query.get();
     item.arguments = arguments.get();
     item.objectClass = objectClass;
 
+    STRING_REF(objectKey);
     STRING_REF(authUserId);
     STRING_REF(authToken);
     STRING_REF(name);
-    STRING_REF(exchange);
-    STRING_REF(bindingKey);
 
     return item;
 }
 
-AgentImpl::AgentImpl(char* _label, bool i) :
-    label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1),
-    requestedBrokerBank(0), requestedAgentBank(0),
-    assignedBrokerBank(0), assignedAgentBank(0),
-    bootSequence(1), nextObjectId(1), nextContextNum(1)
-{
-    queueName += label;
+AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
+    vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
+    bootSequence(1), nextContextNum(1), running(true), thread(0)
+{
+    directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
+    if (_d == 0) {
+        directAddr += " { create:always }";
+    }
 }
 
+
 AgentImpl::~AgentImpl()
 {
 }
 
+void AgentImpl::setAttr(const char* key, const Variant& value)
+{
+    attrMap.insert(pair<string, Variant>(key, value));
+}
+
 void AgentImpl::setStoreDir(const char* path)
 {
     Mutex::ScopedLock _lock(lock);
@@ -262,6 +242,7 @@
         transferDir.clear();
 }
 
+/*
 void AgentImpl::handleRcvMessage(Message& message)
 {
     Buffer   inBuffer(message.body, message.length);
@@ -283,22 +264,7 @@
         }
     }
 }
-
-bool AgentImpl::getXmtMessage(Message& item) const
-{
-    Mutex::ScopedLock _lock(lock);
-    if (xmtQueue.empty())
-        return false;
-    item =  xmtQueue.front()->copy();
-    return true;
-}
-
-void AgentImpl::popXmt()
-{
-    Mutex::ScopedLock _lock(lock);
-    if (!xmtQueue.empty())
-        xmtQueue.pop_front();
-}
+*/
 
 bool AgentImpl::getEvent(AgentEvent& event) const
 {
@@ -316,47 +282,16 @@
         eventQueue.pop_front();
 }
 
-void AgentImpl::newSession()
+void AgentImpl::setConnection(Connection& conn)
 {
     Mutex::ScopedLock _lock(lock);
-    eventQueue.clear();
-    xmtQueue.clear();
-    eventQueue.push_back(eventDeclareQueue(queueName));
-    eventQueue.push_back(eventBind("amq.direct", queueName, queueName));
-    eventQueue.push_back(eventSetupComplete());
-}
-
-void AgentImpl::startProtocol()
-{
-    Mutex::ScopedLock _lock(lock);
-    char    rawbuffer[512];
-    Buffer  buffer(rawbuffer, 512);
-
-    Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST);
-    buffer.putShortString("qmfa");
-    systemId.encode(buffer);
-    buffer.putLong(requestedBrokerBank);
-    buffer.putLong(requestedAgentBank);
-    sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
-    QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
-             " reqAgent=" << requestedAgentBank);
-}
-
-void AgentImpl::heartbeat()
-{
-    Mutex::ScopedLock _lock(lock);
-    Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-
-    Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION);
-    buffer.putLongLong(uint64_t(Duration(now())));
-    stringstream key;
-    key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
-    sendBufferLH(buffer, QMF_EXCHANGE, key.str());
-    QPID_LOG(trace, "SENT HeartbeatIndication");
+    if (connection == 0)
+        return;
+    connection = conn;
+    thread = new qpid::sys::Thread(*this);
 }
 
-void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
-                                     const Value& argMap)
+void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& /*argMap*/)
 {
     Mutex::ScopedLock _lock(lock);
     map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -365,30 +300,11 @@
     AgentQueryContext::Ptr context = iter->second;
     contextMap.erase(iter);
 
-    Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence);
-    buffer.putLong(status);
-    buffer.putMediumString(text);
-    if (status == 0) {
-        for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin();
-             aIter != context->schemaMethod->impl->arguments.end(); aIter++) {
-            const SchemaArgument* schemaArg = *aIter;
-            if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) {
-                if (argMap.keyInMap(schemaArg->getName())) {
-                    const Value* val = argMap.byKey(schemaArg->getName());
-                    val->impl->encode(buffer);
-                } else {
-                    Value val(schemaArg->getType());
-                    val.impl->encode(buffer);
-                }
-            }
-        }
-    }
-    sendBufferLH(buffer, context->exchange, context->key);
+    // TODO: Encode method response
     QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
 }
 
-void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
+void AgentImpl::queryResponse(uint32_t sequence, Object&)
 {
     Mutex::ScopedLock _lock(lock);
     map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -396,17 +312,8 @@
         return;
     AgentQueryContext::Ptr context = iter->second;
 
-    Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence);
+    // TODO: accumulate data records and send response messages when we have "enough"
 
-    object.impl->encodeSchemaKey(buffer);
-    object.impl->encodeManagedObjectData(buffer);
-    if (prop)
-        object.impl->encodeProperties(buffer);
-    if (stat)
-        object.impl->encodeStatistics(buffer);
-    
-    sendBufferLH(buffer, context->exchange, context->key);
     QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
 }
 
@@ -417,9 +324,11 @@
     if (iter == contextMap.end())
         return;
 
+    // TODO: send a response message if there are any unsent data records
+
     AgentQueryContext::Ptr context = iter->second;
     contextMap.erase(iter);
-    sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
+    //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
 }
 
 void AgentImpl::registerClass(SchemaObjectClass* cls)
@@ -456,413 +365,148 @@
     // TODO: Indicate this schema if connected.
 }
 
-const ObjectId* AgentImpl::addObject(Object&, uint64_t)
+const char* AgentImpl::addObject(Object&, const char*)
 {
     Mutex::ScopedLock _lock(lock);
     return 0;
 }
 
-const ObjectId* AgentImpl::allocObjectId(uint64_t persistId)
+void AgentImpl::raiseEvent(Event&)
 {
     Mutex::ScopedLock _lock(lock);
-    uint16_t sequence  = persistId ? 0 : bootSequence;
-    uint64_t objectNum = persistId ? persistId : nextObjectId++;
-
-    ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum);
-    return oid;
 }
 
-const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
+void AgentImpl::run()
 {
-    return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo);
-}
+    qpid::sys::Duration duration = qpid::sys::TIME_MSEC * 500;
 
-void AgentImpl::raiseEvent(Event& event)
-{
-    Mutex::ScopedLock _lock(lock);
-    Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    Protocol::encodeHeader(buffer, Protocol::OP_EVENT_INDICATION);
-
-    event.impl->encodeSchemaKey(buffer);
-    buffer.putLongLong(uint64_t(Duration(now())));
-    event.impl->encode(buffer);
-    string key(event.impl->getRoutingKey(assignedBrokerBank, assignedAgentBank));
-
-    sendBufferLH(buffer, QMF_EXCHANGE, key);
-    QPID_LOG(trace, "SENT EventIndication");
-}
+    session = connection.newSession();
+    directReceiver = session.createReceiver(directAddr);
+    directReceiver.setCapacity(10);
 
-AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name)
-{
-    AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
-    event->name = name;
-
-    return event;
-}
+    Mutex::ScopedLock _lock(lock);
+    while (running) {
+        Receiver rcvr;
+        bool available;
+        {
+            Mutex::ScopedUnlock _unlock(lock);
+            available = session.nextReceiver(rcvr, duration);
+        }
 
-AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue,
-                                               const string& key)
-{
-    AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND));
-    event->name       = queue;
-    event->exchange   = exchange;
-    event->bindingKey = key;
+        if (available) {
+            Message msg(rcvr.get());
+            handleRcvMessageLH(msg);
+        }
+    }
 
-    return event;
+    directReceiver.close();
+    session.close();
 }
 
-AgentEventImpl::Ptr AgentImpl::eventSetupComplete()
+void AgentImpl::stop()
 {
-    AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE));
-    return event;
+    Mutex::ScopedLock _lock(lock);
+    running = false;
 }
 
-AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package,
-                                                const string& cls, boost::shared_ptr<ObjectId> oid)
+AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
 {
     AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
     event->sequence = num;
     event->authUserId = userId;
-    if (oid.get())
-        event->query.reset(new Query(oid.get()));
-    else
-        event->query.reset(new Query(cls.c_str(), package.c_str()));
+    event->objectKey = key;
     return event;
 }
 
 AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method,
-                                                 boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
-                                                 const SchemaObjectClass* objectClass)
+                                           const string& key, boost::shared_ptr<Variant::Map> argMap,
+                                           const SchemaObjectClass* objectClass)
 {
     AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL));
     event->sequence = num;
     event->authUserId = userId;
     event->name = method;
-    event->objectId = oid;
+    event->objectKey = key;
     event->arguments = argMap;
     event->objectClass = objectClass;
     return event;
 }
 
-void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/)
 {
-    uint32_t length = buf.getPosition();
-    MessageImpl::Ptr message(new MessageImpl);
-
-    buf.reset();
-    buf.getRawData(message->body, length);
-    message->destination   = destination;
-    message->routingKey    = routingKey;
-    message->replyExchange = "amq.direct";
-    message->replyKey      = queueName;
-
-    xmtQueue.push_back(message);
 }
 
 void AgentImpl::sendPackageIndicationLH(const string& packageName)
 {
-    Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
-    buffer.putShortString(packageName);
-    sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+    // TODO
     QPID_LOG(trace, "SENT PackageIndication:  package_name=" << packageName);
 }
 
-void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
+void AgentImpl::sendClassIndicationLH(ClassKind /*kind*/, const string& packageName, const AgentClassKey& key)
 {
-    Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
-    buffer.putOctet((int) kind);
-    buffer.putShortString(packageName);
-    buffer.putShortString(key.name);
-    buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries
-    sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+    // TODO
     QPID_LOG(trace, "SENT ClassIndication:  package_name=" << packageName << " class_name=" << key.name);
 }
 
-void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey,
-                                            uint32_t sequence, uint32_t code, const string& text)
+void AgentImpl::sendCommandCompleteLH(const string&, const string&, uint32_t sequence, uint32_t code, const string& text)
 {
-    Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence);
-    buffer.putLong(code);
-    buffer.putShortString(text);
-    sendBufferLH(buffer, exchange, replyToKey);
+    // TODO
     QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
 }
 
-void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
+void AgentImpl::sendMethodErrorLH(uint32_t /*sequence*/, const string& /*key*/, uint32_t code, const string& text)
 {
-    Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-    Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
-    buffer.putLong(code);
-
-    string fulltext;
-    switch (code) {
-    case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package";  break;
-    case MERR_UNKNOWN_CLASS:   fulltext = "Unknown Class"; break;
-    case MERR_UNKNOWN_METHOD:  fulltext = "Unknown Method"; break;
-    case MERR_INTERNAL_ERROR:  fulltext = "Internal Error"; break;
-    default:                   fulltext = "Unspecified Error"; break;
-    }
-
-    if (!text.empty()) {
-        fulltext += " (";
-        fulltext += text;
-        fulltext += ")";
-    }
-
-    buffer.putMediumString(fulltext);
-    sendBufferLH(buffer, DIR_EXCHANGE, key);
-    QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext);
+    // TODO
+    QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << text);
 }
 
-void AgentImpl::handleAttachResponse(Buffer& inBuffer)
+void AgentImpl::handlePackageRequest(Message&)
 {
     Mutex::ScopedLock _lock(lock);
-
-    assignedBrokerBank = inBuffer.getLong();
-    assignedAgentBank  = inBuffer.getLong();
-
-    QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
-
-    if ((assignedBrokerBank != requestedBrokerBank) ||
-        (assignedAgentBank  != requestedAgentBank)) {
-        if (requestedAgentBank == 0) {
-            QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
-                     assignedAgentBank);
-        } else {
-            QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
-                     "." << assignedAgentBank);
-        }
-        //storeData(); // TODO
-        requestedBrokerBank = assignedBrokerBank;
-        requestedAgentBank = assignedAgentBank;
-    }
-
-    attachment.setBanks(assignedBrokerBank, assignedAgentBank);
-
-    // Bind to qpid.management to receive commands
-    stringstream key;
-    key << "agent." << assignedBrokerBank << "." << assignedAgentBank;
-    eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str()));
-
-    // Send package indications for all local packages
-    for (map<string, ClassMaps>::iterator pIter = packages.begin();
-         pIter != packages.end();
-         pIter++) {
-        sendPackageIndicationLH(pIter->first);
-
-        // Send class indications for all local classes
-        ClassMaps cMap = pIter->second;
-        for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin();
-             cIter != cMap.objectClasses.end(); cIter++)
-            sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first);
-        for (EventClassMap::iterator cIter = cMap.eventClasses.begin();
-             cIter != cMap.eventClasses.end(); cIter++)
-            sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first);
-    }
 }
 
-void AgentImpl::handlePackageRequest(Buffer&)
+void AgentImpl::handleClassQuery(Message&)
 {
     Mutex::ScopedLock _lock(lock);
 }
 
-void AgentImpl::handleClassQuery(Buffer&)
+void AgentImpl::handleSchemaRequest(Message&, uint32_t, const string&, const string&)
 {
     Mutex::ScopedLock _lock(lock);
 }
 
-void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
-                                          const string& replyExchange, const string& replyKey)
+void AgentImpl::handleGetQuery(Message&, uint32_t, const string&, const string&)
 {
     Mutex::ScopedLock _lock(lock);
-    string rExchange(replyExchange);
-    string rKey(replyKey);
-    string packageName;
-    inBuffer.getShortString(packageName);
-    AgentClassKey key(inBuffer);
-
-    if (rExchange.empty())
-        rExchange = QMF_EXCHANGE;
-    if (rKey.empty())
-        rKey = BROKER_KEY;
-
-    QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
-
-    map<string, ClassMaps>::iterator pIter = packages.find(packageName);
-    if (pIter == packages.end()) {
-        sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found");
-        return;
-    }
-
-    ClassMaps cMap = pIter->second;
-    ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key);
-    if (ocIter != cMap.objectClasses.end()) {
-        SchemaObjectClass* oImpl = ocIter->second;
-        Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-        Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
-        oImpl->impl->encode(buffer);
-        sendBufferLH(buffer, rExchange, rKey);
-        QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
-        return;
-    }
-
-    EventClassMap::iterator ecIter = cMap.eventClasses.find(key);
-    if (ecIter != cMap.eventClasses.end()) {
-        SchemaEventClass* eImpl = ecIter->second;
-        Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-        Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
-        eImpl->impl->encode(buffer);
-        sendBufferLH(buffer, rExchange, rKey);
-        QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
-        return;
-    }
-
-    sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found");
-}
-
-void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId)
-{
-    Mutex::ScopedLock _lock(lock);
-    FieldTable ft;
-    FieldTable::ValuePtr value;
-    map<string, ClassMaps>::const_iterator pIter = packages.end();
-    string pname;
-    string cname;
-    string oidRepr;
-    boost::shared_ptr<ObjectId> oid;
-
-    ft.decode(inBuffer);
-    
-    QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft);
-
-    value = ft.get("_package");
-    if (value.get() && value->convertsTo<string>()) {
-        pname = value->get<string>();
-        pIter = packages.find(pname);
-        if (pIter == packages.end()) {
-            sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence);
-            return;
-        }
-    }
-
-    value = ft.get("_class");
-    if (value.get() && value->convertsTo<string>()) {
-        cname = value->get<string>();
-        // TODO - check for validity of class (in package or any package)
-        if (pIter == packages.end()) {
-        } else {
-            
-        }
-    }
-
-    value = ft.get("_objectid");
-    if (value.get() && value->convertsTo<string>()) {
-        oidRepr = value->get<string>();
-        oid.reset(new ObjectId());
-        oid->impl->fromString(oidRepr);
-    }
-
-    AgentQueryContext::Ptr context(new AgentQueryContext);
-    uint32_t contextNum = nextContextNum++;
-    context->sequence = sequence;
-    context->exchange = DIR_EXCHANGE;
-    context->key = replyTo;
-    contextMap[contextNum] = context;
-
-    eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid));
 }
 
-void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId)
+void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const string& /*replyTo*/, const string& /*userId*/)
 {
     Mutex::ScopedLock _lock(lock);
-    string pname;
-    string method;
-    boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer));
-    buffer.getShortString(pname);
-    AgentClassKey classKey(buffer);
-    buffer.getShortString(method);
-
-    QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method);
-
-    map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
-    if (pIter == packages.end()) {
-        sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
-        return;
-    }
-
-    ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey);
-    if (cIter == pIter->second.objectClasses.end()) {
-        sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr());
-        return;
-    }
-
-    const SchemaObjectClass* schema = cIter->second;
-    vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin();
-    for (; mIter != schema->impl->methods.end(); mIter++) {
-        if ((*mIter)->getName() == method)
-            break;
-    }
-
-    if (mIter == schema->impl->methods.end()) {
-        sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method);
-        return;
-    }
-
-    const SchemaMethod* schemaMethod = *mIter;
-    boost::shared_ptr<Value> argMap(new Value(TYPE_MAP));
-    Value* value;
-    for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin();
-         aIter != schemaMethod->impl->arguments.end(); aIter++) {
-        const SchemaArgument* schemaArg = *aIter;
-        if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT)
-            value = ValueImpl::factory(schemaArg->getType(), buffer);
-        else
-            value = ValueImpl::factory(schemaArg->getType());
-        argMap->insert(schemaArg->getName(), value);
-    }
+    QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=");
 
     AgentQueryContext::Ptr context(new AgentQueryContext);
     uint32_t contextNum = nextContextNum++;
-    context->sequence = sequence;
-    context->exchange = DIR_EXCHANGE;
-    context->key = replyTo;
-    context->schemaMethod = schemaMethod;
     contextMap[contextNum] = context;
-
-    eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema));
-}
-
-void AgentImpl::handleConsoleAddedIndication()
-{
-    Mutex::ScopedLock _lock(lock);
 }
 
 //==================================================================
 // Wrappers
 //==================================================================
 
-Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); }
+Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); }
 Agent::~Agent() { delete impl; }
+void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); }
 void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
 void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }
-void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void Agent::popXmt() { impl->popXmt(); }
 bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
 void Agent::popEvent() { impl->popEvent(); }
-void Agent::newSession() { impl->newSession(); }
-void Agent::startProtocol() { impl->startProtocol(); }
-void Agent::heartbeat() { impl->heartbeat(); }
-void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
-void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
+void Agent::setConnection(Connection& conn) { impl->setConnection(conn); }
+void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments) { impl->methodResponse(sequence, status, text, arguments); }
+void Agent::queryResponse(uint32_t sequence, Object& object) { impl->queryResponse(sequence, object); }
 void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
 void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
 void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
-const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
-const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
-const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
+const char* Agent::addObject(Object& obj, const char* key) { return impl->addObject(obj, key); }
 void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
 

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h Fri Feb 19 16:06:22 2010
@@ -23,12 +23,11 @@
 #include "qmf/engine/Console.h"
 #include "qmf/engine/ObjectImpl.h"
 #include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/ValueImpl.h"
 #include "qmf/engine/QueryImpl.h"
 #include "qmf/engine/SequenceManager.h"
 #include "qmf/engine/MessageImpl.h"
-#include "qpid/framing/Buffer.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/messaging/Variant.h"
 #include "qpid/sys/Mutex.h"
 #include "boost/shared_ptr.hpp"
 #include "boost/noncopyable.hpp"
@@ -46,8 +45,8 @@
     struct MethodResponseImpl {
         uint32_t status;
         const SchemaMethod* schema;
-        std::auto_ptr<Value> exception;
-        std::auto_ptr<Value> arguments;
+        std::auto_ptr<qpid::messaging::Variant> exception;
+        std::auto_ptr<qpid::messaging::Variant::Map> arguments;
 
         MethodResponseImpl(const MethodResponseImpl& from);
         MethodResponseImpl(qpid::framing::Buffer& buf, const SchemaMethod* schema);
@@ -56,14 +55,14 @@
         static MethodResponse* factory(uint32_t status, const std::string& text);
         ~MethodResponseImpl() {}
         uint32_t getStatus() const { return status; }
-        const Value* getException() const { return exception.get(); }
-        const Value* getArgs() const { return arguments.get(); }
+        const qpid::messaging::Variant* getException() const { return exception.get(); }
+        const qpid::messaging::Variant::Map* getArgs() const { return arguments.get(); }
     };
 
     typedef boost::shared_ptr<QueryResponse> QueryResponsePtr;
     struct QueryResponseImpl {
         uint32_t status;
-        std::auto_ptr<Value> exception;
+        std::auto_ptr<qpid::messaging::Variant> exception;
         std::vector<ObjectPtr> results;
 
         QueryResponseImpl() : status(0) {}
@@ -73,7 +72,7 @@
         }
         ~QueryResponseImpl() {}
         uint32_t getStatus() const { return status; }
-        const Value* getException() const { return exception.get(); }
+        const qpid::messaging::Variant* getException() const { return exception.get(); }
         uint32_t getObjectCount() const { return results.size(); }
         const Object* getObject(uint32_t idx) const;
     };
@@ -140,8 +139,8 @@
         const AgentProxy* getAgent(uint32_t idx) const;
         void sendQuery(const Query& query, void* context, const AgentProxy* agent);
         bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
-        std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer);
-        void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context);
+        std::string encodeMethodArguments(const SchemaMethod* schema, const qpid::messaging::Variant::Map* args, qpid::framing::Buffer& buffer);
+        void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context);
 
         void addBinding(const std::string& exchange, const std::string& key);
         void staticRelease() { decOutstanding(); }
@@ -219,6 +218,9 @@
         QueryResponsePtr queryResponse;
     };
 
+    //
+    // MethodContext is used to track and handle the response associated with a single Method Request
+    //
     struct MethodContext : public SequenceContext {
         MethodContext(BrokerProxyImpl& b, void* u, const SchemaMethod* s) : broker(b), userContext(u), schema(s) {}
         virtual ~MethodContext() {}

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h Fri Feb 19 16:06:22 2010
@@ -23,18 +23,13 @@
 #include "qmf/engine/Console.h"
 #include "qmf/engine/MessageImpl.h"
 #include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
 #include "qmf/engine/ObjectImpl.h"
 #include "qmf/engine/ObjectIdImpl.h"
 #include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
 #include "qmf/engine/Protocol.h"
 #include "qmf/engine/SequenceManager.h"
 #include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Buffer.h>
 #include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
 #include <qpid/sys/Mutex.h>
 #include <qpid/sys/Time.h>
 #include <qpid/sys/SystemInfo.h>

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp Fri Feb 19 16:06:22 2010
@@ -18,215 +18,50 @@
  */
 
 #include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/BrokerProxyImpl.h"
 #include <qpid/sys/Time.h>
 
 using namespace std;
 using namespace qmf::engine;
 using namespace qpid::sys;
-using qpid::framing::Buffer;
+using namespace qpid::messaging;
 
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type) : objectClass(type), broker(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
+ObjectImpl::ObjectImpl() :
+    objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
 {
-    int propCount = objectClass->getPropertyCount();
-    int statCount = objectClass->getStatisticCount();
-    int idx;
-
-    for (idx = 0; idx < propCount; idx++) {
-        const SchemaProperty* prop = objectClass->getProperty(idx);
-        properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
-    }
-
-    for (idx = 0; idx < statCount; idx++) {
-        const SchemaStatistic* stat = objectClass->getStatistic(idx);
-        statistics[stat->getName()] = ValuePtr(new Value(stat->getType()));
-    }
 }
 
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) :
-    objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0)
-{
-    int idx;
-
-    if (managed) {
-        lastUpdatedTime = buffer.getLongLong();
-        createTime = buffer.getLongLong();
-        destroyTime = buffer.getLongLong();
-        objectId.reset(ObjectIdImpl::factory(buffer));
-    }
-
-    if (prop) {
-        int propCount = objectClass->getPropertyCount();
-        set<string> excludes;
-        parsePresenceMasks(buffer, excludes);
-        for (idx = 0; idx < propCount; idx++) {
-            const SchemaProperty* prop = objectClass->getProperty(idx);
-            if (excludes.count(prop->getName()) != 0) {
-                properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
-            } else {
-                Value* pval = ValueImpl::factory(prop->getType(), buffer);
-                properties[prop->getName()] = ValuePtr(pval);
-            }
-        }
-    }
-
-    if (stat) {
-        int statCount = objectClass->getStatisticCount();
-        for (idx = 0; idx < statCount; idx++) {
-            const SchemaStatistic* stat = objectClass->getStatistic(idx);
-            Value* sval = ValueImpl::factory(stat->getType(), buffer);
-            statistics[stat->getName()] = ValuePtr(sval);
-        }
-    }
-}
-
-Object* ObjectImpl::factory(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed)
-{
-    ObjectImpl* impl(new ObjectImpl(type, b, buffer, prop, stat, managed));
-    return new Object(impl);
-}
-
-ObjectImpl::~ObjectImpl()
-{
-}
-
-void ObjectImpl::destroy()
-{
-    destroyTime = uint64_t(Duration(now()));
-    // TODO - flag deletion
-}
-
-Value* ObjectImpl::getValue(const string& key) const
-{
-    map<string, ValuePtr>::const_iterator iter;
 
-    iter = properties.find(key);
-    if (iter != properties.end())
-        return iter->second.get();
-
-    iter = statistics.find(key);
-    if (iter != statistics.end())
-        return iter->second.get();
-
-    return 0;
-}
-
-void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const
+ObjectImpl::ObjectImpl(SchemaObjectClass* type) :
+    objectClass(type), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
 {
-    if (broker != 0 && objectId.get() != 0)
-        broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context);
 }
 
-void ObjectImpl::merge(const Object& from)
-{
-    for (map<string, ValuePtr>::const_iterator piter = from.impl->properties.begin();
-         piter != from.impl->properties.end(); piter++)
-        properties[piter->first] = piter->second;
-    for (map<string, ValuePtr>::const_iterator siter = from.impl->statistics.begin();
-         siter != from.impl->statistics.end(); siter++)
-        statistics[siter->first] = siter->second;
-}
-
-void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
-{
-    int propCount = objectClass->getPropertyCount();
-    excludeList.clear();
-    uint8_t bit = 0;
-    uint8_t mask = 0;
-
-    for (int idx = 0; idx < propCount; idx++) {
-        const SchemaProperty* prop = objectClass->getProperty(idx);
-        if (prop->isOptional()) {
-            if (bit == 0) {
-                mask = buffer.getOctet();
-                bit = 1;
-            }
-            if ((mask & bit) == 0)
-                excludeList.insert(string(prop->getName()));
-            if (bit == 0x80)
-                bit = 0;
-            else
-                bit = bit << 1;
-        }
-    }
-}
 
-void ObjectImpl::encodeSchemaKey(qpid::framing::Buffer& buffer) const
+void ObjectImpl::touch()
 {
-    buffer.putShortString(objectClass->getClassKey()->getPackageName());
-    buffer.putShortString(objectClass->getClassKey()->getClassName());
-    buffer.putBin128(const_cast<uint8_t*>(objectClass->getClassKey()->getHash()));
+    lastUpdatedTime = uint64_t(Duration(now()));
 }
 
-void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const
-{
-    buffer.putLongLong(lastUpdatedTime);
-    buffer.putLongLong(createTime);
-    buffer.putLongLong(destroyTime);
-    objectId->impl->encode(buffer);
-}
 
-void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const
+void ObjectImpl::destroy()
 {
-    int propCount = objectClass->getPropertyCount();
-    uint8_t bit = 0;
-    uint8_t mask = 0;
-    ValuePtr value;
-
-    for (int idx = 0; idx < propCount; idx++) {
-        const SchemaProperty* prop = objectClass->getProperty(idx);
-        if (prop->isOptional()) {
-            value = properties[prop->getName()];
-            if (bit == 0)
-                bit = 1;
-            if (!value->isNull())
-                mask |= bit;
-            if (bit == 0x80) {
-                buffer.putOctet(mask);
-                bit = 0;
-                mask = 0;
-            } else
-                bit = bit << 1;
-        }
-    }
-    if (bit != 0) {
-        buffer.putOctet(mask);
-    }
-
-    for (int idx = 0; idx < propCount; idx++) {
-        const SchemaProperty* prop = objectClass->getProperty(idx);
-        value = properties[prop->getName()];
-        if (!prop->isOptional() || !value->isNull()) {
-            value->impl->encode(buffer);
-        }
-    }
+    destroyTime = uint64_t(Duration(now()));
 }
 
-void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const
-{
-    int statCount = objectClass->getStatisticCount();
-    for (int idx = 0; idx < statCount; idx++) {
-        const SchemaStatistic* stat = objectClass->getStatistic(idx);
-        ValuePtr value = statistics[stat->getName()];
-        value->impl->encode(buffer);
-    }
-}
 
 //==================================================================
 // Wrappers
 //==================================================================
 
-Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(type)) {}
-Object::Object(ObjectImpl* i) : impl(i) {}
+Object::Object() : impl(new ObjectImpl()) {}
+Object::Object(SchemaObjectClass* type) : impl(new ObjectImpl(type)) {}
 Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
 Object::~Object() { delete impl; }
+const Variant::Map& Object::getValues() const { return impl->getValues(); }
+Variant::Map& Object::getValues() { return impl->getValues(); }
+const SchemaObjectClass* Object::getSchema() const { return impl->getSchema(); }
+void Object::setSchema(SchemaObjectClass* schema) { impl->setSchema(schema); }
+const char* Object::getKey() const { return impl->getKey(); }
+void Object::setKey(const char* key) { impl->setKey(key); }
+void Object::touch() { impl->touch(); }
 void Object::destroy() { impl->destroy(); }
-const ObjectId* Object::getObjectId() const { return impl->getObjectId(); }
-void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
-const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
-Value* Object::getValue(const char* key) const { return impl->getValue(key); }
-void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); }
-bool Object::isDeleted() const { return impl->isDeleted(); }
-void Object::merge(const Object& from) { impl->merge(from); }
-

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h Fri Feb 19 16:06:22 2010
@@ -21,53 +21,55 @@
  */
 
 #include <qmf/engine/Object.h>
-#include <qmf/engine/ObjectIdImpl.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/messaging/Variant.h>
 #include <map>
 #include <set>
 #include <string>
-#include <qpid/framing/Buffer.h>
 #include <boost/shared_ptr.hpp>
-#include <qpid/sys/Mutex.h>
 
 namespace qmf {
 namespace engine {
 
-    class BrokerProxyImpl;
+    class SchemaObjectClass;
 
     typedef boost::shared_ptr<Object> ObjectPtr;
 
     struct ObjectImpl {
-        typedef boost::shared_ptr<Value> ValuePtr;
-        const SchemaObjectClass* objectClass;
-        BrokerProxyImpl* broker;
-        boost::shared_ptr<ObjectId> objectId;
+        /**
+         * Content of the object's data
+         */
+        qpid::messaging::Variant::Map values;
+
+        /**
+         * Schema reference if this object is "described"
+         */
+        SchemaObjectClass* objectClass;
+
+        /**
+         * Address and lifecycle information if this object is "managed"
+         * The object is considered "managed" if the key is non-empty.
+         */
+        std::string key;
         uint64_t createTime;
         uint64_t destroyTime;
         uint64_t lastUpdatedTime;
-        mutable std::map<std::string, ValuePtr> properties;
-        mutable std::map<std::string, ValuePtr> statistics;
 
-        ObjectImpl(const SchemaObjectClass* type);
-        ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
-                   bool prop, bool stat, bool managed);
-        static Object* factory(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
-                               bool prop, bool stat, bool managed);
-        ~ObjectImpl();
+        ObjectImpl();
+        ObjectImpl(SchemaObjectClass* type);
+        ~ObjectImpl() {}
+
+        const qpid::messaging::Variant::Map& getValues() const { return values; }
+        qpid::messaging::Variant::Map& getValues() { return values; }
+
+        const SchemaObjectClass* getSchema() const { return objectClass; }
+        void setSchema(SchemaObjectClass* schema) { objectClass = schema; }
+
+        const char* getKey() const { return key.c_str(); }
+        void setKey(const char* _key) { key = _key; }
 
+        void touch();
         void destroy();
-        const ObjectId* getObjectId() const { return objectId.get(); }
-        void setObjectId(ObjectId* oid) { objectId.reset(new ObjectId(*oid)); }
-        const SchemaObjectClass* getClass() const { return objectClass; }
-        Value* getValue(const std::string& key) const;
-        void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const;
-        bool isDeleted() const { return destroyTime != 0; }
-        void merge(const Object& from);
-
-        void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
-        void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
-        void encodeManagedObjectData(qpid::framing::Buffer& buffer) const;
-        void encodeProperties(qpid::framing::Buffer& buffer) const;
-        void encodeStatistics(qpid::framing::Buffer& buffer) const;
     };
 }
 }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp Fri Feb 19 16:06:22 2010
@@ -18,52 +18,20 @@
  */
 
 #include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/FieldTable.h"
 
 using namespace std;
 using namespace qmf::engine;
-using namespace qpid::framing;
+using namespace qpid::messaging;
 
-bool QueryElementImpl::evaluate(const Object* /*object*/) const
+bool QueryImpl::matches(const Object&) const
 {
-    // TODO: Implement this
-    return false;
+    return true;
 }
 
-bool QueryExpressionImpl::evaluate(const Object* /*object*/) const
-{
-    // TODO: Implement this
-    return false;
-}
-
-QueryImpl::QueryImpl(Buffer& buffer)
-{
-    FieldTable ft;
-    ft.decode(buffer);
-    // TODO
-}
 
-Query* QueryImpl::factory(Buffer& buffer)
+void QueryImpl::parsePredicate(const std::string&)
 {
-    QueryImpl* impl(new QueryImpl(buffer));
-    return new Query(impl);
-}
-
-void QueryImpl::encode(Buffer& buffer) const
-{
-    FieldTable ft;
-
-    if (oid.get() != 0) {
-        ft.setString("_objectid", oid->impl->asString());
-    } else {
-        if (!packageName.empty())
-            ft.setString("_package", packageName);
-        ft.setString("_class", className);
-    }
-
-    ft.encode(buffer);
+    predicate.clear();
 }
 
 
@@ -71,33 +39,21 @@
 // Wrappers
 //==================================================================
 
-QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper oper) : impl(new QueryElementImpl(attrName, value, oper)) {}
-QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {}
-QueryElement::~QueryElement() { delete impl; }
-bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); }
-
-QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {}
-QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {}
-QueryExpression::~QueryExpression() { delete impl; }
-bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); }
-
-Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {}
-Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {}
-Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {}
-Query::Query(QueryImpl* i) : impl(i) {}
+Query::Query(const char* target) : impl(new QueryImpl(target)) {}
+Query::Query(const char* target, const Variant::List& predicate) : impl(new QueryImpl(target, predicate)) {}
+Query::Query(const char* target, const char* expression) : impl(new QueryImpl(target, expression)) {}
 Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {}
 Query::~Query() { delete impl; }
-void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); }
-void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); }
-void Query::setOrderBy(const char* attrName, bool decreasing) { impl->setOrderBy(attrName, decreasing); }
-const char* Query::getPackage() const { return impl->getPackage().c_str(); }
-const char* Query::getClass() const { return impl->getClass().c_str(); }
-const ObjectId* Query::getObjectId() const { return impl->getObjectId(); }
-bool Query::haveSelect() const { return impl->haveSelect(); }
+void Query::where(const Variant::List& predicate) { impl->where(predicate); }
+void Query::where(const char* expression) { impl->where(expression); }
+void Query::limit(uint32_t maxResults) { impl->limit(maxResults); }
+void Query::orderBy(const char* attrName, bool decreasing) { impl->orderBy(attrName, decreasing); }
+bool Query::havePredicate() const { return impl->havePredicate(); }
 bool Query::haveLimit() const { return impl->haveLimit(); }
 bool Query::haveOrderBy() const { return impl->haveOrderBy(); }
-const QueryOperand* Query::getSelect() const { return impl->getSelect(); }
+const Variant::List& Query::getPredicate() const { return impl->getPredicate(); }
 uint32_t Query::getLimit() const { return impl->getLimit(); }
-const char* Query::getOrderBy() const { return impl->getOrderBy().c_str(); }
+const char* Query::getOrderBy() const { return impl->getOrderBy(); }
 bool Query::getDecreasing() const { return impl->getDecreasing(); }
+bool Query::matches(const Object& object) const { return impl->matches(object); }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org