You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/02 01:58:17 UTC

svn commit: r917825 - in /qpid/branches/qmf-devel0.7/qpid/cpp: include/qmf/ include/qmf/engine/ src/ src/qmf/ src/qmf/engine/

Author: tross
Date: Tue Mar  2 00:58:16 2010
New Revision: 917825

URL: http://svn.apache.org/viewvc?rev=917825&view=rev
Log:
Further implementation of the QMFv2 agent engine.
  - deprecated old ObjectId class
  - renamed Object to Data
  - added hooks for authorization of get, subscribe, and method call
  - added infrastructure for optional internal storage

Added:
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h
      - copied, changed from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp
      - copied, changed from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h
      - copied, changed from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h
Removed:
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Event.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/ObjectId.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/EventImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/EventImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h
Modified:
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h Tue Mar  2 00:58:16 2010
@@ -40,7 +40,7 @@
      */
     class Notifiable {
     public:
-        QMF_EXTERN virtual ~Notifiable();
+        QMF_EXTERN virtual ~Notifiable() {}
         virtual void notify() = 0;
     };
 }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h Tue Mar  2 00:58:16 2010
@@ -71,6 +71,7 @@
         /**
          * QMF Op Codes
          */
+        const static std::string OP_EXCEPTION;
         const static std::string OP_AGENT_LOCATE_REQUEST;
         const static std::string OP_AGENT_LOCATE_RESPONSE;
         const static std::string OP_AGENT_HEARTBEAT_INDICATION;

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h Tue Mar  2 00:58:16 2010
@@ -22,9 +22,7 @@
 
 #include <qmf/Notifiable.h>
 #include <qmf/engine/Schema.h>
-#include <qmf/engine/ObjectId.h>
-#include <qmf/engine/Object.h>
-#include <qmf/engine/Event.h>
+#include <qmf/engine/Data.h>
 #include <qmf/engine/Query.h>
 #include <qpid/messaging/Connection.h>
 #include <qpid/messaging/Variant.h>
