You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/02 04:25:56 UTC

svn commit: r917854 - in /qpid/branches/qmf-devel0.7/qpid/cpp: include/qmf/Protocol.h include/qmf/engine/Data.h src/qmf/Protocol.cpp src/qmf/engine/Agent.cpp src/qmf/engine/DataImpl.cpp src/qmf/engine/DataImpl.h

Author: tross
Date: Tue Mar  2 03:25:56 2010
New Revision: 917854

URL: http://svn.apache.org/viewvc?rev=917854&view=rev
Log:
Update branch with new Agent engine implementation:
  - Data hooks to allow batched and partial updates from internal storage.
  - Capability for immediate updates for deletion and changing of discrete values.
  - Implementation of query and event-raise.

Modified:
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
    qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp
    qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h?rev=917854&r1=917853&r2=917854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h Tue Mar  2 03:25:56 2010
@@ -67,6 +67,8 @@
          * Application Header Keys
          */
         const static std::string APP_OPCODE;
+        const static std::string APP_PARTIAL;
+        const static std::string APP_CONTENT;
 
         /**
          * QMF Op Codes
@@ -88,6 +90,7 @@
         /**
          * Content type definitions
          */
+        const static std::string CONTENT_NONE;
         const static std::string CONTENT_PACKAGE;
         const static std::string CONTENT_SCHEMA_ID;
         const static std::string CONTENT_SCHEMA_CLASS;

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h?rev=917854&r1=917853&r2=917854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h Tue Mar  2 03:25:56 2010
@@ -20,12 +20,12 @@
  * under the License.
  */
 
