You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/02/19 17:06:23 UTC
svn commit: r911854 [1/2] - in /qpid/branches/qmf-devel0.7/qpid/cpp:
include/qmf/ include/qmf/engine/ src/ src/qmf/ src/qmf/engine/
Author: tross
Date: Fri Feb 19 16:06:22 2010
New Revision: 911854
URL: http://svn.apache.org/viewvc?rev=911854&view=rev
Log:
Checking in work-in-progress for the qmf branch.
This branch will not compile until further changes are checked in.
Added:
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
Removed:
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Connection.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/ConnectionSettings.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/ConnectionSettings.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Message.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/ResilientConnection.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Typecode.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Value.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/MessageImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/MessageImpl.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Protocol.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Protocol.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ValueImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ValueImpl.h
Modified:
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h
qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Schema.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.h
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/SchemaImpl.h
Added: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h?rev=911854&view=auto
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h (added)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/Protocol.h Fri Feb 19 16:06:22 2010
@@ -0,0 +1,88 @@
+#ifndef _QmfProtocol_
+#define _QmfProtocol_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/sys/IntegerTypes.h>
+#include <string>
+
+namespace qpid {
+ namespace messaging {
+ class Message;
+ }
+}
+
+namespace qmf {
+
+ class Protocol { // Holder for QMF protocol string constants; only static members are declared, values live in src/qmf/Protocol.cpp.
+ public:
+ //static bool checkHeader(const qpid::messaging::Message& msg, std::string& opcode, uint32_t *seq);
+ //static void encodeHeader(qpid::messaging::Message& msg, const std::string& opcode, uint32_t seq = 0);
+
+ const static std::string SCHEMA_ELT_NAME; // "name"
+ const static std::string SCHEMA_ELT_TYPE; // "type"
+ const static std::string SCHEMA_ELT_DIR; // "dir"
+ const static std::string SCHEMA_ELT_UNIT; // "unit"
+ const static std::string SCHEMA_ELT_DESC; // "desc"
+ const static std::string SCHEMA_ELT_ACCESS; // "access"
+ const static std::string SCHEMA_ELT_OPTIONAL; // "optional"
+ const static std::string SCHEMA_ARGS; // "args"
+ const static std::string SCHEMA_PACKAGE; // "_package_name"
+ const static std::string SCHEMA_CLASS_KIND; // "_type"
+ const static std::string SCHEMA_CLASS_KIND_DATA; // "_data"
+ const static std::string SCHEMA_CLASS_KIND_EVENT; // "_event"
+ const static std::string SCHEMA_CLASS; // "_class_name"
+ const static std::string SCHEMA_HASH; // "_hash_str"
+ const static std::string AGENT_NAME; // "_agent_name"
+ const static std::string OBJECT_NAME; // "_object_name"
+ const static std::string SCHEMA_ID; // "_schema_id"
+
+ /* Single-character opcode constants, disabled (commented out) in this revision:
+ const static uint8_t OP_ATTACH_REQUEST = 'A';
+ const static uint8_t OP_ATTACH_RESPONSE = 'a';
+
+ const static uint8_t OP_BROKER_REQUEST = 'B';
+ const static uint8_t OP_BROKER_RESPONSE = 'b';
+
+ const static uint8_t OP_CONSOLE_ADDED_INDICATION = 'x';
+ const static uint8_t OP_COMMAND_COMPLETE = 'z';
+ const static uint8_t OP_HEARTBEAT_INDICATION = 'h';
+
+ const static uint8_t OP_PACKAGE_REQUEST = 'P';
+ const static uint8_t OP_PACKAGE_INDICATION = 'p';
+ const static uint8_t OP_CLASS_QUERY = 'Q';
+ const static uint8_t OP_CLASS_INDICATION = 'q';
+ const static uint8_t OP_SCHEMA_REQUEST = 'S';
+ const static uint8_t OP_SCHEMA_RESPONSE = 's';
+
+ const static uint8_t OP_METHOD_REQUEST = 'M';
+ const static uint8_t OP_METHOD_RESPONSE = 'm';
+ const static uint8_t OP_GET_QUERY = 'G';
+ const static uint8_t OP_OBJECT_INDICATION = 'g';
+ const static uint8_t OP_PROPERTY_INDICATION = 'c';
+ const static uint8_t OP_STATISTIC_INDICATION = 'i';
+ const static uint8_t OP_EVENT_INDICATION = 'e';
+ */
+ };
+
+}
+
+#endif
+
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Agent.h Fri Feb 19 16:06:22 2010
@@ -25,8 +25,8 @@
#include <qmf/engine/Object.h>
#include <qmf/engine/Event.h>
#include <qmf/engine/Query.h>
-#include <qmf/engine/Value.h>
-#include <qmf/engine/Message.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
@@ -42,12 +42,7 @@
GET_QUERY = 1,
START_SYNC = 2,
END_SYNC = 3,
- METHOD_CALL = 4,
- DECLARE_QUEUE = 5,
- DELETE_QUEUE = 6,
- BIND = 7,
- UNBIND = 8,
- SETUP_COMPLETE = 9
+ METHOD_CALL = 4
};
EventKind kind;
@@ -55,13 +50,11 @@
char* authUserId; // Authenticated user ID (for all kinds)
char* authToken; // Authentication token if issued (for all kinds)
char* name; // Name of the method/sync query
- // (METHOD_CALL, START_SYNC, END_SYNC, DECLARE_QUEUE, BIND, UNBIND)
+ // (METHOD_CALL, START_SYNC, END_SYNC)
Object* object; // Object involved in method call (METHOD_CALL)
- ObjectId* objectId; // ObjectId for method call (METHOD_CALL)
+ char* objectKey; // Object key for method call (METHOD_CALL)
Query* query; // Query parameters (GET_QUERY, START_SYNC)
- Value* arguments; // Method parameters (METHOD_CALL)
- char* exchange; // Exchange for bind (BIND, UNBIND)
- char* bindingKey; // Key for bind (BIND, UNBIND)
+ qpid::messaging::Variant::Map* arguments; // Method parameters (METHOD_CALL)
const SchemaObjectClass* objectClass; // (METHOD_CALL)
};
@@ -72,10 +65,17 @@
*/
class Agent {
public:
- Agent(char* label, bool internalStore=true);
+ Agent(const char* vendor, const char* product, const char* name, const char* domain=0, bool internalStore=true);
~Agent();
/**
+ * Set an agent attribute that can be used to describe this agent to consoles.
+ *@param key Null-terminated string that is the name of the attribute.
+ *@param value Variant value (or any API type) of the attribute.
+ */
+ void setAttr(const char* key, const qpid::messaging::Variant& value);
+
+ /**
* Configure the directory path for storing persistent data.
*@param path Null-terminated string containing a directory path where files can be
* created, written, and read. If NULL, no persistent storage will be
@@ -92,24 +92,6 @@
void setTransferDir(const char* path);
/**
- * Pass messages received from the AMQP session to the Agent engine.
- *@param message AMQP messages received on the agent session.
- */
- void handleRcvMessage(Message& message);
-
- /**
- * Get the next message to be sent to the AMQP network.
- *@param item The Message structure describing the message to be produced.
- *@return true if the Message is valid, false if there are no messages to send.
- */
- bool getXmtMessage(Message& item) const;
-
- /**
- * Remove and discard one message from the head of the transmit queue.
- */
- void popXmt();
-
- /**
* Get the next application event from the agent engine.
*@param event The event iff the return value is true
*@return true if event is valid, false if there are no events to process
@@ -122,20 +104,9 @@
void popEvent();
/**
- * A new AMQP session has been established for Agent communication.
- */
- void newSession();
-
- /**
- * Start the QMF Agent protocol. This should be invoked after a SETUP_COMPLETE event
- * is received from the Agent engine.
- */
- void startProtocol();
-
- /**
- * This method is called periodically so the agent can supply a heartbeat.
+ * Provide the AMQP connection to be used for this agent.
*/
- void heartbeat();
+ void setConnection(qpid::messaging::Connection& conn);
/**
* Respond to a method request.
@@ -144,7 +115,7 @@
*@param text Status text ("OK" or an error message)
*@param arguments The list of output arguments from the method call.
*/
- void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
+ void methodResponse(uint32_t sequence, uint32_t status, char* text, const qpid::messaging::Variant::Map& arguments);
/**
* Send a content indication to the QMF bus. This is only needed for objects that are
@@ -152,10 +123,8 @@
* (inserted using addObject).
*@param sequence The sequence number of the GET request or the SYNC_START request.
*@param object The object (annotated with "changed" flags) for publication.
- *@param prop If true, changed object properties are transmitted.
- *@param stat If true, changed object statistics are transmitted.
*/
- void queryResponse(uint32_t sequence, Object& object, bool prop = true, bool stat = true);
+ void queryResponse(uint32_t sequence, Object& object);
/**
* Indicate the completion of a query. This is not used for SYNC_START requests.
@@ -178,20 +147,12 @@
/**
* Give an object to the Agent for storage and management. Once added, the agent takes
* responsibility for the life cycle of the object.
- *@param obj The object to be managed by the Agent.
- *@param persistId A unique non-zero value if the object-id is to be persistent.
- *@return The objectId of the managed object.
- */
- const ObjectId* addObject(Object& obj, uint64_t persistId);
- // const ObjectId* addObject(Object& obj, uint32_t persistIdLo, uint32_t persistIdHi);
-
- /**
- * Allocate an object-id for an object that will be managed by the application.
- *@param persistId A unique non-zero value if the object-id is to be persistent.
- *@return The objectId structure for the allocated ID.
+ *@param obj The object to be managed by the Agent.
+ *@param key A unique name (a primary key) to be used to address this object. If
+ * left null, the agent will create a unique name for the object.
+ *@return The key for the managed object.
*/
- const ObjectId* allocObjectId(uint64_t persistId);
- const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
+ const char* addObject(Object& obj, const char* key=0);
/**
* Raise an event into the QMF network..
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Console.h Fri Feb 19 16:06:22 2010
@@ -20,14 +20,12 @@
* under the License.
*/
-#include <qmf/engine/ResilientConnection.h>
#include <qmf/engine/Schema.h>
#include <qmf/engine/ObjectId.h>
#include <qmf/engine/Object.h>
#include <qmf/engine/Event.h>
#include <qmf/engine/Query.h>
-#include <qmf/engine/Value.h>
-#include <qmf/engine/Message.h>
+#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
@@ -49,8 +47,8 @@
MethodResponse(const MethodResponse& from);
~MethodResponse();
uint32_t getStatus() const;
- const Value* getException() const;
- const Value* getArgs() const;
+ const qpid::messaging::Variant* getException() const;
+ const qpid::messaging::Variant::Map* getArgs() const;
private:
friend struct MethodResponseImpl;
@@ -66,7 +64,7 @@
public:
~QueryResponse();
uint32_t getStatus() const;
- const Value* getException() const;
+ const qpid::messaging::Variant* getException() const;
uint32_t getObjectCount() const;
const Object* getObject(uint32_t idx) const;
@@ -161,10 +159,6 @@
void sessionClosed();
void startProtocol();
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
-
bool getEvent(BrokerEvent& event) const;
void popEvent();
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Object.h Fri Feb 19 16:06:22 2010
@@ -22,7 +22,7 @@
#include <qmf/engine/Schema.h>
#include <qmf/engine/ObjectId.h>
-#include <qmf/engine/Value.h>
+#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
@@ -30,23 +30,26 @@
struct ObjectImpl;
class Object {
public:
- Object(const SchemaObjectClass* type);
+ Object();
+ Object(SchemaObjectClass* type);
Object(const Object& from);
virtual ~Object();
+ const qpid::messaging::Variant::Map& getValues() const;
+ qpid::messaging::Variant::Map& getValues();
+
+ const SchemaObjectClass* getSchema() const;
+ void setSchema(SchemaObjectClass* schema);
+
+ const char* getKey() const;
+ void setKey(const char* key);
+
+ void touch();
void destroy();
- const ObjectId* getObjectId() const;
- void setObjectId(ObjectId* oid);
- const SchemaObjectClass* getClass() const;
- Value* getValue(const char* key) const;
- void invokeMethod(const char* methodName, const Value* inArgs, void* context) const;
- bool isDeleted() const;
- void merge(const Object& from);
private:
friend struct ObjectImpl;
friend class AgentImpl;
- Object(ObjectImpl* impl);
ObjectImpl* impl;
};
}
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Query.h Fri Feb 19 16:06:22 2010
@@ -20,89 +20,39 @@
* under the License.
*/
-#include <qmf/engine/ObjectId.h>
-#include <qmf/engine/Value.h>
+#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
class Object;
- struct QueryElementImpl;
- struct QueryImpl;
- struct QueryExpressionImpl;
- class SchemaClassKey;
-
- enum ValueOper {
- O_EQ = 1,
- O_NE = 2,
- O_LT = 3,
- O_LE = 4,
- O_GT = 5,
- O_GE = 6,
- O_RE_MATCH = 7,
- O_RE_NOMATCH = 8,
- O_PRESENT = 9,
- O_NOT_PRESENT = 10
- };
-
- struct QueryOperand {
- virtual ~QueryOperand() {}
- virtual bool evaluate(const Object* object) const = 0;
- };
-
- struct QueryElement : public QueryOperand {
- QueryElement(const char* attrName, const Value* value, ValueOper oper);
- QueryElement(QueryElementImpl* impl);
- virtual ~QueryElement();
- bool evaluate(const Object* object) const;
-
- QueryElementImpl* impl;
- };
-
- enum ExprOper {
- E_NOT = 1,
- E_AND = 2,
- E_OR = 3,
- E_XOR = 4
- };
-
- struct QueryExpression : public QueryOperand {
- QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2);
- QueryExpression(QueryExpressionImpl* impl);
- virtual ~QueryExpression();
- bool evaluate(const Object* object) const;
-
- QueryExpressionImpl* impl;
- };
+ class QueryImpl;
class Query {
public:
- Query(const char* className, const char* packageName);
- Query(const SchemaClassKey* key);
- Query(const ObjectId* oid);
+ Query(const char* target);
+ Query(const char* target, const qpid::messaging::Variant::List& predicate);
+ Query(const char* target, const char* expression);
Query(const Query& from);
~Query();
- void setSelect(const QueryOperand* criterion);
- void setLimit(uint32_t maxResults);
- void setOrderBy(const char* attrName, bool decreasing);
-
- const char* getPackage() const;
- const char* getClass() const;
- const ObjectId* getObjectId() const;
+ void where(const qpid::messaging::Variant::List& predicate);
+ void where(const char* expression);
+ void limit(uint32_t maxResults);
+ void orderBy(const char* attrName, bool decreasing);
- bool haveSelect() const;
+ bool havePredicate() const;
bool haveLimit() const;
bool haveOrderBy() const;
- const QueryOperand* getSelect() const;
+ const qpid::messaging::Variant::List& getPredicate() const;
uint32_t getLimit() const;
const char* getOrderBy() const;
bool getDecreasing() const;
+ bool matches(const Object& object) const;
+
private:
friend struct QueryImpl;
- friend class BrokerProxyImpl;
- Query(QueryImpl* impl);
QueryImpl* impl;
};
}
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Schema.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Schema.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Schema.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/include/qmf/engine/Schema.h Fri Feb 19 16:06:22 2010
@@ -20,8 +20,8 @@
* under the License.
*/
-#include <qmf/engine/Typecode.h>
#include <qpid/sys/IntegerTypes.h>
+#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
@@ -29,12 +29,10 @@
enum Access { ACCESS_READ_CREATE = 1, ACCESS_READ_WRITE = 2, ACCESS_READ_ONLY = 3 };
enum Direction { DIR_IN = 1, DIR_OUT = 2, DIR_IN_OUT = 3 };
enum ClassKind { CLASS_OBJECT = 1, CLASS_EVENT = 2 };
- enum Severity { SEV_EMERG = 0, SEV_ALERT = 1, SEV_CRIT = 2, SEV_ERROR = 3, SEV_WARN = 4, SEV_NOTICE = 5, SEV_INFORM = 6, SEV_DEBUG = 7 };
struct SchemaArgumentImpl;
struct SchemaMethodImpl;
struct SchemaPropertyImpl;
- struct SchemaStatisticImpl;
struct SchemaObjectClassImpl;
struct SchemaEventClassImpl;
struct SchemaClassKeyImpl;
@@ -43,14 +41,14 @@
*/
class SchemaArgument {
public:
- SchemaArgument(const char* name, Typecode typecode);
+ SchemaArgument(const char* name, qpid::messaging::VariantType typecode);
SchemaArgument(const SchemaArgument& from);
~SchemaArgument();
void setDirection(Direction dir);
void setUnit(const char* val);
void setDesc(const char* desc);
const char* getName() const;
- Typecode getType() const;
+ qpid::messaging::VariantType getType() const;
Direction getDirection() const;
const char* getUnit() const;
const char* getDesc() const;
@@ -89,7 +87,7 @@
*/
class SchemaProperty {
public:
- SchemaProperty(const char* name, Typecode typecode);
+ SchemaProperty(const char* name, qpid::messaging::VariantType typecode);
SchemaProperty(const SchemaProperty& from);
~SchemaProperty();
void setAccess(Access access);
@@ -98,7 +96,7 @@
void setUnit(const char* val);
void setDesc(const char* desc);
const char* getName() const;
- Typecode getType() const;
+ qpid::messaging::VariantType getType() const;
Access getAccess() const;
bool isIndex() const;
bool isOptional() const;
@@ -114,27 +112,6 @@
/**
*/
- class SchemaStatistic {
- public:
- SchemaStatistic(const char* name, Typecode typecode);
- SchemaStatistic(const SchemaStatistic& from);
- ~SchemaStatistic();
- void setUnit(const char* val);
- void setDesc(const char* desc);
- const char* getName() const;
- Typecode getType() const;
- const char* getUnit() const;
- const char* getDesc() const;
-
- private:
- friend struct SchemaStatisticImpl;
- friend struct SchemaObjectClassImpl;
- SchemaStatistic(SchemaStatisticImpl* impl);
- SchemaStatisticImpl* impl;
- };
-
- /**
- */
class SchemaClassKey {
public:
SchemaClassKey(const SchemaClassKey& from);
@@ -164,15 +141,12 @@
SchemaObjectClass(const SchemaObjectClass& from);
~SchemaObjectClass();
void addProperty(const SchemaProperty* property);
- void addStatistic(const SchemaStatistic* statistic);
void addMethod(const SchemaMethod* method);
const SchemaClassKey* getClassKey() const;
int getPropertyCount() const;
- int getStatisticCount() const;
int getMethodCount() const;
const SchemaProperty* getProperty(int idx) const;
- const SchemaStatistic* getStatistic(int idx) const;
const SchemaMethod* getMethod(int idx) const;
private:
@@ -187,14 +161,13 @@
*/
class SchemaEventClass {
public:
- SchemaEventClass(const char* package, const char* name, Severity severity);
+ SchemaEventClass(const char* package, const char* name);
SchemaEventClass(const SchemaEventClass& from);
~SchemaEventClass();
void addArgument(const SchemaArgument* argument);
void setDesc(const char* desc);
const SchemaClassKey* getClassKey() const;
- Severity getSeverity() const;
int getArgumentCount() const;
const SchemaArgument* getArgument(int idx) const;
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf.mk Fri Feb 19 16:06:22 2010
@@ -31,9 +31,7 @@
../include/qpid/agent/ManagementAgent.h \
../include/qpid/agent/QmfAgentImportExport.h \
../include/qmf/Agent.h \
- ../include/qmf/Connection.h \
../include/qmf/QmfImportExport.h \
- ../include/qmf/ConnectionSettings.h \
../include/qmf/AgentObject.h
#
@@ -41,18 +39,14 @@
#
QMF_ENGINE_API = \
../include/qmf/engine/Agent.h \
- ../include/qmf/engine/ConnectionSettings.h \
../include/qmf/engine/Console.h \
../include/qmf/engine/Event.h \
- ../include/qmf/engine/Message.h \
../include/qmf/engine/Object.h \
- ../include/qmf/engine/ObjectId.h \
../include/qmf/engine/QmfEngineImportExport.h \
../include/qmf/engine/Query.h \
- ../include/qmf/engine/ResilientConnection.h \
- ../include/qmf/engine/Schema.h \
- ../include/qmf/engine/Typecode.h \
- ../include/qmf/engine/Value.h
+ ../include/qmf/engine/Schema.h
+
+# ../include/qmf/engine/ObjectId.h
# Public header files
nobase_include_HEADERS += \
@@ -67,31 +61,23 @@
libqmfengine_la_SOURCES = \
$(QMF_ENGINE_API) \
qmf/engine/Agent.cpp \
- qmf/engine/BrokerProxyImpl.cpp \
- qmf/engine/BrokerProxyImpl.h \
- qmf/engine/ConnectionSettingsImpl.cpp \
- qmf/engine/ConnectionSettingsImpl.h \
- qmf/engine/ConsoleImpl.cpp \
- qmf/engine/ConsoleImpl.h \
- qmf/engine/EventImpl.cpp \
- qmf/engine/EventImpl.h \
- qmf/engine/MessageImpl.cpp \
- qmf/engine/MessageImpl.h \
- qmf/engine/ObjectIdImpl.cpp \
- qmf/engine/ObjectIdImpl.h \
qmf/engine/ObjectImpl.cpp \
qmf/engine/ObjectImpl.h \
- qmf/engine/Protocol.cpp \
- qmf/engine/Protocol.h \
+ qmf/Protocol.cpp \
+ qmf/Protocol.h \
qmf/engine/QueryImpl.cpp \
qmf/engine/QueryImpl.h \
- qmf/engine/ResilientConnection.cpp \
- qmf/engine/SequenceManager.cpp \
- qmf/engine/SequenceManager.h \
qmf/engine/SchemaImpl.cpp \
- qmf/engine/SchemaImpl.h \
- qmf/engine/ValueImpl.cpp \
- qmf/engine/ValueImpl.h
+ qmf/engine/SchemaImpl.h
+
+# qmf/engine/BrokerProxyImpl.cpp
+# qmf/engine/BrokerProxyImpl.h
+# qmf/engine/ConsoleImpl.cpp
+# qmf/engine/ConsoleImpl.h
+# qmf/engine/ObjectIdImpl.cpp
+# qmf/engine/ObjectIdImpl.h
+# qmf/engine/SequenceManager.cpp
+# qmf/engine/SequenceManager.h
libqmf_la_LIBADD = libqmfengine.la
libqmfengine_la_LIBADD = libqpidclient.la
Added: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp?rev=911854&view=auto
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp (added)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/Protocol.cpp Fri Feb 19 16:06:22 2010
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/Protocol.h"
+
+using namespace std;
+using namespace qmf;
+
+const string Protocol::SCHEMA_ELT_NAME("name"); // Out-of-class definitions for the static constants declared in qmf/Protocol.h.
+const string Protocol::SCHEMA_ELT_TYPE("type");
+const string Protocol::SCHEMA_ELT_DIR("dir");
+const string Protocol::SCHEMA_ELT_UNIT("unit");
+const string Protocol::SCHEMA_ELT_DESC("desc");
+const string Protocol::SCHEMA_ELT_ACCESS("access");
+const string Protocol::SCHEMA_ELT_OPTIONAL("optional");
+const string Protocol::SCHEMA_ARGS("args");
+const string Protocol::SCHEMA_PACKAGE("_package_name");
+const string Protocol::SCHEMA_CLASS_KIND("_type");
+const string Protocol::SCHEMA_CLASS_KIND_DATA("_data"); // NOTE(review): presumably a value stored under SCHEMA_CLASS_KIND ("_type") - confirm against users
+const string Protocol::SCHEMA_CLASS_KIND_EVENT("_event"); // NOTE(review): presumably the event counterpart of "_data" - confirm
+const string Protocol::SCHEMA_CLASS("_class_name");
+const string Protocol::SCHEMA_HASH("_hash_str");
+const string Protocol::AGENT_NAME("_agent_name");
+const string Protocol::OBJECT_NAME("_object_name");
+const string Protocol::SCHEMA_ID("_schema_id");
+
+#if 0 // Compiled out: the matching checkHeader/encodeHeader declarations are commented out in qmf/Protocol.h.
+bool Protocol::checkHeader(const Message& /*msg*/, string& /*opcode*/, uint32_t* /*seq*/)
+{
+ // TODO: not implemented; the stub ignores all parameters and unconditionally reports success.
+ return true;
+}
+
+void Protocol::encodeHeader(Message& /*msg*/, const string& /*opcode*/, uint32_t /*seq*/)
+{
+ // TODO: not implemented. NOTE(review): Message is unqualified in both stubs, but this file only opens std and qmf; re-enabling needs qpid::messaging::Message.
+}
+#endif
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/Agent.cpp Fri Feb 19 16:06:22 2010
@@ -18,23 +18,19 @@
*/
#include "qmf/engine/Agent.h"
-#include "qmf/engine/MessageImpl.h"
#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
-#include "qmf/engine/EventImpl.h"
#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/Protocol.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
+#include "qmf/Protocol.h"
#include <qpid/sys/Mutex.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
-#include <string.h>
+#include <qpid/sys/Thread.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Message.h>
#include <string>
#include <deque>
#include <map>
@@ -45,8 +41,8 @@
using namespace std;
using namespace qmf::engine;
-using namespace qpid::framing;
using namespace qpid::sys;
+using namespace qpid::messaging;
namespace qmf {
namespace engine {
@@ -59,11 +55,9 @@
string authToken;
string name;
Object* object;
- boost::shared_ptr<ObjectId> objectId;
+ string objectKey;
boost::shared_ptr<Query> query;
- boost::shared_ptr<Value> arguments;
- string exchange;
- string bindingKey;
+ boost::shared_ptr<Variant::Map> arguments;
const SchemaObjectClass* objectClass;
AgentEventImpl(AgentEvent::EventKind k) :
@@ -72,71 +66,64 @@
AgentEvent copy();
};
+ /**
+ * AgentQueryContext is used to track asynchronous requests (Query, Sync, or Method)
+ * sent up to the application.
+ */
struct AgentQueryContext {
typedef boost::shared_ptr<AgentQueryContext> Ptr;
uint32_t sequence;
- string exchange;
- string key;
+ string consoleAddr;
const SchemaMethod* schemaMethod;
AgentQueryContext() : schemaMethod(0) {}
};
- class AgentImpl : public boost::noncopyable {
+ class AgentImpl : public boost::noncopyable, public qpid::sys::Runnable {
public:
- AgentImpl(char* label, bool internalStore);
+ AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore);
~AgentImpl();
+ void setAttr(const char* key, const Variant& value);
void setStoreDir(const char* path);
void setTransferDir(const char* path);
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
bool getEvent(AgentEvent& event) const;
void popEvent();
- void newSession();
- void startProtocol();
- void heartbeat();
- void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
- void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat);
+ void setConnection(Connection& conn);
+ void methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments);
+ void queryResponse(uint32_t sequence, Object& object);
void queryComplete(uint32_t sequence);
void registerClass(SchemaObjectClass* cls);
void registerClass(SchemaEventClass* cls);
- const ObjectId* addObject(Object& obj, uint64_t persistId);
- const ObjectId* allocObjectId(uint64_t persistId);
- const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
+ const char* addObject(Object& obj, const char* key);
void raiseEvent(Event& event);
+ void run();
+ void stop();
+
private:
mutable Mutex lock;
Mutex addLock;
- string label;
- string queueName;
+ const string vendor;
+ const string product;
+ const string name;
+ const string domain;
+ string directAddr;
+ map<string, Variant> attrMap;
string storeDir;
string transferDir;
bool internalStore;
- uint64_t nextTransientId;
Uuid systemId;
- uint32_t requestedBrokerBank;
- uint32_t requestedAgentBank;
- uint32_t assignedBrokerBank;
- uint32_t assignedAgentBank;
- AgentAttachment attachment;
uint16_t bootSequence;
- uint64_t nextObjectId;
uint32_t nextContextNum;
+ bool running;
deque<AgentEventImpl::Ptr> eventQueue;
- deque<MessageImpl::Ptr> xmtQueue;
map<uint32_t, AgentQueryContext::Ptr> contextMap;
-
- static const char* QMF_EXCHANGE;
- static const char* DIR_EXCHANGE;
- static const char* BROKER_KEY;
- static const uint32_t MERR_UNKNOWN_METHOD = 2;
- static const uint32_t MERR_UNKNOWN_PACKAGE = 8;
- static const uint32_t MERR_UNKNOWN_CLASS = 9;
- static const uint32_t MERR_INTERNAL_ERROR = 10;
-# define MA_BUFFER_SIZE 65536
- char outputBuffer[MA_BUFFER_SIZE];
+ Connection connection;
+ Session session;
+ Receiver directReceiver;
+ Receiver topicReceiver;
+ Sender sender;
+ qpid::sys::Thread* thread;
struct AgentClassKey {
string name;
@@ -144,10 +131,6 @@
AgentClassKey(const string& n, const uint8_t* h) : name(n) {
memcpy(hash, h, 16);
}
- AgentClassKey(Buffer& buffer) {
- buffer.getShortString(name);
- buffer.getBin128(hash);
- }
string repr() {
return name;
}
@@ -176,37 +159,29 @@
map<string, ClassMaps> packages;
- AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
- AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
- AgentEventImpl::Ptr eventSetupComplete();
AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls,
- boost::shared_ptr<ObjectId> oid);
+ const string& key);
AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
- boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
+ const string& key, boost::shared_ptr<Variant::Map> argMap,
const SchemaObjectClass* objectClass);
- void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
+ void handleRcvMessageLH(qpid::messaging::Message& message);
void sendPackageIndicationLH(const string& packageName);
void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
uint32_t code = 0, const string& text = "OK");
void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
- void handleAttachResponse(Buffer& inBuffer);
- void handlePackageRequest(Buffer& inBuffer);
- void handleClassQuery(Buffer& inBuffer);
- void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
+ void handleAttachResponse(Message& msg);
+ void handlePackageRequest(Message& msg);
+ void handleClassQuery(Message& msg);
+ void handleSchemaRequest(Message& msg, uint32_t sequence,
const string& replyToExchange, const string& replyToKey);
- void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
- void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
- void handleConsoleAddedIndication();
+ void handleGetQuery(Message& msg, uint32_t sequence, const string& replyTo, const string& userId);
+ void handleMethodRequest(Message& msg, uint32_t sequence, const string& replyTo, const string& userId);
};
}
}
-const char* AgentImpl::QMF_EXCHANGE = "qpid.management";
-const char* AgentImpl::DIR_EXCHANGE = "amq.direct";
-const char* AgentImpl::BROKER_KEY = "broker";
-
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
AgentEvent AgentEventImpl::copy()
@@ -217,33 +192,38 @@
item.kind = kind;
item.sequence = sequence;
item.object = object;
- item.objectId = objectId.get();
item.query = query.get();
item.arguments = arguments.get();
item.objectClass = objectClass;
+ STRING_REF(objectKey);
STRING_REF(authUserId);
STRING_REF(authToken);
STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
return item;
}
-AgentImpl::AgentImpl(char* _label, bool i) :
- label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1),
- requestedBrokerBank(0), requestedAgentBank(0),
- assignedBrokerBank(0), assignedAgentBank(0),
- bootSequence(1), nextObjectId(1), nextContextNum(1)
-{
- queueName += label;
+AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
+ vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
+ bootSequence(1), nextContextNum(1), running(true), thread(0)
+{
+ directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
+ if (_d == 0) {
+ directAddr += " { create:always }";
+ }
}
+
AgentImpl::~AgentImpl()
{
}
+void AgentImpl::setAttr(const char* key, const Variant& value)
+{
+ attrMap.insert(pair<string, Variant>(key, value));
+}
+
void AgentImpl::setStoreDir(const char* path)
{
Mutex::ScopedLock _lock(lock);
@@ -262,6 +242,7 @@
transferDir.clear();
}
+/*
void AgentImpl::handleRcvMessage(Message& message)
{
Buffer inBuffer(message.body, message.length);
@@ -283,22 +264,7 @@
}
}
}
-
-bool AgentImpl::getXmtMessage(Message& item) const
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void AgentImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
+*/
bool AgentImpl::getEvent(AgentEvent& event) const
{
@@ -316,47 +282,16 @@
eventQueue.pop_front();
}
-void AgentImpl::newSession()
+void AgentImpl::setConnection(Connection& conn)
{
Mutex::ScopedLock _lock(lock);
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind("amq.direct", queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-}
-
-void AgentImpl::startProtocol()
-{
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST);
- buffer.putShortString("qmfa");
- systemId.encode(buffer);
- buffer.putLong(requestedBrokerBank);
- buffer.putLong(requestedAgentBank);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
- " reqAgent=" << requestedAgentBank);
-}
-
-void AgentImpl::heartbeat()
-{
- Mutex::ScopedLock _lock(lock);
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-
- Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION);
- buffer.putLongLong(uint64_t(Duration(now())));
- stringstream key;
- key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
- sendBufferLH(buffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT HeartbeatIndication");
+ if (connection == 0)
+ return;
+ connection = conn;
+ thread = new qpid::sys::Thread(*this);
}
-void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
- const Value& argMap)
+void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& /*argMap*/)
{
Mutex::ScopedLock _lock(lock);
map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -365,30 +300,11 @@
AgentQueryContext::Ptr context = iter->second;
contextMap.erase(iter);
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence);
- buffer.putLong(status);
- buffer.putMediumString(text);
- if (status == 0) {
- for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin();
- aIter != context->schemaMethod->impl->arguments.end(); aIter++) {
- const SchemaArgument* schemaArg = *aIter;
- if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) {
- if (argMap.keyInMap(schemaArg->getName())) {
- const Value* val = argMap.byKey(schemaArg->getName());
- val->impl->encode(buffer);
- } else {
- Value val(schemaArg->getType());
- val.impl->encode(buffer);
- }
- }
- }
- }
- sendBufferLH(buffer, context->exchange, context->key);
+ // TODO: Encode method response
QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
}
-void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
+void AgentImpl::queryResponse(uint32_t sequence, Object&)
{
Mutex::ScopedLock _lock(lock);
map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -396,17 +312,8 @@
return;
AgentQueryContext::Ptr context = iter->second;
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence);
+ // TODO: accumulate data records and send response messages when we have "enough"
- object.impl->encodeSchemaKey(buffer);
- object.impl->encodeManagedObjectData(buffer);
- if (prop)
- object.impl->encodeProperties(buffer);
- if (stat)
- object.impl->encodeStatistics(buffer);
-
- sendBufferLH(buffer, context->exchange, context->key);
QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
}
@@ -417,9 +324,11 @@
if (iter == contextMap.end())
return;
+ // TODO: send a response message if there are any unsent data records
+
AgentQueryContext::Ptr context = iter->second;
contextMap.erase(iter);
- sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
+ //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
}
void AgentImpl::registerClass(SchemaObjectClass* cls)
@@ -456,413 +365,148 @@
// TODO: Indicate this schema if connected.
}
-const ObjectId* AgentImpl::addObject(Object&, uint64_t)
+const char* AgentImpl::addObject(Object&, const char*)
{
Mutex::ScopedLock _lock(lock);
return 0;
}
-const ObjectId* AgentImpl::allocObjectId(uint64_t persistId)
+void AgentImpl::raiseEvent(Event&)
{
Mutex::ScopedLock _lock(lock);
- uint16_t sequence = persistId ? 0 : bootSequence;
- uint64_t objectNum = persistId ? persistId : nextObjectId++;
-
- ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum);
- return oid;
}
-const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
+void AgentImpl::run()
{
- return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo);
-}
+ qpid::sys::Duration duration = qpid::sys::TIME_MSEC * 500;
-void AgentImpl::raiseEvent(Event& event)
-{
- Mutex::ScopedLock _lock(lock);
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_EVENT_INDICATION);
-
- event.impl->encodeSchemaKey(buffer);
- buffer.putLongLong(uint64_t(Duration(now())));
- event.impl->encode(buffer);
- string key(event.impl->getRoutingKey(assignedBrokerBank, assignedAgentBank));
-
- sendBufferLH(buffer, QMF_EXCHANGE, key);
- QPID_LOG(trace, "SENT EventIndication");
-}
+ session = connection.newSession();
+ directReceiver = session.createReceiver(directAddr);
+ directReceiver.setCapacity(10);
-AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
- event->name = name;
-
- return event;
-}
+ Mutex::ScopedLock _lock(lock);
+ while (running) {
+ Receiver rcvr;
+ bool available;
+ {
+ Mutex::ScopedUnlock _unlock(lock);
+ available = session.nextReceiver(rcvr, duration);
+ }
-AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue,
- const string& key)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND));
- event->name = queue;
- event->exchange = exchange;
- event->bindingKey = key;
+ if (available) {
+ Message msg(rcvr.get());
+ handleRcvMessageLH(msg);
+ }
+ }
- return event;
+ directReceiver.close();
+ session.close();
}
-AgentEventImpl::Ptr AgentImpl::eventSetupComplete()
+void AgentImpl::stop()
{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE));
- return event;
+ Mutex::ScopedLock _lock(lock);
+ running = false;
}
-AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package,
- const string& cls, boost::shared_ptr<ObjectId> oid)
+AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
event->sequence = num;
event->authUserId = userId;
- if (oid.get())
- event->query.reset(new Query(oid.get()));
- else
- event->query.reset(new Query(cls.c_str(), package.c_str()));
+ event->objectKey = key;
return event;
}
AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method,
- boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
- const SchemaObjectClass* objectClass)
+ const string& key, boost::shared_ptr<Variant::Map> argMap,
+ const SchemaObjectClass* objectClass)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL));
event->sequence = num;
event->authUserId = userId;
event->name = method;
- event->objectId = oid;
+ event->objectKey = key;
event->arguments = argMap;
event->objectClass = objectClass;
return event;
}
-void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/)
{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = "amq.direct";
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
}
void AgentImpl::sendPackageIndicationLH(const string& packageName)
{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
- buffer.putShortString(packageName);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ // TODO
QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
}
-void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
+void AgentImpl::sendClassIndicationLH(ClassKind /*kind*/, const string& packageName, const AgentClassKey& key)
{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
- buffer.putOctet((int) kind);
- buffer.putShortString(packageName);
- buffer.putShortString(key.name);
- buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ // TODO
QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name);
}
-void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey,
- uint32_t sequence, uint32_t code, const string& text)
+void AgentImpl::sendCommandCompleteLH(const string&, const string&, uint32_t sequence, uint32_t code, const string& text)
{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence);
- buffer.putLong(code);
- buffer.putShortString(text);
- sendBufferLH(buffer, exchange, replyToKey);
+ // TODO
QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
}
-void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
+void AgentImpl::sendMethodErrorLH(uint32_t /*sequence*/, const string& /*key*/, uint32_t code, const string& text)
{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
- buffer.putLong(code);
-
- string fulltext;
- switch (code) {
- case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break;
- case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break;
- case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break;
- case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break;
- default: fulltext = "Unspecified Error"; break;
- }
-
- if (!text.empty()) {
- fulltext += " (";
- fulltext += text;
- fulltext += ")";
- }
-
- buffer.putMediumString(fulltext);
- sendBufferLH(buffer, DIR_EXCHANGE, key);
- QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext);
+ // TODO
+ QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << text);
}
-void AgentImpl::handleAttachResponse(Buffer& inBuffer)
+void AgentImpl::handlePackageRequest(Message&)
{
Mutex::ScopedLock _lock(lock);
-
- assignedBrokerBank = inBuffer.getLong();
- assignedAgentBank = inBuffer.getLong();
-
- QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
-
- if ((assignedBrokerBank != requestedBrokerBank) ||
- (assignedAgentBank != requestedAgentBank)) {
- if (requestedAgentBank == 0) {
- QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
- assignedAgentBank);
- } else {
- QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
- "." << assignedAgentBank);
- }
- //storeData(); // TODO
- requestedBrokerBank = assignedBrokerBank;
- requestedAgentBank = assignedAgentBank;
- }
-
- attachment.setBanks(assignedBrokerBank, assignedAgentBank);
-
- // Bind to qpid.management to receive commands
- stringstream key;
- key << "agent." << assignedBrokerBank << "." << assignedAgentBank;
- eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str()));
-
- // Send package indications for all local packages
- for (map<string, ClassMaps>::iterator pIter = packages.begin();
- pIter != packages.end();
- pIter++) {
- sendPackageIndicationLH(pIter->first);
-
- // Send class indications for all local classes
- ClassMaps cMap = pIter->second;
- for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin();
- cIter != cMap.objectClasses.end(); cIter++)
- sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first);
- for (EventClassMap::iterator cIter = cMap.eventClasses.begin();
- cIter != cMap.eventClasses.end(); cIter++)
- sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first);
- }
}
-void AgentImpl::handlePackageRequest(Buffer&)
+void AgentImpl::handleClassQuery(Message&)
{
Mutex::ScopedLock _lock(lock);
}
-void AgentImpl::handleClassQuery(Buffer&)
+void AgentImpl::handleSchemaRequest(Message&, uint32_t, const string&, const string&)
{
Mutex::ScopedLock _lock(lock);
}
-void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
- const string& replyExchange, const string& replyKey)
+void AgentImpl::handleGetQuery(Message&, uint32_t, const string&, const string&)
{
Mutex::ScopedLock _lock(lock);
- string rExchange(replyExchange);
- string rKey(replyKey);
- string packageName;
- inBuffer.getShortString(packageName);
- AgentClassKey key(inBuffer);
-
- if (rExchange.empty())
- rExchange = QMF_EXCHANGE;
- if (rKey.empty())
- rKey = BROKER_KEY;
-
- QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
-
- map<string, ClassMaps>::iterator pIter = packages.find(packageName);
- if (pIter == packages.end()) {
- sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found");
- return;
- }
-
- ClassMaps cMap = pIter->second;
- ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key);
- if (ocIter != cMap.objectClasses.end()) {
- SchemaObjectClass* oImpl = ocIter->second;
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
- oImpl->impl->encode(buffer);
- sendBufferLH(buffer, rExchange, rKey);
- QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
- return;
- }
-
- EventClassMap::iterator ecIter = cMap.eventClasses.find(key);
- if (ecIter != cMap.eventClasses.end()) {
- SchemaEventClass* eImpl = ecIter->second;
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
- eImpl->impl->encode(buffer);
- sendBufferLH(buffer, rExchange, rKey);
- QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
- return;
- }
-
- sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found");
-}
-
-void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId)
-{
- Mutex::ScopedLock _lock(lock);
- FieldTable ft;
- FieldTable::ValuePtr value;
- map<string, ClassMaps>::const_iterator pIter = packages.end();
- string pname;
- string cname;
- string oidRepr;
- boost::shared_ptr<ObjectId> oid;
-
- ft.decode(inBuffer);
-
- QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft);
-
- value = ft.get("_package");
- if (value.get() && value->convertsTo<string>()) {
- pname = value->get<string>();
- pIter = packages.find(pname);
- if (pIter == packages.end()) {
- sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence);
- return;
- }
- }
-
- value = ft.get("_class");
- if (value.get() && value->convertsTo<string>()) {
- cname = value->get<string>();
- // TODO - check for validity of class (in package or any package)
- if (pIter == packages.end()) {
- } else {
-
- }
- }
-
- value = ft.get("_objectid");
- if (value.get() && value->convertsTo<string>()) {
- oidRepr = value->get<string>();
- oid.reset(new ObjectId());
- oid->impl->fromString(oidRepr);
- }
-
- AgentQueryContext::Ptr context(new AgentQueryContext);
- uint32_t contextNum = nextContextNum++;
- context->sequence = sequence;
- context->exchange = DIR_EXCHANGE;
- context->key = replyTo;
- contextMap[contextNum] = context;
-
- eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid));
}
-void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId)
+void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const string& /*replyTo*/, const string& /*userId*/)
{
Mutex::ScopedLock _lock(lock);
- string pname;
- string method;
- boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer));
- buffer.getShortString(pname);
- AgentClassKey classKey(buffer);
- buffer.getShortString(method);
-
- QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method);
-
- map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
- if (pIter == packages.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
- return;
- }
-
- ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey);
- if (cIter == pIter->second.objectClasses.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr());
- return;
- }
-
- const SchemaObjectClass* schema = cIter->second;
- vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin();
- for (; mIter != schema->impl->methods.end(); mIter++) {
- if ((*mIter)->getName() == method)
- break;
- }
-
- if (mIter == schema->impl->methods.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method);
- return;
- }
-
- const SchemaMethod* schemaMethod = *mIter;
- boost::shared_ptr<Value> argMap(new Value(TYPE_MAP));
- Value* value;
- for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin();
- aIter != schemaMethod->impl->arguments.end(); aIter++) {
- const SchemaArgument* schemaArg = *aIter;
- if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT)
- value = ValueImpl::factory(schemaArg->getType(), buffer);
- else
- value = ValueImpl::factory(schemaArg->getType());
- argMap->insert(schemaArg->getName(), value);
- }
+ QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=");
AgentQueryContext::Ptr context(new AgentQueryContext);
uint32_t contextNum = nextContextNum++;
- context->sequence = sequence;
- context->exchange = DIR_EXCHANGE;
- context->key = replyTo;
- context->schemaMethod = schemaMethod;
contextMap[contextNum] = context;
-
- eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema));
-}
-
-void AgentImpl::handleConsoleAddedIndication()
-{
- Mutex::ScopedLock _lock(lock);
}
//==================================================================
// Wrappers
//==================================================================
-Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); }
+Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); }
Agent::~Agent() { delete impl; }
+void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); }
void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }
-void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void Agent::popXmt() { impl->popXmt(); }
bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
void Agent::popEvent() { impl->popEvent(); }
-void Agent::newSession() { impl->newSession(); }
-void Agent::startProtocol() { impl->startProtocol(); }
-void Agent::heartbeat() { impl->heartbeat(); }
-void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
-void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
+void Agent::setConnection(Connection& conn) { impl->setConnection(conn); }
+void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments) { impl->methodResponse(sequence, status, text, arguments); }
+void Agent::queryResponse(uint32_t sequence, Object& object) { impl->queryResponse(sequence, object); }
void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
-const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
-const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
-const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
+const char* Agent::addObject(Object& obj, const char* key) { return impl->addObject(obj, key); }
void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h Fri Feb 19 16:06:22 2010
@@ -23,12 +23,11 @@
#include "qmf/engine/Console.h"
#include "qmf/engine/ObjectImpl.h"
#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/ValueImpl.h"
#include "qmf/engine/QueryImpl.h"
#include "qmf/engine/SequenceManager.h"
#include "qmf/engine/MessageImpl.h"
-#include "qpid/framing/Buffer.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/messaging/Variant.h"
#include "qpid/sys/Mutex.h"
#include "boost/shared_ptr.hpp"
#include "boost/noncopyable.hpp"
@@ -46,8 +45,8 @@
struct MethodResponseImpl {
uint32_t status;
const SchemaMethod* schema;
- std::auto_ptr<Value> exception;
- std::auto_ptr<Value> arguments;
+ std::auto_ptr<qpid::messaging::Variant> exception;
+ std::auto_ptr<qpid::messaging::Variant::Map> arguments;
MethodResponseImpl(const MethodResponseImpl& from);
MethodResponseImpl(qpid::framing::Buffer& buf, const SchemaMethod* schema);
@@ -56,14 +55,14 @@
static MethodResponse* factory(uint32_t status, const std::string& text);
~MethodResponseImpl() {}
uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
- const Value* getArgs() const { return arguments.get(); }
+ const qpid::messaging::Variant* getException() const { return exception.get(); }
+ const qpid::messaging::Variant::Map* getArgs() const { return arguments.get(); }
};
typedef boost::shared_ptr<QueryResponse> QueryResponsePtr;
struct QueryResponseImpl {
uint32_t status;
- std::auto_ptr<Value> exception;
+ std::auto_ptr<qpid::messaging::Variant> exception;
std::vector<ObjectPtr> results;
QueryResponseImpl() : status(0) {}
@@ -73,7 +72,7 @@
}
~QueryResponseImpl() {}
uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
+ const qpid::messaging::Variant* getException() const { return exception.get(); }
uint32_t getObjectCount() const { return results.size(); }
const Object* getObject(uint32_t idx) const;
};
@@ -140,8 +139,8 @@
const AgentProxy* getAgent(uint32_t idx) const;
void sendQuery(const Query& query, void* context, const AgentProxy* agent);
bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
- std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer);
- void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context);
+ std::string encodeMethodArguments(const SchemaMethod* schema, const qpid::messaging::Variant::Map* args, qpid::framing::Buffer& buffer);
+ void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context);
void addBinding(const std::string& exchange, const std::string& key);
void staticRelease() { decOutstanding(); }
@@ -219,6 +218,9 @@
QueryResponsePtr queryResponse;
};
+ //
+ // MethodContext is used to track and handle the response associated with a single Method Request
+ //
struct MethodContext : public SequenceContext {
MethodContext(BrokerProxyImpl& b, void* u, const SchemaMethod* s) : broker(b), userContext(u), schema(s) {}
virtual ~MethodContext() {}
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ConsoleImpl.h Fri Feb 19 16:06:22 2010
@@ -23,18 +23,13 @@
#include "qmf/engine/Console.h"
#include "qmf/engine/MessageImpl.h"
#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
#include "qmf/engine/ObjectImpl.h"
#include "qmf/engine/ObjectIdImpl.h"
#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
#include "qmf/engine/Protocol.h"
#include "qmf/engine/SequenceManager.h"
#include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Buffer.h>
#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
#include <qpid/sys/Mutex.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/SystemInfo.h>
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.cpp Fri Feb 19 16:06:22 2010
@@ -18,215 +18,50 @@
*/
#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/BrokerProxyImpl.h"
#include <qpid/sys/Time.h>
using namespace std;
using namespace qmf::engine;
using namespace qpid::sys;
-using qpid::framing::Buffer;
+using namespace qpid::messaging;
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type) : objectClass(type), broker(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
+ObjectImpl::ObjectImpl() :
+ objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
{
- int propCount = objectClass->getPropertyCount();
- int statCount = objectClass->getStatisticCount();
- int idx;
-
- for (idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
- }
-
- for (idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- statistics[stat->getName()] = ValuePtr(new Value(stat->getType()));
- }
}
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) :
- objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0)
-{
- int idx;
-
- if (managed) {
- lastUpdatedTime = buffer.getLongLong();
- createTime = buffer.getLongLong();
- destroyTime = buffer.getLongLong();
- objectId.reset(ObjectIdImpl::factory(buffer));
- }
-
- if (prop) {
- int propCount = objectClass->getPropertyCount();
- set<string> excludes;
- parsePresenceMasks(buffer, excludes);
- for (idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (excludes.count(prop->getName()) != 0) {
- properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
- } else {
- Value* pval = ValueImpl::factory(prop->getType(), buffer);
- properties[prop->getName()] = ValuePtr(pval);
- }
- }
- }
-
- if (stat) {
- int statCount = objectClass->getStatisticCount();
- for (idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- Value* sval = ValueImpl::factory(stat->getType(), buffer);
- statistics[stat->getName()] = ValuePtr(sval);
- }
- }
-}
-
-Object* ObjectImpl::factory(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed)
-{
- ObjectImpl* impl(new ObjectImpl(type, b, buffer, prop, stat, managed));
- return new Object(impl);
-}
-
-ObjectImpl::~ObjectImpl()
-{
-}
-
-void ObjectImpl::destroy()
-{
- destroyTime = uint64_t(Duration(now()));
- // TODO - flag deletion
-}
-
-Value* ObjectImpl::getValue(const string& key) const
-{
- map<string, ValuePtr>::const_iterator iter;
- iter = properties.find(key);
- if (iter != properties.end())
- return iter->second.get();
-
- iter = statistics.find(key);
- if (iter != statistics.end())
- return iter->second.get();
-
- return 0;
-}
-
-void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const
+ObjectImpl::ObjectImpl(SchemaObjectClass* type) :
+ objectClass(type), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
{
- if (broker != 0 && objectId.get() != 0)
- broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context);
}
-void ObjectImpl::merge(const Object& from)
-{
- for (map<string, ValuePtr>::const_iterator piter = from.impl->properties.begin();
- piter != from.impl->properties.end(); piter++)
- properties[piter->first] = piter->second;
- for (map<string, ValuePtr>::const_iterator siter = from.impl->statistics.begin();
- siter != from.impl->statistics.end(); siter++)
- statistics[siter->first] = siter->second;
-}
-
-void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
-{
- int propCount = objectClass->getPropertyCount();
- excludeList.clear();
- uint8_t bit = 0;
- uint8_t mask = 0;
-
- for (int idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (prop->isOptional()) {
- if (bit == 0) {
- mask = buffer.getOctet();
- bit = 1;
- }
- if ((mask & bit) == 0)
- excludeList.insert(string(prop->getName()));
- if (bit == 0x80)
- bit = 0;
- else
- bit = bit << 1;
- }
- }
-}
-void ObjectImpl::encodeSchemaKey(qpid::framing::Buffer& buffer) const
+void ObjectImpl::touch()
{
- buffer.putShortString(objectClass->getClassKey()->getPackageName());
- buffer.putShortString(objectClass->getClassKey()->getClassName());
- buffer.putBin128(const_cast<uint8_t*>(objectClass->getClassKey()->getHash()));
+ lastUpdatedTime = uint64_t(Duration(now()));
}
-void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const
-{
- buffer.putLongLong(lastUpdatedTime);
- buffer.putLongLong(createTime);
- buffer.putLongLong(destroyTime);
- objectId->impl->encode(buffer);
-}
-void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const
+void ObjectImpl::destroy()
{
- int propCount = objectClass->getPropertyCount();
- uint8_t bit = 0;
- uint8_t mask = 0;
- ValuePtr value;
-
- for (int idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (prop->isOptional()) {
- value = properties[prop->getName()];
- if (bit == 0)
- bit = 1;
- if (!value->isNull())
- mask |= bit;
- if (bit == 0x80) {
- buffer.putOctet(mask);
- bit = 0;
- mask = 0;
- } else
- bit = bit << 1;
- }
- }
- if (bit != 0) {
- buffer.putOctet(mask);
- }
-
- for (int idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- value = properties[prop->getName()];
- if (!prop->isOptional() || !value->isNull()) {
- value->impl->encode(buffer);
- }
- }
+ destroyTime = uint64_t(Duration(now()));
}
-void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const
-{
- int statCount = objectClass->getStatisticCount();
- for (int idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- ValuePtr value = statistics[stat->getName()];
- value->impl->encode(buffer);
- }
-}
//==================================================================
// Wrappers
//==================================================================
-Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(type)) {}
-Object::Object(ObjectImpl* i) : impl(i) {}
+Object::Object() : impl(new ObjectImpl()) {}
+Object::Object(SchemaObjectClass* type) : impl(new ObjectImpl(type)) {}
Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
Object::~Object() { delete impl; }
+const Variant::Map& Object::getValues() const { return impl->getValues(); }
+Variant::Map& Object::getValues() { return impl->getValues(); }
+const SchemaObjectClass* Object::getSchema() const { return impl->getSchema(); }
+void Object::setSchema(SchemaObjectClass* schema) { impl->setSchema(schema); }
+const char* Object::getKey() const { return impl->getKey(); }
+void Object::setKey(const char* key) { impl->setKey(key); }
+void Object::touch() { impl->touch(); }
void Object::destroy() { impl->destroy(); }
-const ObjectId* Object::getObjectId() const { return impl->getObjectId(); }
-void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
-const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
-Value* Object::getValue(const char* key) const { return impl->getValue(key); }
-void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); }
-bool Object::isDeleted() const { return impl->isDeleted(); }
-void Object::merge(const Object& from) { impl->merge(from); }
-
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/ObjectImpl.h Fri Feb 19 16:06:22 2010
@@ -21,53 +21,55 @@
*/
#include <qmf/engine/Object.h>
-#include <qmf/engine/ObjectIdImpl.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/messaging/Variant.h>
#include <map>
#include <set>
#include <string>
-#include <qpid/framing/Buffer.h>
#include <boost/shared_ptr.hpp>
-#include <qpid/sys/Mutex.h>
namespace qmf {
namespace engine {
- class BrokerProxyImpl;
+ class SchemaObjectClass;
typedef boost::shared_ptr<Object> ObjectPtr;
struct ObjectImpl {
- typedef boost::shared_ptr<Value> ValuePtr;
- const SchemaObjectClass* objectClass;
- BrokerProxyImpl* broker;
- boost::shared_ptr<ObjectId> objectId;
+ /**
+ * Content of the object's data
+ */
+ qpid::messaging::Variant::Map values;
+
+ /**
+ * Schema reference if this object is "described"
+ */
+ SchemaObjectClass* objectClass;
+
+ /**
+ * Address and lifecycle information if this object is "managed"
+ * The object is considered "managed" if the key is non-empty.
+ */
+ std::string key;
uint64_t createTime;
uint64_t destroyTime;
uint64_t lastUpdatedTime;
- mutable std::map<std::string, ValuePtr> properties;
- mutable std::map<std::string, ValuePtr> statistics;
- ObjectImpl(const SchemaObjectClass* type);
- ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
- bool prop, bool stat, bool managed);
- static Object* factory(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
- bool prop, bool stat, bool managed);
- ~ObjectImpl();
+ ObjectImpl();
+ ObjectImpl(SchemaObjectClass* type);
+ ~ObjectImpl() {}
+
+ const qpid::messaging::Variant::Map& getValues() const { return values; }
+ qpid::messaging::Variant::Map& getValues() { return values; }
+
+ const SchemaObjectClass* getSchema() const { return objectClass; }
+ void setSchema(SchemaObjectClass* schema) { objectClass = schema; }
+
+ const char* getKey() const { return key.c_str(); }
+ void setKey(const char* _key) { key = _key; }
+ void touch();
void destroy();
- const ObjectId* getObjectId() const { return objectId.get(); }
- void setObjectId(ObjectId* oid) { objectId.reset(new ObjectId(*oid)); }
- const SchemaObjectClass* getClass() const { return objectClass; }
- Value* getValue(const std::string& key) const;
- void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const;
- bool isDeleted() const { return destroyTime != 0; }
- void merge(const Object& from);
-
- void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
- void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
- void encodeManagedObjectData(qpid::framing::Buffer& buffer) const;
- void encodeProperties(qpid::framing::Buffer& buffer) const;
- void encodeStatistics(qpid::framing::Buffer& buffer) const;
};
}
}
Modified: qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp?rev=911854&r1=911853&r2=911854&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7/qpid/cpp/src/qmf/engine/QueryImpl.cpp Fri Feb 19 16:06:22 2010
@@ -18,52 +18,20 @@
*/
#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/FieldTable.h"
using namespace std;
using namespace qmf::engine;
-using namespace qpid::framing;
+using namespace qpid::messaging;
-bool QueryElementImpl::evaluate(const Object* /*object*/) const
+bool QueryImpl::matches(const Object&) const
{
- // TODO: Implement this
- return false;
+ return true;
}
-bool QueryExpressionImpl::evaluate(const Object* /*object*/) const
-{
- // TODO: Implement this
- return false;
-}
-
-QueryImpl::QueryImpl(Buffer& buffer)
-{
- FieldTable ft;
- ft.decode(buffer);
- // TODO
-}
-Query* QueryImpl::factory(Buffer& buffer)
+void QueryImpl::parsePredicate(const std::string&)
{
- QueryImpl* impl(new QueryImpl(buffer));
- return new Query(impl);
-}
-
-void QueryImpl::encode(Buffer& buffer) const
-{
- FieldTable ft;
-
- if (oid.get() != 0) {
- ft.setString("_objectid", oid->impl->asString());
- } else {
- if (!packageName.empty())
- ft.setString("_package", packageName);
- ft.setString("_class", className);
- }
-
- ft.encode(buffer);
+ predicate.clear();
}
@@ -71,33 +39,21 @@
// Wrappers
//==================================================================
-QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper oper) : impl(new QueryElementImpl(attrName, value, oper)) {}
-QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {}
-QueryElement::~QueryElement() { delete impl; }
-bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); }
-
-QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {}
-QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {}
-QueryExpression::~QueryExpression() { delete impl; }
-bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); }
-
-Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {}
-Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {}
-Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {}
-Query::Query(QueryImpl* i) : impl(i) {}
+Query::Query(const char* target) : impl(new QueryImpl(target)) {}
+Query::Query(const char* target, const Variant::List& predicate) : impl(new QueryImpl(target, predicate)) {}
+Query::Query(const char* target, const char* expression) : impl(new QueryImpl(target, expression)) {}
Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {}
Query::~Query() { delete impl; }
-void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); }
-void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); }
-void Query::setOrderBy(const char* attrName, bool decreasing) { impl->setOrderBy(attrName, decreasing); }
-const char* Query::getPackage() const { return impl->getPackage().c_str(); }
-const char* Query::getClass() const { return impl->getClass().c_str(); }
-const ObjectId* Query::getObjectId() const { return impl->getObjectId(); }
-bool Query::haveSelect() const { return impl->haveSelect(); }
+void Query::where(const Variant::List& predicate) { impl->where(predicate); }
+void Query::where(const char* expression) { impl->where(expression); }
+void Query::limit(uint32_t maxResults) { impl->limit(maxResults); }
+void Query::orderBy(const char* attrName, bool decreasing) { impl->orderBy(attrName, decreasing); }
+bool Query::havePredicate() const { return impl->havePredicate(); }
bool Query::haveLimit() const { return impl->haveLimit(); }
bool Query::haveOrderBy() const { return impl->haveOrderBy(); }
-const QueryOperand* Query::getSelect() const { return impl->getSelect(); }
+const Variant::List& Query::getPredicate() const { return impl->getPredicate(); }
uint32_t Query::getLimit() const { return impl->getLimit(); }
-const char* Query::getOrderBy() const { return impl->getOrderBy().c_str(); }
+const char* Query::getOrderBy() const { return impl->getOrderBy(); }
bool Query::getDecreasing() const { return impl->getDecreasing(); }
+bool Query::matches(const Object& object) const { return impl->matches(object); }
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org