You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/02 01:58:17 UTC
svn commit: r917825 - in /qpid/branches/qmf-devel0.7/qpid/cpp: include/qmf/
include/qmf/engine/ src/ src/qmf/ src/qmf/engine/
Author: tross
Date: Tue Mar 2 00:58:16 2010
New Revision: 917825
URL: http://svn.apache.org/viewvc?rev=917825&view=rev
Log:
Further implementation of the QMFv2 agent engine.
- deprecated old ObjectId class
- renamed Object to Data
- added hooks for authorization of get, subscribe, and method call
- added infrastructure for optional internal storage
Added:
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h
- copied, changed from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp
- copied, changed from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h
- copied, changed from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h
Removed:
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Event.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/ObjectId.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/EventImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/EventImpl.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h
Modified:
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Notifiable.h Tue Mar 2 00:58:16 2010
@@ -40,7 +40,7 @@
*/
class Notifiable {
public:
- QMF_EXTERN virtual ~Notifiable();
+ QMF_EXTERN virtual ~Notifiable() {}
virtual void notify() = 0;
};
}
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h Tue Mar 2 00:58:16 2010
@@ -71,6 +71,7 @@
/**
* QMF Op Codes
*/
+ const static std::string OP_EXCEPTION;
const static std::string OP_AGENT_LOCATE_REQUEST;
const static std::string OP_AGENT_LOCATE_RESPONSE;
const static std::string OP_AGENT_HEARTBEAT_INDICATION;
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h Tue Mar 2 00:58:16 2010
@@ -22,9 +22,7 @@
#include <qmf/Notifiable.h>
#include <qmf/engine/Schema.h>
-#include <qmf/engine/ObjectId.h>
-#include <qmf/engine/Object.h>
-#include <qmf/engine/Event.h>
+#include <qmf/engine/Data.h>
#include <qmf/engine/Query.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Variant.h>
@@ -40,10 +38,13 @@
*/
struct AgentEvent {
enum EventKind {
- GET_QUERY = 1,
- START_SYNC = 2,
- END_SYNC = 3,
- METHOD_CALL = 4
+ GET_QUERY = 1,
+ START_SYNC = 2,
+ END_SYNC = 3,
+ METHOD_CALL = 4,
+ GET_AUTHORIZE = 5,
+ METHOD_AUTHORIZE = 6,
+ SYNC_AUTHORIZE = 7
};
EventKind kind;
@@ -52,7 +53,7 @@
char* authToken; // Authentication token if issued (for all kinds)
char* name; // Name of the method/sync query
// (METHOD_CALL, START_SYNC, END_SYNC)
- Object* object; // Object involved in method call (METHOD_CALL)
+ Data* object; // Object involved in method call (METHOD_CALL)
char* objectKey; // Object key for method call (METHOD_CALL)
Query* query; // Query parameters (GET_QUERY, START_SYNC)
qpid::messaging::Variant::Map* arguments; // Method parameters (METHOD_CALL)
@@ -128,6 +129,21 @@
void setConnection(qpid::messaging::Connection& conn);
/**
+ * Respond to an authorize request by allowing the requested action.
+ *@param sequence The sequence number from the authorization request event.
+ */
+ void authAllow(uint32_t sequence);
+
+ /**
+ * Respond to an authorize request by denying the requested action.
+ *@param sequence The sequence number from the authorization request event.
+ *@param exception Value (typically a string) describing the reason for the
+ * rejection of authorization.
+ */
+ void authDeny(uint32_t sequence, const Data& exception=Data());
+ void authDeny(uint32_t sequence, const char* error);
+
+ /**
* Respond to a method request.
*@param sequence The sequence number from the method request event.
*@param status The method's completion status.
@@ -143,7 +159,7 @@
*@param sequence The sequence number of the GET request or the SYNC_START request.
*@param object The object (annotated with "changed" flags) for publication.
*/
- void queryResponse(uint32_t sequence, Object& object);
+ void queryResponse(uint32_t sequence, Data& object);
/**
* Indicate the completion of a query. This is not used for SYNC_START requests.
@@ -165,13 +181,13 @@
* left null, the agent will create a unique name for the object.
*@return The key for the managed object.
*/
- const char* addObject(Object& obj, const char* key=0);
+ const char* addObject(Data& obj, const char* key=0);
/**
* Raise an event into the QMF network..
*@param event The event object for the event to be raised.
*/
- void raiseEvent(Event& event);
+ void raiseEvent(Data& event);
private:
AgentImpl* impl;
Copied: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h (from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h?p2=qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h&p1=qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h&r1=917292&r2=917825&rev=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Data.h Tue Mar 2 00:58:16 2010
@@ -1,5 +1,5 @@
-#ifndef _QmfEngineObject_
-#define _QmfEngineObject_
+#ifndef _QmfEngineData_
+#define _QmfEngineData_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,23 +21,25 @@
*/
#include <qmf/engine/Schema.h>
-#include <qmf/engine/ObjectId.h>
#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
- struct ObjectImpl;
- class Object {
+ struct DataImpl;
+ class Data {
public:
- Object();
- Object(SchemaClass* type);
- Object(const Object& from);
- virtual ~Object();
+ Data();
+ Data(SchemaClass* type, const qpid::messaging::Variant::Map& values=qpid::messaging::Variant::Map());
+ Data(const Data& from);
+ virtual ~Data();
const qpid::messaging::Variant::Map& getValues() const;
qpid::messaging::Variant::Map& getValues();
+ const qpid::messaging::Variant::Map& getSubtypes() const;
+ qpid::messaging::Variant::Map& getSubtypes();
+
const SchemaClass* getSchema() const;
void setSchema(SchemaClass* schema);
@@ -47,10 +49,12 @@
void touch();
void destroy();
+ qpid::messaging::Variant::Map asMap() const;
+
private:
- friend struct ObjectImpl;
+ friend struct DataImpl;
friend class AgentImpl;
- ObjectImpl* impl;
+ DataImpl* impl;
};
}
}
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h Tue Mar 2 00:58:16 2010
@@ -25,7 +25,6 @@
namespace qmf {
namespace engine {
- class Object;
class QueryImpl;
class Query {
@@ -49,11 +48,12 @@
const char* getOrderBy() const;
bool getDecreasing() const;
- bool matches(const Object& object) const;
+ bool matches(const qpid::messaging::Variant::Map& data) const;
private:
friend struct QueryImpl;
friend struct BrokerProxyImpl;
+ Query(QueryImpl*);
QueryImpl* impl;
};
}
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk Tue Mar 2 00:58:16 2010
@@ -40,16 +40,13 @@
QMF_ENGINE_API = \
../include/qmf/engine/Agent.h \
../include/qmf/engine/Console.h \
- ../include/qmf/engine/Event.h \
- ../include/qmf/engine/Object.h \
+ ../include/qmf/engine/Data.h \
../include/qmf/engine/QmfEngineImportExport.h \
../include/qmf/engine/Query.h \
../include/qmf/engine/Schema.h \
../include/qmf/Agent.h \
../include/qmf/Notifiable.h
-# ../include/qmf/engine/ObjectId.h
-
# Public header files
nobase_include_HEADERS += \
$(QMF_API) \
@@ -65,8 +62,8 @@
libqmfengine_la_SOURCES = \
$(QMF_ENGINE_API) \
qmf/engine/Agent.cpp \
- qmf/engine/ObjectImpl.cpp \
- qmf/engine/ObjectImpl.h \
+ qmf/engine/DataImpl.cpp \
+ qmf/engine/DataImpl.h \
qmf/Protocol.cpp \
qmf/Protocol.h \
qmf/engine/QueryImpl.cpp \
@@ -78,8 +75,6 @@
# qmf/engine/BrokerProxyImpl.h
# qmf/engine/ConsoleImpl.cpp
# qmf/engine/ConsoleImpl.h
-# qmf/engine/ObjectIdImpl.cpp
-# qmf/engine/ObjectIdImpl.h
# qmf/engine/SequenceManager.cpp
# qmf/engine/SequenceManager.h
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp Tue Mar 2 00:58:16 2010
@@ -48,6 +48,7 @@
const string Protocol::APP_OPCODE("qmf.opcode");
+const string Protocol::OP_EXCEPTION("_exception");
const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request");
const string Protocol::OP_AGENT_LOCATE_RESPONSE("_agent_locate_response");
const string Protocol::OP_AGENT_HEARTBEAT_INDICATION("_agent_heartbeat_indication");
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp Tue Mar 2 00:58:16 2010
@@ -19,10 +19,11 @@
#include "qmf/engine/Agent.h"
#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/Data.h"
#include "qmf/engine/QueryImpl.h"
#include "qmf/Protocol.h"
#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Condition.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/Thread.h>
@@ -30,8 +31,11 @@
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Address.h>
#include <qpid/messaging/Message.h>
+#include <qpid/messaging/MapContent.h>
#include <qpid/messaging/MapView.h>
+#include <qpid/messaging/ListView.h>
#include <string>
#include <deque>
#include <map>
@@ -48,6 +52,8 @@
namespace qmf {
namespace engine {
+ class AgentImpl;
+
struct AgentEventImpl {
typedef boost::shared_ptr<AgentEventImpl> Ptr;
AgentEvent::EventKind kind;
@@ -55,7 +61,7 @@
string authUserId;
string authToken;
string name;
- Object* object;
+ Data* object;
string objectKey;
boost::shared_ptr<Query> query;
boost::shared_ptr<Variant::Map> arguments;
@@ -68,15 +74,31 @@
};
/**
- * AgentQueryContext is used to track asynchronous requests (Query, Sync, or Method)
+ * AsyncContext is used to track asynchronous requests (Query, Sync, or Method)
* sent up to the application.
*/
- struct AgentQueryContext {
- typedef boost::shared_ptr<AgentQueryContext> Ptr;
- uint32_t sequence;
- string consoleAddr;
+ struct AsyncContext {
+ typedef boost::shared_ptr<AsyncContext> Ptr;
+ string correlationId;
+ Address replyTo;
+ AgentEventImpl::Ptr authorizedEvent;
const SchemaMethod* schemaMethod;
- AgentQueryContext() : schemaMethod(0) {}
+ AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {}
+ };
+
+ class StoreThread : public boost::noncopyable, public qpid::sys::Runnable {
+ public:
+ StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {}
+ ~StoreThread() {
+ stop();
+ }
+ void run();
+ void stop();
+
+ private:
+ AgentImpl& agent;
+ bool running;
+ qpid::sys::Thread thread;
};
class AgentImpl : public boost::noncopyable, public qpid::sys::Runnable {
@@ -92,24 +114,34 @@
bool getEvent(AgentEvent& event) const;
void popEvent();
void setConnection(Connection& conn);
+ void authAllow(uint32_t sequence);
+ void authDeny(uint32_t sequence, const Data&);
+ void authDeny(uint32_t sequence, const string&);
void methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments);
- void queryResponse(uint32_t sequence, Object& object);
+ void queryResponse(uint32_t sequence, Data& object);
void queryComplete(uint32_t sequence);
void registerClass(SchemaClass* cls);
- const char* addObject(Object& obj, const char* key);
- void raiseEvent(Event& event);
+ const char* addObject(Data& obj, const char* key);
+ void raiseEvent(Data& event);
void run();
void stop();
+ // This blocking call is used by the internal store thread(s) to get work to do.
+ AgentEventImpl::Ptr nextInternalEvent();
+ void signalInternal() { cond.notify(); }
+
private:
mutable Mutex lock;
- Mutex addLock;
+ Condition cond;
const string vendor;
const string product;
const string name;
const string domain;
string directAddr;
+ string directAddrParams;
+ string topicAddr;
+ string topicAddrParams;
Variant::Map attrMap;
string storeDir;
string transferDir;
@@ -121,13 +153,15 @@
uint32_t nextContextNum;
bool running;
deque<AgentEventImpl::Ptr> eventQueue;
- map<uint32_t, AgentQueryContext::Ptr> contextMap;
+ deque<AgentEventImpl::Ptr> internalEventQueue;
+ map<uint32_t, AsyncContext::Ptr> contextMap;
Connection connection;
Session session;
Receiver directReceiver;
Receiver topicReceiver;
Sender sender;
qpid::sys::Thread* thread;
+ StoreThread* storeThread;
struct AgentClassKey {
string name;
@@ -169,6 +203,8 @@
void handleSubscribeCancel(const Message& message);
void handleSubscribeRefresh(const Message& message);
void handleMethodRequest(const Message& message);
+ void sendResponse(const Message& request, const string& opcode, const Data& data);
+ void sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data);
void sendPackageIndicationLH(const string& packageName);
void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
@@ -207,18 +243,38 @@
return item;
}
+void StoreThread::run()
+{
+ while (running) {
+ AgentEventImpl::Ptr ptr(agent.nextInternalEvent());
+ }
+}
+
+void StoreThread::stop()
+{
+ running = false;
+ agent.signalInternal();
+}
+
AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
notifyHandler(0), notifiable(0),
bootSequence(1), nextContextNum(1), running(true), thread(0)
{
directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
- if (_d == 0) {
- directAddr += " { create:always }";
+ topicAddr = "qmf." + domain + ".topic/console.ind.#";
+ if (_d != 0) {
+ directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}";
+ topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}";
+ }
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ attrMap["_instance"] = name;
+ attrMap["_name"] = vendor + ":" + product + ":" + name;
+
+ if (internalStore) {
+ storeThread = new StoreThread(*this);
}
- attrMap["vendor"] = vendor;
- attrMap["product"] = product;
- attrMap["name"] = name;
}
@@ -297,42 +353,93 @@
thread = new qpid::sys::Thread(*this);
}
+void AgentImpl::authAllow(uint32_t sequence)
+{
+ Mutex::ScopedLock _lock(lock);
+
+ // Find the context associated with the sequence number
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ AsyncContext::Ptr context = iter->second;
+
+ // Transform the authorize event into the real event
+ switch (context->authorizedEvent->kind) {
+ case AgentEvent::GET_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::GET_QUERY; break;
+ case AgentEvent::METHOD_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::METHOD_CALL; break;
+ case AgentEvent::SYNC_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::START_SYNC; break;
+ default:
+ contextMap.erase(iter);
+ return;
+ }
+
+ // Re-issue the now-authorized action. If this is a data query (get or subscribe),
+ // and the agent is handling storage internally, redirect to the internal event
+ // queue for processing by the internal-storage thread.
+ if (internalStore) {
+ internalEventQueue.push_back(context->authorizedEvent);
+ cond.notify();
+ } else {
+ eventQueue.push_back(context->authorizedEvent);
+ notify();
+ }
+}
+
+void AgentImpl::authDeny(uint32_t sequence, const Data& exception)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ AsyncContext::Ptr context = iter->second;
+ contextMap.erase(iter);
+
+ // Return an exception message to the requestor
+ sendResponse(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, exception);
+}
+
+void AgentImpl::authDeny(uint32_t sequence, const string& error)
+{
+ Data exception;
+ exception.getValues()["status"] = "Access to this Operation Denied";
+ exception.getValues()["text"] = error;
+ authDeny(sequence, exception);
+}
+
void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& /*argMap*/)
{
Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
return;
- AgentQueryContext::Ptr context = iter->second;
+ AsyncContext::Ptr context = iter->second;
contextMap.erase(iter);
// TODO: Encode method response
- QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
+ QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text);
}
-void AgentImpl::queryResponse(uint32_t sequence, Object&)
+void AgentImpl::queryResponse(uint32_t sequence, Data&)
{
Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
return;
- AgentQueryContext::Ptr context = iter->second;
+ AsyncContext::Ptr context = iter->second;
// TODO: accumulate data records and send response messages when we have "enough"
-
- QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
}
void AgentImpl::queryComplete(uint32_t sequence)
{
Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
return;
// TODO: send a response message if there are any unsent data records
- AgentQueryContext::Ptr context = iter->second;
+ AsyncContext::Ptr context = iter->second;
contextMap.erase(iter);
//sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
}
@@ -354,13 +461,13 @@
// TODO: Indicate this schema if connected.
}
-const char* AgentImpl::addObject(Object&, const char*)
+const char* AgentImpl::addObject(Data&, const char*)
{
Mutex::ScopedLock _lock(lock);
return 0;
}
-void AgentImpl::raiseEvent(Event&)
+void AgentImpl::raiseEvent(Data&)
{
Mutex::ScopedLock _lock(lock);
}
@@ -370,9 +477,14 @@
qpid::sys::Duration duration = qpid::sys::TIME_MSEC * 500;
session = connection.newSession();
- directReceiver = session.createReceiver(directAddr);
+ QPID_LOG(trace, "Creating direct receiver to address: " << directAddr << directAddrParams);
+ directReceiver = session.createReceiver(directAddr + directAddrParams);
directReceiver.setCapacity(10);
+ QPID_LOG(trace, "Creating topic receiver to address: " << topicAddr << topicAddrParams);
+ topicReceiver = session.createReceiver(topicAddr + topicAddrParams);
+ topicReceiver.setCapacity(10);
+
Mutex::ScopedLock _lock(lock);
while (running) {
Receiver rcvr;
@@ -398,12 +510,29 @@
running = false;
}
+AgentEventImpl::Ptr AgentImpl::nextInternalEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ while (internalEventQueue.empty())
+ cond.wait(lock);
+
+ AgentEventImpl::Ptr event(internalEventQueue.front());
+ internalEventQueue.pop_front();
+ return event;
+
+ // TODO: make sure this function returns with a null pointer when the thread needs to stop.
+}
+
+
void AgentImpl::handleRcvMessageLH(const Message& message)
{
Variant::Map headers(message.getHeaders());
- cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl;
+ cout << "AgentImpl::handleRcvMessageLH contentType=" << message.getContentType() <<
+ " replyTo=" << message.getReplyTo() <<
+ " headers=" << headers << endl;
- if (message.getContentType() != Protocol::AMQP_CONTENT_MAP)
+ if (message.getContentType() != Protocol::AMQP_CONTENT_MAP &&
+ message.getContentType() != Protocol::AMQP_CONTENT_LIST)
return;
Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE);
@@ -421,16 +550,33 @@
void AgentImpl::handleAgentLocateLH(const Message& message)
{
- const MapView predicate(message);
-
- //if (predicateMatches(predicate, attrMap)) {
- // sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap);
- //}
+ QPID_LOG(trace, "RCVD AgentLocateRequest replyTo=" << message.getReplyTo());
+ auto_ptr<Query> query(QueryImpl::factory(ListView(message)));
+ if (query->matches(attrMap)) {
+ Data data(0, attrMap);
+ sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, data);
+ QPID_LOG(trace, "SENT AgentLocateResponse");
+ }
}
void AgentImpl::handleQueryRequestLH(const Message& message)
{
- const MapView map(message);
+ uint32_t contextNum = nextContextNum++;
+ AsyncContext::Ptr context(new AsyncContext(message.getCorrelationId(), message.getReplyTo()));
+ contextMap[contextNum] = context;
+
+ // Build the event for the get request
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_AUTHORIZE));
+ event->sequence = contextNum;
+ event->authUserId = message.getUserId();
+ event->query.reset(QueryImpl::factory(MapView(message)));
+
+ // Put the not-yet-authorized event into the context for possible later use
+ context->authorizedEvent = event;
+
+ // Enqueue the event
+ eventQueue.push_back(event);
+ notify();
}
void AgentImpl::handleSubscribeRequest(const Message& message)
@@ -453,6 +599,22 @@
const MapView map(message);
}
+void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data)
+{
+ sendResponse(request.getReplyTo(), request.getCorrelationId(), opcode, data);
+}
+
+void AgentImpl::sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data)
+{
+ Message message;
+ MapContent content(message, data.asMap());
+
+ message.setCorrelationId(correlationId);
+ message.getHeaders()[Protocol::APP_OPCODE] = opcode;
+ content.encode();
+ session.createSender(address).send(message);
+}
+
AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
@@ -528,14 +690,8 @@
Mutex::ScopedLock _lock(lock);
}
-void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const string& /*replyTo*/, const string& /*userId*/)
+void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t, const string& /*replyTo*/, const string& /*userId*/)
{
- Mutex::ScopedLock _lock(lock);
- QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=");
-
- AgentQueryContext::Ptr context(new AgentQueryContext);
- uint32_t contextNum = nextContextNum++;
- contextMap[contextNum] = context;
}
//==================================================================
@@ -552,10 +708,13 @@
bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
void Agent::popEvent() { impl->popEvent(); }
void Agent::setConnection(Connection& conn) { impl->setConnection(conn); }
+void Agent::authAllow(uint32_t sequence) { impl->authAllow(sequence); }
+void Agent::authDeny(uint32_t sequence, const Data& ex) { impl->authDeny(sequence, ex); }
+void Agent::authDeny(uint32_t sequence, const char* ex) { impl->authDeny(sequence, string(ex)); }
void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments) { impl->methodResponse(sequence, status, text, arguments); }
-void Agent::queryResponse(uint32_t sequence, Object& object) { impl->queryResponse(sequence, object); }
+void Agent::queryResponse(uint32_t sequence, Data& object) { impl->queryResponse(sequence, object); }
void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
void Agent::registerClass(SchemaClass* cls) { impl->registerClass(cls); }
-const char* Agent::addObject(Object& obj, const char* key) { return impl->addObject(obj, key); }
-void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
+const char* Agent::addObject(Data& obj, const char* key) { return impl->addObject(obj, key); }
+void Agent::raiseEvent(Data& event) { impl->raiseEvent(event); }
Copied: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp (from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp)
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp?p2=qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp&p1=qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp&r1=917292&r2=917825&rev=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.cpp Tue Mar 2 00:58:16 2010
@@ -17,7 +17,8 @@
* under the License.
*/
-#include "qmf/engine/ObjectImpl.h"
+#include "qmf/Protocol.h"
+#include "qmf/engine/DataImpl.h"
#include <qpid/sys/Time.h>
using namespace std;
@@ -25,43 +26,59 @@
using namespace qpid::sys;
using namespace qpid::messaging;
-ObjectImpl::ObjectImpl() :
+DataImpl::DataImpl() :
objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
{
}
-ObjectImpl::ObjectImpl(SchemaClass* type) :
- objectClass(type), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
+DataImpl::DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map& v) :
+ values(v), objectClass(type), createTime(uint64_t(Duration(now()))),
+ destroyTime(0), lastUpdatedTime(createTime)
{
}
-void ObjectImpl::touch()
+void DataImpl::touch()
{
lastUpdatedTime = uint64_t(Duration(now()));
}
-void ObjectImpl::destroy()
+void DataImpl::destroy()
{
destroyTime = uint64_t(Duration(now()));
}
+Variant::Map DataImpl::asMap() const
+{
+ Variant::Map map;
+
+ map[Protocol::VALUES] = values;
+ if (!subtypes.empty())
+ map[Protocol::SUBTYPES] = subtypes;
+ // TODO: Add key, schema, and lifecycle data
+
+ return map;
+}
+
//==================================================================
// Wrappers
//==================================================================
-Object::Object() : impl(new ObjectImpl()) {}
-Object::Object(SchemaClass* type) : impl(new ObjectImpl(type)) {}
-Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
-Object::~Object() { delete impl; }
-const Variant::Map& Object::getValues() const { return impl->getValues(); }
-Variant::Map& Object::getValues() { return impl->getValues(); }
-const SchemaClass* Object::getSchema() const { return impl->getSchema(); }
-void Object::setSchema(SchemaClass* schema) { impl->setSchema(schema); }
-const char* Object::getKey() const { return impl->getKey(); }
-void Object::setKey(const char* key) { impl->setKey(key); }
-void Object::touch() { impl->touch(); }
-void Object::destroy() { impl->destroy(); }
+Data::Data() : impl(new DataImpl()) {}
+Data::Data(SchemaClass* type, const Variant::Map& m) : impl(new DataImpl(type, m)) {}
+Data::Data(const Data& from) : impl(new DataImpl(*(from.impl))) {}
+Data::~Data() { delete impl; }
+const Variant::Map& Data::getValues() const { return impl->getValues(); }
+Variant::Map& Data::getValues() { return impl->getValues(); }
+const Variant::Map& Data::getSubtypes() const { return impl->getSubtypes(); }
+Variant::Map& Data::getSubtypes() { return impl->getSubtypes(); }
+const SchemaClass* Data::getSchema() const { return impl->getSchema(); }
+void Data::setSchema(SchemaClass* schema) { impl->setSchema(schema); }
+const char* Data::getKey() const { return impl->getKey(); }
+void Data::setKey(const char* key) { impl->setKey(key); }
+void Data::touch() { impl->touch(); }
+void Data::destroy() { impl->destroy(); }
+Variant::Map Data::asMap() const { return impl->asMap(); }
Copied: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h (from r917292, qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h?p2=qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h&p1=qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h&r1=917292&r2=917825&rev=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/DataImpl.h Tue Mar 2 00:58:16 2010
@@ -1,5 +1,5 @@
-#ifndef _QmfEngineObjectImpl_
-#define _QmfEngineObjectImpl_
+#ifndef _QmfEngineDataImpl_
+#define _QmfEngineDataImpl_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,7 +20,7 @@
* under the License.
*/
-#include <qmf/engine/Object.h>
+#include <qmf/engine/Data.h>
#include <qpid/sys/Mutex.h>
#include <qpid/messaging/Variant.h>
#include <map>
@@ -33,13 +33,14 @@
class SchemaClass;
- typedef boost::shared_ptr<Object> ObjectPtr;
+ typedef boost::shared_ptr<Data> DataPtr;
- struct ObjectImpl {
+ struct DataImpl {
/**
* Content of the object's data
*/
qpid::messaging::Variant::Map values;
+ qpid::messaging::Variant::Map subtypes;
/**
* Schema reference if this object is "described"
@@ -55,13 +56,16 @@
uint64_t destroyTime;
uint64_t lastUpdatedTime;
- ObjectImpl();
- ObjectImpl(SchemaClass* type);
- ~ObjectImpl() {}
+ DataImpl();
+ DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map&);
+ ~DataImpl() {}
const qpid::messaging::Variant::Map& getValues() const { return values; }
qpid::messaging::Variant::Map& getValues() { return values; }
+ const qpid::messaging::Variant::Map& getSubtypes() const { return subtypes; }
+ qpid::messaging::Variant::Map& getSubtypes() { return subtypes; }
+
const SchemaClass* getSchema() const { return objectClass; }
void setSchema(SchemaClass* schema) { objectClass = schema; }
@@ -70,6 +74,8 @@
void touch();
void destroy();
+
+ qpid::messaging::Variant::Map asMap() const;
};
}
}
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp Tue Mar 2 00:58:16 2010
@@ -23,7 +23,17 @@
using namespace qmf::engine;
using namespace qpid::messaging;
-bool QueryImpl::matches(const Object&) const
+QueryImpl::QueryImpl(const qpid::messaging::MapView&)
+{
+ // TODO
+}
+
+QueryImpl::QueryImpl(const qpid::messaging::ListView&)
+{
+ //TODO
+}
+
+bool QueryImpl::matches(const Variant::Map&) const
{
return true;
}
@@ -34,6 +44,17 @@
predicate.clear();
}
+Query* QueryImpl::factory(const qpid::messaging::MapView& map)
+{
+ QueryImpl* impl(new QueryImpl(map));
+ return new Query(impl);
+}
+
+Query* QueryImpl::factory(const qpid::messaging::ListView& pred)
+{
+ QueryImpl* impl(new QueryImpl(pred));
+ return new Query(impl);
+}
//==================================================================
// Wrappers
@@ -43,6 +64,7 @@
Query::Query(const char* target, const Variant::List& predicate) : impl(new QueryImpl(target, predicate)) {}
Query::Query(const char* target, const char* expression) : impl(new QueryImpl(target, expression)) {}
Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {}
+Query::Query(QueryImpl* i) : impl(i) {}
Query::~Query() { delete impl; }
void Query::where(const Variant::List& predicate) { impl->where(predicate); }
void Query::where(const char* expression) { impl->where(expression); }
@@ -55,5 +77,5 @@
uint32_t Query::getLimit() const { return impl->getLimit(); }
const char* Query::getOrderBy() const { return impl->getOrderBy(); }
bool Query::getDecreasing() const { return impl->getDecreasing(); }
-bool Query::matches(const Object& object) const { return impl->matches(object); }
+bool Query::matches(const Variant::Map& data) const { return impl->matches(data); }
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h Tue Mar 2 00:58:16 2010
@@ -22,6 +22,8 @@
#include "qmf/engine/Query.h"
#include <qpid/messaging/Variant.h>
+#include <qpid/messaging/MapView.h>
+#include <qpid/messaging/ListView.h>
#include <string>
#include <boost/shared_ptr.hpp>
@@ -34,8 +36,13 @@
target(_target), predicate(_predicate), resultLimit(0) {}
QueryImpl(const char* _target, const char* expression) :
target(_target), resultLimit(0) { parsePredicate(expression); }
+ QueryImpl(const qpid::messaging::MapView& map);
+ QueryImpl(const qpid::messaging::ListView& pred);
~QueryImpl() {}
+ static Query* factory(const qpid::messaging::MapView& map);
+ static Query* factory(const qpid::messaging::ListView& pred);
+
void where(const qpid::messaging::Variant::List& _predicate) { predicate = _predicate; }
void where(const char* expression) { parsePredicate(expression); }
void limit(uint32_t maxResults) { resultLimit = maxResults; }
@@ -48,7 +55,7 @@
uint32_t getLimit() const { return resultLimit; }
const char* getOrderBy() const { return sortAttr.c_str(); }
bool getDecreasing() const { return orderDecreasing; }
- bool matches(const Object& object) const;
+ bool matches(const qpid::messaging::Variant::Map& data) const;
void parsePredicate(const std::string& expression);
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h?rev=917825&r1=917824&r2=917825&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h Tue Mar 2 00:58:16 2010
@@ -41,7 +41,7 @@
SchemaException(const std::string& context, const std::string& expected) {
text = context + ": Expected item with key " + expected;
}
- virtual ~SchemaException() throw();
+ virtual ~SchemaException() throw() {}
virtual const char* what() const throw() { return text.c_str(); }
private:
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org