-#include <qmf/engine/Schema.h>
 #include <qpid/messaging/Variant.h>
 
 namespace qmf {
 namespace engine {
 
+    class SchemaClass;
     struct DataImpl;
     class Data {
     public:
@@ -41,16 +41,14 @@
         qpid::messaging::Variant::Map& getSubtypes();
 
         const SchemaClass* getSchema() const;
-        void setSchema(SchemaClass* schema);
 
         const char* getKey() const;
         void setKey(const char* key);
 
-        void touch();
+        void modifyStart();
+        void modifyDone();
         void destroy();
 
-        qpid::messaging::Variant::Map asMap() const;
-
     private:
         friend struct DataImpl;
         friend class  AgentImpl;

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp?rev=917854&r1=917853&r2=917854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp Tue Mar  2 03:25:56 2010
@@ -47,6 +47,8 @@
 const string Protocol::AMQP_CONTENT_LIST("amqp/list");
 
 const string Protocol::APP_OPCODE("qmf.opcode");
+const string Protocol::APP_PARTIAL("partial");
+const string Protocol::APP_CONTENT("qmf.content");
 
 const string Protocol::OP_EXCEPTION("_exception");
 const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request");
@@ -62,6 +64,7 @@
 const string Protocol::OP_METHOD_REQUEST("_method_request");
 const string Protocol::OP_METHOD_RESPONSE("_method_response");
 
+const string Protocol::CONTENT_NONE("");
 const string Protocol::CONTENT_PACKAGE("_schema_package");
 const string Protocol::CONTENT_SCHEMA_ID("_schema_id");
 const string Protocol::CONTENT_SCHEMA_CLASS("_schema_class");

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp?rev=917854&r1=917853&r2=917854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp Tue Mar  2 03:25:56 2010
@@ -19,7 +19,7 @@
 
 #include "qmf/engine/Agent.h"
 #include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Data.h"
+#include "qmf/engine/DataImpl.h"
 #include "qmf/engine/QueryImpl.h"
 #include "qmf/Protocol.h"
 #include <qpid/sys/Mutex.h>
@@ -34,6 +34,7 @@
 #include <qpid/messaging/Address.h>
 #include <qpid/messaging/Message.h>
 #include <qpid/messaging/MapContent.h>
+#include <qpid/messaging/ListContent.h>
 #include <qpid/messaging/MapView.h>
 #include <qpid/messaging/ListView.h>
 #include <string>
@@ -86,15 +87,27 @@
         AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {}
     };
 
-    class StoreThread : public boost::noncopyable, public qpid::sys::Runnable {
+    /**
+     * StoreThread is used only when the Agent runs in internal-store mode.
+     * This class keeps track of stored objects and can perform queries and
+     * subscription queries on the data.
+     */
+    class StoreThread : public boost::noncopyable, public qpid::sys::Runnable, public DataManager {
     public:
         StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {}
-        ~StoreThread() {
-            stop();
-        }
+        ~StoreThread() { stop(); }
+
+        void addObject(const Data& data);
+
+        // Methods from Runnable
         void run();
         void stop();
 
+        // Methods from DataManager
+        void modifyStart(DataPtr data);
+        void modifyDone(DataPtr data);
+        void destroy(DataPtr data);
+
     private:
         AgentImpl& agent;
         bool running;
@@ -142,6 +155,7 @@
         string directAddrParams;
         string topicAddr;
         string topicAddrParams;
+        string eventSendAddr;
         Variant::Map attrMap;
         string    storeDir;
         string    transferDir;
@@ -204,7 +218,10 @@
         void handleSubscribeRefresh(const Message& message);
         void handleMethodRequest(const Message& message);
         void sendResponse(const Message& request, const string& opcode, const Data& data);
-        void sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data);
+        void send(const Address& address, const string& correlationId, const string& opcode,
+                  const string& cType, const Data& data);
+        void send(const Address& address, const string& correlationId, const string& opcode,
+                  const string& cType, const Variant::List& list, bool partial=false);
 
         void sendPackageIndicationLH(const string& packageName);
         void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
@@ -243,6 +260,11 @@
     return item;
 }
 
+void StoreThread::addObject(const Data& data)
+{
+    DataPtr stored(new Data(data));
+}
+
 void StoreThread::run()
 {
     while (running) {
@@ -256,6 +278,30 @@
     agent.signalInternal();
 }
 
+void StoreThread::modifyStart(DataPtr)
+{
+    // Algorithm:
+    //   Make a copy of the indicated object as a delta base if there
+    //   isn't already one in place.  If there is, do nothing.
+}
+
+void StoreThread::modifyDone(DataPtr)
+{
+    // Algorithm:
+    //   If any deltas between the current and the stored base are discrete,
+    //   send an immediate update.  Otherwise, mark the object as modified.
+    //
+    //   If an update is sent, delete the base copy.  If not, leave the base copy
+    //   in place for the later periodic update.
+}
+
+void StoreThread::destroy(DataPtr)
+{
+    // Algorithm:
+    //   Send an immediate full-update for this object with the delete time set.
+    //   Remove the object and any copies from the data store.
+}
+
 AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
     vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
     notifyHandler(0), notifiable(0),
@@ -263,6 +309,7 @@
 {
     directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
     topicAddr  = "qmf." + domain + ".topic/console.ind.#";
+    eventSendAddr = "qmf." + domain + ".topic/agent.event";
     if (_d != 0) {
         directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}";
         topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}";
@@ -376,7 +423,7 @@
     // Re-issue the now-authorized action.  If this is a data query (get or subscribe),
     // and the agent is handling storage internally, redirect to the internal event
     // queue for processing by the internal-storage thread.
-    if (internalStore) {
+    if (internalStore && context->authorizedEvent->kind != AgentEvent::METHOD_CALL) {
         internalEventQueue.push_back(context->authorizedEvent);
         cond.notify();
     } else {
@@ -395,7 +442,7 @@
     contextMap.erase(iter);
 
     // Return an exception message to the requestor
-    sendResponse(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, exception);
+    send(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, Protocol::CONTENT_NONE, exception);
 }
 
 void AgentImpl::authDeny(uint32_t sequence, const string& error)
@@ -419,29 +466,37 @@
     QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text);
 }
 
-void AgentImpl::queryResponse(uint32_t sequence, Data&)
+void AgentImpl::queryResponse(uint32_t sequence, Data& data)
 {
-    Mutex::ScopedLock _lock(lock);
-    map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
-    if (iter == contextMap.end())
-        return;
-    AsyncContext::Ptr context = iter->second;
+    AsyncContext::Ptr context;
+    {
+        Mutex::ScopedLock _lock(lock);
+        map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+        if (iter == contextMap.end())
+            return;
+        context = iter->second;
+    }
 
-    // TODO: accumulate data records and send response messages when we have "enough"
+    Variant::List list;
+    list.push_back(data.impl->asMap());
+    send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, list, true);
+    QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo);
 }
 
 void AgentImpl::queryComplete(uint32_t sequence)
 {
-    Mutex::ScopedLock _lock(lock);
-    map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
-    if (iter == contextMap.end())
-        return;
-
-    // TODO: send a response message if there are any unsent data records
+    AsyncContext::Ptr context;
+    {
+        Mutex::ScopedLock _lock(lock);
+        map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+        if (iter == contextMap.end())
+            return;
+        context = iter->second;
+        contextMap.erase(iter);
+    }
 
-    AsyncContext::Ptr context = iter->second;
-    contextMap.erase(iter);
-    //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
+    send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, Variant::List());
+    QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo << " final response message");
 }
 
 void AgentImpl::registerClass(SchemaClass* cls)