@@ -40,10 +38,13 @@
      */
     struct AgentEvent {
         enum EventKind {
-            GET_QUERY      = 1,
-            START_SYNC     = 2,
-            END_SYNC       = 3,
-            METHOD_CALL    = 4
+            GET_QUERY        = 1,
+            START_SYNC       = 2,
+            END_SYNC         = 3,
+            METHOD_CALL      = 4,
+            GET_AUTHORIZE    = 5,
+            METHOD_AUTHORIZE = 6,
+            SYNC_AUTHORIZE   = 7
         };
 
         EventKind    kind;
@@ -52,7 +53,7 @@
         char*        authToken;   // Authentication token if issued (for all kinds)
         char*        name;        // Name of the method/sync query
                                   //    (METHOD_CALL, START_SYNC, END_SYNC)
-        Object*      object;      // Object involved in method call (METHOD_CALL)
+        Data*        object;      // Object involved in method call (METHOD_CALL)
         char*        objectKey;   // Object key for method call (METHOD_CALL)
         Query*       query;       // Query parameters (GET_QUERY, START_SYNC)
         qpid::messaging::Variant::Map*  arguments;   // Method parameters (METHOD_CALL)
@@ -128,6 +129,21 @@
         void setConnection(qpid::messaging::Connection& conn);
 
         /**
+         * Respond to an authorize request by allowing the requested action.
+         *@param sequence The sequence number from the authorization request event.
+         */
+        void authAllow(uint32_t sequence);
+
+        /**
+         * Respond to an authorize request by denying the requested action.
+         *@param sequence The sequence number from the authorization request event.
+         *@param exception Value (typically a string) describing the reason for the
+         *                 rejection of authorization.
+         */
+        void authDeny(uint32_t sequence, const Data& exception=Data());
+        void authDeny(uint32_t sequence, const char* error);
+
+        /**
          * Respond to a method request.
          *@param sequence  The sequence number from the method request event.
          *@param status    The method's completion status.
@@ -143,7 +159,7 @@
          *@param sequence The sequence number of the GET request or the SYNC_START request.
          *@param object   The object (annotated with "changed" flags) for publication.
          */
-        void queryResponse(uint32_t sequence, Object& object);
+        void queryResponse(uint32_t sequence, Data& object);
 
         /**
          * Indicate the completion of a query.  This is not used for SYNC_START requests.
@@ -165,13 +181,13 @@
          *           left null, the agent will create a unique name for the object.
          *@return The key for the managed object.
          */
-        const char* addObject(Object& obj, const char* key=0);
+        const char* addObject(Data& obj, const char* key=0);
 
         /**
          * Raise an event into the QMF network..
          *@param event The event object for the event to be raised.
          */
-        void raiseEvent(Event& event);
+        void raiseEvent(Data& event);
 
     private:
         AgentImpl* impl;

Copied: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h (from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h?p2=qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h&p1=qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h&r1=917292&r2=917825&rev=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h Tue Mar  2 00:58:16 2010
@@ -1,5 +1,5 @@
-#ifndef _QmfEngineObject_
-#define _QmfEngineObject_
+#ifndef _QmfEngineData_
+#define _QmfEngineData_
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,23 +21,25 @@
  */
 
 #include <qmf/engine/Schema.h>
-#include <qmf/engine/ObjectId.h>
 #include <qpid/messaging/Variant.h>
 
 namespace qmf {
 namespace engine {
 
-    struct ObjectImpl;
-    class Object {
+    struct DataImpl;
+    class Data {
     public:
-        Object();
-        Object(SchemaClass* type);
-        Object(const Object& from);
-        virtual ~Object();
+        Data();
+        Data(SchemaClass* type, const qpid::messaging::Variant::Map& values=qpid::messaging::Variant::Map());
+        Data(const Data& from);
+        virtual ~Data();
 
         const qpid::messaging::Variant::Map& getValues() const;
         qpid::messaging::Variant::Map& getValues();
 
+        const qpid::messaging::Variant::Map& getSubtypes() const;
+        qpid::messaging::Variant::Map& getSubtypes();
+
         const SchemaClass* getSchema() const;
         void setSchema(SchemaClass* schema);
 
@@ -47,10 +49,12 @@
         void touch();
         void destroy();
 
+        qpid::messaging::Variant::Map asMap() const;
+
     private:
-        friend struct ObjectImpl;
+        friend struct DataImpl;
         friend class  AgentImpl;
-        ObjectImpl* impl;
+        DataImpl* impl;
     };
 }
 }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h Tue Mar  2 00:58:16 2010
@@ -25,7 +25,6 @@
 namespace qmf {
 namespace engine {
 
-    class Object;
     class QueryImpl;
 
     class Query {
@@ -49,11 +48,12 @@
         const char* getOrderBy() const;
         bool getDecreasing() const;
 
-        bool matches(const Object& object) const;
+        bool matches(const qpid::messaging::Variant::Map& data) const;
 
     private:
         friend struct QueryImpl;
         friend struct BrokerProxyImpl;
+        Query(QueryImpl*);
         QueryImpl* impl;
     };
 }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk Tue Mar  2 00:58:16 2010
@@ -40,16 +40,13 @@
 QMF_ENGINE_API =				\
   ../include/qmf/engine/Agent.h			\
   ../include/qmf/engine/Console.h		\
-  ../include/qmf/engine/Event.h			\
-  ../include/qmf/engine/Object.h		\
+  ../include/qmf/engine/Data.h			\
   ../include/qmf/engine/QmfEngineImportExport.h	\
   ../include/qmf/engine/Query.h			\
   ../include/qmf/engine/Schema.h		\
   ../include/qmf/Agent.h			\
   ../include/qmf/Notifiable.h
 
-# ../include/qmf/engine/ObjectId.h
-
 # Public header files
 nobase_include_HEADERS +=	\
   $(QMF_API)			\
@@ -65,8 +62,8 @@
 libqmfengine_la_SOURCES =			\
   $(QMF_ENGINE_API)				\
   qmf/engine/Agent.cpp				\
-  qmf/engine/ObjectImpl.cpp			\
-  qmf/engine/ObjectImpl.h			\
+  qmf/engine/DataImpl.cpp			\
+  qmf/engine/DataImpl.h				\
   qmf/Protocol.cpp				\
   qmf/Protocol.h				\
   qmf/engine/QueryImpl.cpp			\
@@ -78,8 +75,6 @@
 # qmf/engine/BrokerProxyImpl.h
 # qmf/engine/ConsoleImpl.cpp
 # qmf/engine/ConsoleImpl.h
-# qmf/engine/ObjectIdImpl.cpp
-# qmf/engine/ObjectIdImpl.h
 # qmf/engine/SequenceManager.cpp
 # qmf/engine/SequenceManager.h
 

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp Tue Mar  2 00:58:16 2010
@@ -48,6 +48,7 @@
 
 const string Protocol::APP_OPCODE("qmf.opcode");
 
+const string Protocol::OP_EXCEPTION("_exception");
 const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request");
 const string Protocol::OP_AGENT_LOCATE_RESPONSE("_agent_locate_response");
 const string Protocol::OP_AGENT_HEARTBEAT_INDICATION("_agent_heartbeat_indication");

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp Tue Mar  2 00:58:16 2010
@@ -19,10 +19,11 @@
 
 #include "qmf/engine/Agent.h"
 #include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/Data.h"
 #include "qmf/engine/QueryImpl.h"
 #include "qmf/Protocol.h"
 #include <qpid/sys/Mutex.h>
+#include <qpid/sys/Condition.h>
 #include <qpid/log/Statement.h>
 #include <qpid/sys/Time.h>
 #include <qpid/sys/Thread.h>
@@ -30,8 +31,11 @@
 #include <qpid/messaging/Session.h>
 #include <qpid/messaging/Receiver.h>
 #include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Address.h>
 #include <qpid/messaging/Message.h>
+#include <qpid/messaging/MapContent.h>
 #include <qpid/messaging/MapView.h>
+#include <qpid/messaging/ListView.h>
 #include <string>
 #include <deque>
 #include <map>
@@ -48,6 +52,8 @@
 namespace qmf {
 namespace engine {
 
+    class AgentImpl;
+
     struct AgentEventImpl {
         typedef boost::shared_ptr<AgentEventImpl> Ptr;
         AgentEvent::EventKind kind;
@@ -55,7 +61,7 @@
         string      authUserId;
         string      authToken;
         string      name;
-        Object*     object;
+        Data*       object;
         string      objectKey;
         boost::shared_ptr<Query> query;
         boost::shared_ptr<Variant::Map> arguments;
@@ -68,15 +74,31 @@
     };
 
     /**
-     * AgentQueryContext is used to track asynchronous requests (Query, Sync, or Method)
+     * AsyncContext is used to track asynchronous requests (Query, Sync, or Method)
      * sent up to the application.
      */
-    struct AgentQueryContext {
-        typedef boost::shared_ptr<AgentQueryContext> Ptr;
-        uint32_t sequence;
-        string   consoleAddr;
+    struct AsyncContext {
+        typedef boost::shared_ptr<AsyncContext> Ptr;
+        string correlationId;
+        Address replyTo;
+        AgentEventImpl::Ptr authorizedEvent;
         const SchemaMethod* schemaMethod;
-        AgentQueryContext() : schemaMethod(0) {}
+        AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {}
+    };
+
+    class StoreThread : public boost::noncopyable, public qpid::sys::Runnable {
+    public:
+        StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {}
+        ~StoreThread() {
+            stop();
+        }
+        void run();
+        void stop();
+
+    private:
+        AgentImpl& agent;
+        bool running;
+        qpid::sys::Thread thread;
     };
 
     class AgentImpl : public boost::noncopyable, public qpid::sys::Runnable {
@@ -92,24 +114,34 @@
         bool getEvent(AgentEvent& event) const;
         void popEvent();
         void setConnection(Connection& conn);
+        void authAllow(uint32_t sequence);
+        void authDeny(uint32_t sequence, const Data&);
+        void authDeny(uint32_t sequence, const string&);
         void methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments);
-        void queryResponse(uint32_t sequence, Object& object);
+        void queryResponse(uint32_t sequence, Data& object);
         void queryComplete(uint32_t sequence);
         void registerClass(SchemaClass* cls);
-        const char* addObject(Object& obj, const char* key);
-        void raiseEvent(Event& event);
+        const char* addObject(Data& obj, const char* key);
+        void raiseEvent(Data& event);
 
         void run();
         void stop();
 
+        // This blocking call is used by the internal store thread(s) to get work to do.
+        AgentEventImpl::Ptr nextInternalEvent();
+        void signalInternal() { cond.notify(); }
+
     private:
         mutable Mutex lock;
-        Mutex     addLock;
+        Condition cond;
         const string    vendor;
         const string    product;
         const string    name;
         const string    domain;
         string directAddr;
+        string directAddrParams;
+        string topicAddr;
+        string topicAddrParams;
         Variant::Map attrMap;
         string    storeDir;
         string    transferDir;
@@ -121,13 +153,15 @@
         uint32_t  nextContextNum;
         bool      running;
         deque<AgentEventImpl::Ptr> eventQueue;
-        map<uint32_t, AgentQueryContext::Ptr> contextMap;
+        deque<AgentEventImpl::Ptr> internalEventQueue;
+        map<uint32_t, AsyncContext::Ptr> contextMap;
         Connection connection;
         Session session;
         Receiver directReceiver;
         Receiver topicReceiver;
         Sender sender;
         qpid::sys::Thread* thread;
+        StoreThread* storeThread;
 
         struct AgentClassKey {
             string name;
@@ -169,6 +203,8 @@
         void handleSubscribeCancel(const Message& message);
         void handleSubscribeRefresh(const Message& message);
         void handleMethodRequest(const Message& message);
+        void sendResponse(const Message& request, const string& opcode, const Data& data);
+        void sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data);
 
         void sendPackageIndicationLH(const string& packageName);
         void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
@@ -207,18 +243,38 @@
     return item;
 }
 
+void StoreThread::run()
+{
+    while (running) {
+        AgentEventImpl::Ptr ptr(agent.nextInternalEvent());
+    }
+}
+
+void StoreThread::stop()
+{
+    running = false;
+    agent.signalInternal();
+}
+
 AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
     vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
     notifyHandler(0), notifiable(0),
     bootSequence(1), nextContextNum(1), running(true), thread(0)
 {
     directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
-    if (_d == 0) {
-        directAddr += " { create:always }";
+    topicAddr  = "qmf." + domain + ".topic/console.ind.#";
+    if (_d != 0) {
+        directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}";
+        topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}";
+    }
+    attrMap["_vendor"] = vendor;
+    attrMap["_product"] = product;
+    attrMap["_instance"] = name;
+    attrMap["_name"] = vendor + ":" + product + ":" + name;
+
+    if (internalStore) {
+        storeThread = new StoreThread(*this);
     }
-    attrMap["vendor"] = vendor;
-    attrMap["product"] = product;
-    attrMap["name"] = name;
 }
 
 
@@ -297,42 +353,93 @@
     thread = new qpid::sys::Thread(*this);
 }
 
+void AgentImpl::authAllow(uint32_t sequence)
+{
+    Mutex::ScopedLock _lock(lock);
+
+    // Find the context associated with the sequence number
+    map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+    if (iter == contextMap.end())
+        return;
+    AsyncContext::Ptr context = iter->second;
+
+    // Transform the authorize event into the real event
+    switch (context->authorizedEvent->kind) {
+    case AgentEvent::GET_AUTHORIZE    : context->authorizedEvent->kind = AgentEvent::GET_QUERY; break;
+    case AgentEvent::METHOD_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::METHOD_CALL; break;
+    case AgentEvent::SYNC_AUTHORIZE   : context->authorizedEvent->kind = AgentEvent::START_SYNC; break;
+    default:
+        contextMap.erase(iter);
+        return;
+    }
+
+    // Re-issue the now-authorized action.  If this is a data query (get or subscribe),
+    // and the agent is handling storage internally, redirect to the internal event
+    // queue for processing by the internal-storage thread.
+    if (internalStore) {
+        internalEventQueue.push_back(context->authorizedEvent);
+        cond.notify();
+    } else {
+        eventQueue.push_back(context->authorizedEvent);
+        notify();
+    }
+}
+
+void AgentImpl::authDeny(uint32_t sequence, const Data& exception)
+{
+    Mutex::ScopedLock _lock(lock);
+    map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+    if (iter == contextMap.end())
+        return;
+    AsyncContext::Ptr context = iter->second;
+    contextMap.erase(iter);
+
+    // Return an exception message to the requestor
+    sendResponse(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, exception);
+}
+
+void AgentImpl::authDeny(uint32_t sequence, const string& error)
+{
+    Data exception;
+    exception.getValues()["status"] = "Access to this Operation Denied";
+    exception.getValues()["text"] = error;
+    authDeny(sequence, exception);
+}
+
 void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& /*argMap*/)
 {
     Mutex::ScopedLock _lock(lock);
-    map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+    map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
     if (iter == contextMap.end())
         return;
-    AgentQueryContext::Ptr context = iter->second;
+    AsyncContext::Ptr context = iter->second;
     contextMap.erase(iter);
 
     // TODO: Encode method response
-    QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
+    QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text);
 }
 
-void AgentImpl::queryResponse(uint32_t sequence, Object&)
+void AgentImpl::queryResponse(uint32_t sequence, Data&)
 {
     Mutex::ScopedLock _lock(lock);
-    map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+    map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
     if (iter == contextMap.end())
         return;
-    AgentQueryContext::Ptr context = iter->second;
+    AsyncContext::Ptr context = iter->second;
 
     // TODO: accumulate data records and send response messages when we have "enough"
-
-    QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
 }
 
 void AgentImpl::queryComplete(uint32_t sequence)
 {
     Mutex::ScopedLock _lock(lock);
-    map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+    map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
     if (iter == contextMap.end())
         return;
 
     // TODO: send a response message if there are any unsent data records
 
-    AgentQueryContext::Ptr context = iter->second;
+    AsyncContext::Ptr context = iter->second;
     contextMap.erase(iter);
     //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
 }
@@ -354,13 +461,13 @@
     // TODO: Indicate this schema if connected.
 }
 
-const char* AgentImpl::addObject(Object&, const char*)
+const char* AgentImpl::addObject(Data&, const char*)
 {
     Mutex::ScopedLock _lock(lock);
     return 0;
 }
 
-void AgentImpl::raiseEvent(Event&)
+void AgentImpl::raiseEvent(Data&)
 {
     Mutex::ScopedLock _lock(lock);
 }
@@ -370,9 +477,14 @@
     qpid::sys::Duration duration = qpid::sys::TIME_MSEC * 500;
 
     session = connection.newSession();
-    directReceiver = session.createReceiver(directAddr);
+    QPID_LOG(trace, "Creating direct receiver to address: " << directAddr << directAddrParams);
+    directReceiver = session.createReceiver(directAddr + directAddrParams);
     directReceiver.setCapacity(10);
 
+    QPID_LOG(trace, "Creating topic receiver to address: " << topicAddr << topicAddrParams);
+    topicReceiver = session.createReceiver(topicAddr + topicAddrParams);
+    topicReceiver.setCapacity(10);
+
     Mutex::ScopedLock _lock(lock);
     while (running) {
         Receiver rcvr;
@@ -398,12 +510,29 @@
     running = false;
 }
 
+AgentEventImpl::Ptr AgentImpl::nextInternalEvent()
+{
+    Mutex::ScopedLock _lock(lock);
+    while (internalEventQueue.empty())
+        cond.wait(lock);
+
+    AgentEventImpl::Ptr event(internalEventQueue.front());
+    internalEventQueue.pop_front();
+    return event;
+
+    // TODO: make sure this function returns with a null pointer when the thread needs to stop.
+}
+
+
 void AgentImpl::handleRcvMessageLH(const Message& message)
 {
     Variant::Map headers(message.getHeaders());
-    cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl;
+    cout << "AgentImpl::handleRcvMessageLH contentType=" << message.getContentType() <<
+        " replyTo=" << message.getReplyTo() <<
+        " headers=" << headers << endl;
 
-    if (message.getContentType() != Protocol::AMQP_CONTENT_MAP)
+    if (message.getContentType() != Protocol::AMQP_CONTENT_MAP &&
+        message.getContentType() != Protocol::AMQP_CONTENT_LIST)
         return;
 
     Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE);
@@ -421,16 +550,33 @@
 
 void AgentImpl::handleAgentLocateLH(const Message& message)
 {
-    const MapView predicate(message);
-
-    //if (predicateMatches(predicate, attrMap)) {
-    //    sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap);
-    //}
+    QPID_LOG(trace, "RCVD AgentLocateRequest replyTo=" << message.getReplyTo());
+    auto_ptr<Query> query(QueryImpl::factory(ListView(message)));
+    if (query->matches(attrMap)) {
+        Data data(0, attrMap);
+        sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, data);
+        QPID_LOG(trace, "SENT AgentLocateResponse");
+    }
 }
 
 void AgentImpl::handleQueryRequestLH(const Message& message)
 {
-    const MapView map(message);
+    uint32_t contextNum = nextContextNum++;
+    AsyncContext::Ptr context(new AsyncContext(message.getCorrelationId(), message.getReplyTo()));
+    contextMap[contextNum] = context;
+
+    // Build the event for the get request
+    AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_AUTHORIZE));
+    event->sequence = contextNum;
+    event->authUserId = message.getUserId();
+    event->query.reset(QueryImpl::factory(MapView(message)));
+
+    // Put the not-yet-authorized event into the context for possible later use
+    context->authorizedEvent = event;
+
+    // Enqueue the event
+    eventQueue.push_back(event);
+    notify();
 }
 
 void AgentImpl::handleSubscribeRequest(const Message& message)