@@ -463,13 +518,24 @@
 
 const char* AgentImpl::addObject(Data&, const char*)
 {
+    // TODO: Implement
+    //
+    // Determine a key for this object:
+    //   if supplied, use the supplied key
+    //   else:
+    //    if the data is described (has a schema), use the schema primary-key to generate a key
+    //    else make something up (a guid)
+    // 
+
     Mutex::ScopedLock _lock(lock);
     return 0;
 }
 
-void AgentImpl::raiseEvent(Data&)
+void AgentImpl::raiseEvent(Data& data)
 {
-    Mutex::ScopedLock _lock(lock);
+    Variant::List list;
+    list.push_back(data.impl->asMap());
+    send(eventSendAddr, "", Protocol::OP_DATA_INDICATION, Protocol::CONTENT_EVENT, list);
 }
 
 void AgentImpl::run()
@@ -601,16 +667,35 @@
 
 void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data)
 {
-    sendResponse(request.getReplyTo(), request.getCorrelationId(), opcode, data);
+    send(request.getReplyTo(), request.getCorrelationId(), opcode, Protocol::CONTENT_NONE, data);
+}
+
+void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Data& data)
+{
+    Message message;
+    MapContent content(message, data.impl->asMap());
+
+    if (!correlationId.empty())
+        message.setCorrelationId(correlationId);
+    if (!cType.empty())
+        message.getHeaders()[Protocol::APP_CONTENT] = cType;
+    message.getHeaders()[Protocol::APP_OPCODE] = opcode;
+    content.encode();
+    session.createSender(address).send(message);
 }
 
-void AgentImpl::sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data)
+void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Variant::List& list, bool partial)
 {
     Message message;
-    MapContent content(message, data.asMap());
+    ListContent content(message, list);
 
-    message.setCorrelationId(correlationId);
+    if (!correlationId.empty())
+        message.setCorrelationId(correlationId);
+    if (!cType.empty())
+        message.getHeaders()[Protocol::APP_CONTENT] = cType;
     message.getHeaders()[Protocol::APP_OPCODE] = opcode;
+    if (partial)
+        message.getHeaders()[Protocol::APP_PARTIAL] = Variant();
     content.encode();
     session.createSender(address).send(message);
 }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp?rev=917854&r1=917853&r2=917854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp Tue Mar  2 03:25:56 2010
@@ -27,27 +27,44 @@
 using namespace qpid::messaging;
 
 DataImpl::DataImpl() :
-    objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
+    objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime),
+    manager(0)
 {
 }
 
 
 DataImpl::DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map& v) :
     values(v), objectClass(type), createTime(uint64_t(Duration(now()))),
-    destroyTime(0), lastUpdatedTime(createTime)
+    destroyTime(0), lastUpdatedTime(createTime), manager(0)
 {
 }
 
 
-void DataImpl::touch()
+void DataImpl::modifyStart()
 {
+    Mutex::ScopedLock _lock(lock);
     lastUpdatedTime = uint64_t(Duration(now()));
+    if (manager != 0)
+        manager->modifyStart(parent);
+}
+
+
+void DataImpl::modifyDone()
+{
+    Mutex::ScopedLock _lock(lock);
+    if (manager != 0)
+        manager->modifyDone(parent);
 }
 
 
 void DataImpl::destroy()
 {
+    Mutex::ScopedLock _lock(lock);
     destroyTime = uint64_t(Duration(now()));
+    if (manager != 0)
+        manager->destroy(parent);
+    parent.reset();
+    manager = 0;
 }
 
 Variant::Map DataImpl::asMap() const
@@ -62,6 +79,18 @@
     return map;
 }
 
+Variant::Map DataImpl::asMapDelta(Data&) const
+{
+    Variant::Map map;
+    return map;
+}
+
+void DataImpl::registerManager(DataManager* m, DataPtr d)
+{
+    Mutex::ScopedLock _lock(lock);
+    manager = m;
+    parent = d;
+}
 
 //==================================================================
 // Wrappers
@@ -76,9 +105,8 @@
 const Variant::Map& Data::getSubtypes() const { return impl->getSubtypes(); }
 Variant::Map& Data::getSubtypes() { return impl->getSubtypes(); }
 const SchemaClass* Data::getSchema() const { return impl->getSchema(); }
-void Data::setSchema(SchemaClass* schema) { impl->setSchema(schema); }
 const char* Data::getKey() const { return impl->getKey(); }
 void Data::setKey(const char* key) { impl->setKey(key); }
-void Data::touch() { impl->touch(); }
+void Data::modifyStart() { impl->modifyStart(); }
+void Data::modifyDone() { impl->modifyDone(); }
 void Data::destroy() { impl->destroy(); }
-Variant::Map Data::asMap() const { return impl->asMap(); }

Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h?rev=917854&r1=917853&r2=917854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h Tue Mar  2 03:25:56 2010
@@ -35,7 +35,18 @@
 
     typedef boost::shared_ptr<Data> DataPtr;
 
+    class DataManager {
+    public:
+        virtual ~DataManager() {}
+        virtual void modifyStart(DataPtr data) = 0;
+        virtual void modifyDone(DataPtr data) = 0;
+        virtual void destroy(DataPtr data) = 0;
+        
+    };
+
     struct DataImpl {
+        qpid::sys::Mutex lock;
+
         /**
          * Content of the object's data
          */
@@ -56,8 +67,15 @@
         uint64_t destroyTime;
         uint64_t lastUpdatedTime;
 
+        DataManager* manager;
+        DataPtr parent;
+
         DataImpl();
         DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map&);
+        DataImpl(const DataImpl& from) :
+            values(from.values), subtypes(from.subtypes), objectClass(from.objectClass),
+            key(from.key), createTime(from.createTime), destroyTime(from.destroyTime),
+            lastUpdatedTime(from.lastUpdatedTime), manager(0) {}
         ~DataImpl() {}
 
         const qpid::messaging::Variant::Map& getValues() const { return values; }
@@ -67,15 +85,17 @@
         qpid::messaging::Variant::Map& getSubtypes() { return subtypes; }
 
         const SchemaClass* getSchema() const { return objectClass; }
-        void setSchema(SchemaClass* schema) { objectClass = schema; }
 
         const char* getKey() const { return key.c_str(); }
         void setKey(const char* _key) { key = _key; }
 
-        void touch();
+        void modifyStart();
+        void modifyDone();
         void destroy();
 
         qpid::messaging::Variant::Map asMap() const;
+        qpid::messaging::Variant::Map asMapDelta(Data& base) const;
+        void registerManager(DataManager* manager, DataPtr data);
     };
 }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org