@@ -453,6 +599,22 @@
     const MapView map(message);
 }
 
+void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data)
+{
+    sendResponse(request.getReplyTo(), request.getCorrelationId(), opcode, data);
+}
+
+void AgentImpl::sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data)
+{
+    Message message;
+    MapContent content(message, data.asMap());
+
+    message.setCorrelationId(correlationId);
+    message.getHeaders()[Protocol::APP_OPCODE] = opcode;
+    content.encode();
+    session.createSender(address).send(message);
+}
+
 AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
 {
     AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
@@ -528,14 +690,8 @@
     Mutex::ScopedLock _lock(lock);
 }
 
-void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const string& /*replyTo*/, const string& /*userId*/)
+void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t, const string& /*replyTo*/, const string& /*userId*/)
 {
-    Mutex::ScopedLock _lock(lock);
-    QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=");
-
-    AgentQueryContext::Ptr context(new AgentQueryContext);
-    uint32_t contextNum = nextContextNum++;
-    contextMap[contextNum] = context;
 }
 
 //==================================================================
@@ -552,10 +708,13 @@
 bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
 void Agent::popEvent() { impl->popEvent(); }
 void Agent::setConnection(Connection& conn) { impl->setConnection(conn); }
+void Agent::authAllow(uint32_t sequence) { impl->authAllow(sequence); }
+void Agent::authDeny(uint32_t sequence, const Data& ex) { impl->authDeny(sequence, ex); }
+void Agent::authDeny(uint32_t sequence, const char* ex) { impl->authDeny(sequence, string(ex)); }
 void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments) { impl->methodResponse(sequence, status, text, arguments); }
-void Agent::queryResponse(uint32_t sequence, Object& object) { impl->queryResponse(sequence, object); }
+void Agent::queryResponse(uint32_t sequence, Data& object) { impl->queryResponse(sequence, object); }
 void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
 void Agent::registerClass(SchemaClass* cls) { impl->registerClass(cls); }
-const char* Agent::addObject(Object& obj, const char* key) { return impl->addObject(obj, key); }
-void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
+const char* Agent::addObject(Data& obj, const char* key) { return impl->addObject(obj, key); }
+void Agent::raiseEvent(Data& event) { impl->raiseEvent(event); }
 

Copied: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp (from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp)
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp?p2=qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp&p1=qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp&r1=917292&r2=917825&rev=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp Tue Mar  2 00:58:16 2010
@@ -17,7 +17,8 @@
  * under the License.
  */
 
-#include "qmf/engine/ObjectImpl.h"
+#include "qmf/Protocol.h"
+#include "qmf/engine/DataImpl.h"
 #include <qpid/sys/Time.h>
 
 using namespace std;
@@ -25,43 +26,59 @@
 using namespace qpid::sys;
 using namespace qpid::messaging;
 
-ObjectImpl::ObjectImpl() :
+DataImpl::DataImpl() :
     objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
 {
 }
 
 
-ObjectImpl::ObjectImpl(SchemaClass* type) :
-    objectClass(type), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
+DataImpl::DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map& v) :
+    values(v), objectClass(type), createTime(uint64_t(Duration(now()))),
+    destroyTime(0), lastUpdatedTime(createTime)
 {
 }
 
 
-void ObjectImpl::touch()
+void DataImpl::touch()
 {
     lastUpdatedTime = uint64_t(Duration(now()));
 }
 
 
-void ObjectImpl::destroy()
+void DataImpl::destroy()
 {
     destroyTime = uint64_t(Duration(now()));
 }
 
+Variant::Map DataImpl::asMap() const
+{
+    Variant::Map map;
+
+    map[Protocol::VALUES] = values;
+    if (!subtypes.empty())
+        map[Protocol::SUBTYPES] = subtypes;
+    // TODO: Add key, schema, and lifecycle data
+
+    return map;
+}
+
 
 //==================================================================
 // Wrappers
 //==================================================================
 
-Object::Object() : impl(new ObjectImpl()) {}
-Object::Object(SchemaClass* type) : impl(new ObjectImpl(type)) {}
-Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
-Object::~Object() { delete impl; }
-const Variant::Map& Object::getValues() const { return impl->getValues(); }
-Variant::Map& Object::getValues() { return impl->getValues(); }
-const SchemaClass* Object::getSchema() const { return impl->getSchema(); }
-void Object::setSchema(SchemaClass* schema) { impl->setSchema(schema); }
-const char* Object::getKey() const { return impl->getKey(); }
-void Object::setKey(const char* key) { impl->setKey(key); }
-void Object::touch() { impl->touch(); }
-void Object::destroy() { impl->destroy(); }
+Data::Data() : impl(new DataImpl()) {}
+Data::Data(SchemaClass* type, const Variant::Map& m) : impl(new DataImpl(type, m)) {}
+Data::Data(const Data& from) : impl(new DataImpl(*(from.impl))) {}
+Data::~Data() { delete impl; }
+const Variant::Map& Data::getValues() const { return impl->getValues(); }
+Variant::Map& Data::getValues() { return impl->getValues(); }
+const Variant::Map& Data::getSubtypes() const { return impl->getSubtypes(); }
+Variant::Map& Data::getSubtypes() { return impl->getSubtypes(); }
+const SchemaClass* Data::getSchema() const { return impl->getSchema(); }
+void Data::setSchema(SchemaClass* schema) { impl->setSchema(schema); }
+const char* Data::getKey() const { return impl->getKey(); }
+void Data::setKey(const char* key) { impl->setKey(key); }
+void Data::touch() { impl->touch(); }
+void Data::destroy() { impl->destroy(); }
+Variant::Map Data::asMap() const { return impl->asMap(); }

Copied: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h (from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h?p2=qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h&p1=qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h&r1=917292&r2=917825&rev=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h Tue Mar  2 00:58:16 2010
@@ -1,5 +1,5 @@
-#ifndef _QmfEngineObjectImpl_
-#define _QmfEngineObjectImpl_
+#ifndef _QmfEngineDataImpl_
+#define _QmfEngineDataImpl_
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,7 +20,7 @@
  * under the License.
  */
 
-#include <qmf/engine/Object.h>
+#include <qmf/engine/Data.h>
 #include <qpid/sys/Mutex.h>
 #include <qpid/messaging/Variant.h>
 #include <map>
@@ -33,13 +33,14 @@
 
     class SchemaClass;
 
-    typedef boost::shared_ptr<Object> ObjectPtr;
+    typedef boost::shared_ptr<Data> DataPtr;
 
-    struct ObjectImpl {
+    struct DataImpl {
         /**
          * Content of the object's data
          */
         qpid::messaging::Variant::Map values;
+        qpid::messaging::Variant::Map subtypes;
 
         /**
          * Schema reference if this object is "described"
@@ -55,13 +56,16 @@
         uint64_t destroyTime;
         uint64_t lastUpdatedTime;
 
-        ObjectImpl();
-        ObjectImpl(SchemaClass* type);
-        ~ObjectImpl() {}
+        DataImpl();
+        DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map&);
+        ~DataImpl() {}
 
         const qpid::messaging::Variant::Map& getValues() const { return values; }
         qpid::messaging::Variant::Map& getValues() { return values; }
 
+        const qpid::messaging::Variant::Map& getSubtypes() const { return subtypes; }
+        qpid::messaging::Variant::Map& getSubtypes() { return subtypes; }
+
         const SchemaClass* getSchema() const { return objectClass; }
         void setSchema(SchemaClass* schema) { objectClass = schema; }
 
@@ -70,6 +74,8 @@
 
         void touch();
         void destroy();
+
+        qpid::messaging::Variant::Map asMap() const;
     };
 }
 }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp Tue Mar  2 00:58:16 2010
@@ -23,7 +23,17 @@
 using namespace qmf::engine;
 using namespace qpid::messaging;
 
-bool QueryImpl::matches(const Object&) const
+QueryImpl::QueryImpl(const qpid::messaging::MapView&)
+{
+    // TODO
+}
+
+QueryImpl::QueryImpl(const qpid::messaging::ListView&)
+{
+    //TODO
+}
+
+bool QueryImpl::matches(const Variant::Map&) const
 {
     return true;
 }
@@ -34,6 +44,17 @@
     predicate.clear();
 }
 
+Query* QueryImpl::factory(const qpid::messaging::MapView& map)
+{
+    QueryImpl* impl(new QueryImpl(map));
+    return new Query(impl);
+}
+
+Query* QueryImpl::factory(const qpid::messaging::ListView& pred)
+{
+    QueryImpl* impl(new QueryImpl(pred));
+    return new Query(impl);
+}
 
 //==================================================================
 // Wrappers
@@ -43,6 +64,7 @@
 Query::Query(const char* target, const Variant::List& predicate) : impl(new QueryImpl(target, predicate)) {}
 Query::Query(const char* target, const char* expression) : impl(new QueryImpl(target, expression)) {}
 Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {}
+Query::Query(QueryImpl* i) : impl(i) {}
 Query::~Query() { delete impl; }
 void Query::where(const Variant::List& predicate) { impl->where(predicate); }
 void Query::where(const char* expression) { impl->where(expression); }
@@ -55,5 +77,5 @@
 uint32_t Query::getLimit() const { return impl->getLimit(); }
 const char* Query::getOrderBy() const { return impl->getOrderBy(); }
 bool Query::getDecreasing() const { return impl->getDecreasing(); }
-bool Query::matches(const Object& object) const { return impl->matches(object); }
+bool Query::matches(const Variant::Map& data) const { return impl->matches(data); }
 

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h Tue Mar  2 00:58:16 2010
@@ -22,6 +22,8 @@
 
 #include "qmf/engine/Query.h"
 #include <qpid/messaging/Variant.h>
+#include <qpid/messaging/MapView.h>
+#include <qpid/messaging/ListView.h>
 #include <string>
 #include <boost/shared_ptr.hpp>
 
@@ -34,8 +36,13 @@
             target(_target), predicate(_predicate), resultLimit(0) {}
         QueryImpl(const char* _target, const char* expression) :
             target(_target), resultLimit(0) { parsePredicate(expression); }
+        QueryImpl(const qpid::messaging::MapView& map);
+        QueryImpl(const qpid::messaging::ListView& pred);
         ~QueryImpl() {}
 
+        static Query* factory(const qpid::messaging::MapView& map);
+        static Query* factory(const qpid::messaging::ListView& pred);
+
         void where(const qpid::messaging::Variant::List& _predicate) { predicate = _predicate; }
         void where(const char* expression) { parsePredicate(expression); }
         void limit(uint32_t maxResults) { resultLimit = maxResults; }
@@ -48,7 +55,7 @@
         uint32_t getLimit() const { return resultLimit; }
         const char* getOrderBy() const { return sortAttr.c_str(); }
         bool getDecreasing() const { return orderDecreasing; }
-        bool matches(const Object& object) const;
+        bool matches(const qpid::messaging::Variant::Map& data) const;
 
         void parsePredicate(const std::string& expression);
 

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h Tue Mar  2 00:58:16 2010
@@ -41,7 +41,7 @@
         SchemaException(const std::string& context, const std::string& expected) {
             text = context + ": Expected item with key " + expected;
         }
-        virtual ~SchemaException() throw();
+        virtual ~SchemaException() throw() {}
         virtual const char* what() const throw() { return text.c_str(); }
 
     private:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org