You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/08/31 22:18:48 UTC
svn commit: r809728 - in /qpid/trunk/qpid/cpp: bindings/qmf/ruby/
bindings/qmf/tests/ src/ src/qmf/
Author: tross
Date: Mon Aug 31 20:18:48 2009
New Revision: 809728
URL: http://svn.apache.org/viewvc?rev=809728&view=rev
Log:
Added protocol module for codepoint definitions and header handling.
Fixed a deadlock case in ResilientConnection.
Added more code to the ConsoleEngine implementation.
Added:
qpid/trunk/qpid/cpp/src/qmf/Protocol.cpp
qpid/trunk/qpid/cpp/src/qmf/Protocol.h
qpid/trunk/qpid/cpp/src/qmf/SequenceManager.cpp
qpid/trunk/qpid/cpp/src/qmf/SequenceManager.h
Modified:
qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb
qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb
qpid/trunk/qpid/cpp/src/qmf.mk
qpid/trunk/qpid/cpp/src/qmf/AgentEngine.cpp
qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp
qpid/trunk/qpid/cpp/src/qmf/ResilientConnection.cpp
Modified: qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb Mon Aug 31 20:18:48 2009
@@ -536,7 +536,7 @@
class Console
attr_reader :impl
- def initialize(handler, kwargs={})
+ def initialize(handler = nil, kwargs={})
@handler = handler
@impl = Qmfengine::ConsoleEngine.new
@event = Qmfengine::ConsoleEvent.new
@@ -587,6 +587,7 @@
valid = @impl.getEvent(@event)
while valid
count += 1
+ puts "Console Event: #{@event.kind}"
case @event.kind
when Qmfengine::ConsoleEvent::AGENT_ADDED
when Qmfengine::ConsoleEvent::AGENT_DELETED
Modified: qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb Mon Aug 31 20:18:48 2009
@@ -29,7 +29,7 @@
@settings.set_attr("host", ARGV[0]) if ARGV.size > 0
@settings.set_attr("port", ARGV[1].to_i) if ARGV.size > 1
@connection = Qmf::Connection.new(@settings)
- @qmf = Qmf::Console.new(self)
+ @qmf = Qmf::Console.new
@qmf.add_connection(@connection)
Modified: qpid/trunk/qpid/cpp/src/qmf.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf.mk?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf.mk (original)
+++ qpid/trunk/qpid/cpp/src/qmf.mk Mon Aug 31 20:18:48 2009
@@ -49,11 +49,15 @@
qmf/ObjectIdImpl.h \
qmf/ObjectImpl.cpp \
qmf/ObjectImpl.h \
+ qmf/Protocol.cpp \
+ qmf/Protocol.h \
qmf/Query.h \
qmf/QueryImpl.cpp \
qmf/QueryImpl.h \
qmf/ResilientConnection.cpp \
qmf/ResilientConnection.h \
+ qmf/SequenceManager.cpp \
+ qmf/SequenceManager.h \
qmf/Schema.h \
qmf/SchemaImpl.cpp \
qmf/SchemaImpl.h \
Modified: qpid/trunk/qpid/cpp/src/qmf/AgentEngine.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/AgentEngine.cpp?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/AgentEngine.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/AgentEngine.cpp Mon Aug 31 20:18:48 2009
@@ -25,6 +25,7 @@
#include "qmf/ObjectIdImpl.h"
#include "qmf/QueryImpl.h"
#include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
#include <qpid/framing/Buffer.h>
#include <qpid/framing/Uuid.h>
#include <qpid/framing/FieldTable.h>
@@ -172,8 +173,6 @@
map<string, ClassMaps> packages;
- bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0);
AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
AgentEventImpl::Ptr eventSetupComplete();
@@ -268,12 +267,16 @@
string replyToKey(message.replyKey ? message.replyKey : "");
string userId(message.userId ? message.userId : "");
- if (checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == 'a') handleAttachResponse(inBuffer);
- else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
- else if (opcode == 'x') handleConsoleAddedIndication();
- else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId);
- else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
+ if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
+ else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
+ else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
+ else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
+ else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+ else {
+ QPID_LOG(error, "AgentEngineImpl::handleRcvMessage invalid opcode=" << opcode);
+ break;
+ }
}
}
@@ -325,7 +328,7 @@
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
- encodeHeader(buffer, 'A');
+ Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST);
buffer.putShortString("qmfa");
systemId.encode(buffer);
buffer.putLong(requestedBrokerBank);
@@ -340,7 +343,7 @@
Mutex::ScopedLock _lock(lock);
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'h');
+ Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION);
buffer.putLongLong(uint64_t(Duration(now())));
stringstream key;
key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
@@ -349,7 +352,7 @@
}
void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
- const Value& argMap)
+ const Value& argMap)
{
Mutex::ScopedLock _lock(lock);
map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -359,7 +362,7 @@
contextMap.erase(iter);
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'm', context->sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence);
buffer.putLong(status);
buffer.putMediumString(text);
if (status == 0) {
@@ -390,7 +393,7 @@
AgentQueryContext::Ptr context = iter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'g', context->sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence);
object.impl->encodeSchemaKey(buffer);
object.impl->encodeManagedObjectData(buffer);
@@ -477,30 +480,6 @@
Mutex::ScopedLock _lock(lock);
}
-void AgentEngineImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet('A');
- buf.putOctet('M');
- buf.putOctet('3');
- buf.putOctet(opcode);
- buf.putLong (seq);
-}
-
-bool AgentEngineImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- if (buf.getSize() < 8)
- return false;
-
- uint8_t h1 = buf.getOctet();
- uint8_t h2 = buf.getOctet();
- uint8_t h3 = buf.getOctet();
-
- *opcode = buf.getOctet();
- *seq = buf.getLong();
-
- return h1 == 'A' && h2 == 'M' && h3 == '3';
-}
-
AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
@@ -570,7 +549,7 @@
void AgentEngineImpl::sendPackageIndicationLH(const string& packageName)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'p');
+ Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
buffer.putShortString(packageName);
sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
@@ -579,7 +558,7 @@
void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'q');
+ Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
buffer.putOctet((int) kind);
buffer.putShortString(packageName);
buffer.putShortString(key.name);
@@ -592,7 +571,7 @@
uint32_t sequence, uint32_t code, const string& text)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'z', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence);
buffer.putLong(code);
buffer.putShortString(text);
sendBufferLH(buffer, exchange, replyToKey);
@@ -602,7 +581,7 @@
void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'm', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
buffer.putLong(code);
string fulltext;
@@ -710,7 +689,7 @@
if (ocIter != cMap.objectClasses.end()) {
SchemaObjectClassImpl* oImpl = ocIter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 's', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
oImpl->encode(buffer);
sendBufferLH(buffer, rExchange, rKey);
QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
@@ -721,7 +700,7 @@
if (ecIter != cMap.eventClasses.end()) {
SchemaEventClassImpl* eImpl = ecIter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 's', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
eImpl->encode(buffer);
sendBufferLH(buffer, rExchange, rKey);
QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
@@ -851,108 +830,25 @@
// Wrappers
//==================================================================
-AgentEngine::AgentEngine(char* label, bool internalStore)
-{
- impl = new AgentEngineImpl(label, internalStore);
-}
-
-AgentEngine::~AgentEngine()
-{
- delete impl;
-}
-
-void AgentEngine::setStoreDir(const char* path)
-{
- impl->setStoreDir(path);
-}
-
-void AgentEngine::setTransferDir(const char* path)
-{
- impl->setTransferDir(path);
-}
-
-void AgentEngine::handleRcvMessage(Message& message)
-{
- impl->handleRcvMessage(message);
-}
-
-bool AgentEngine::getXmtMessage(Message& item) const
-{
- return impl->getXmtMessage(item);
-}
-
-void AgentEngine::popXmt()
-{
- impl->popXmt();
-}
-
-bool AgentEngine::getEvent(AgentEvent& event) const
-{
- return impl->getEvent(event);
-}
-
-void AgentEngine::popEvent()
-{
- impl->popEvent();
-}
-
-void AgentEngine::newSession()
-{
- impl->newSession();
-}
-
-void AgentEngine::startProtocol()
-{
- impl->startProtocol();
-}
-
-void AgentEngine::heartbeat()
-{
- impl->heartbeat();
-}
-
-void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments)
-{
- impl->methodResponse(sequence, status, text, arguments);
-}
-
-void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
-{
- impl->queryResponse(sequence, object, prop, stat);
-}
-
-void AgentEngine::queryComplete(uint32_t sequence)
-{
- impl->queryComplete(sequence);
-}
-
-void AgentEngine::registerClass(SchemaObjectClass* cls)
-{
- impl->registerClass(cls);
-}
-
-void AgentEngine::registerClass(SchemaEventClass* cls)
-{
- impl->registerClass(cls);
-}
-
-const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId)
-{
- return impl->addObject(obj, persistId);
-}
-
-const ObjectId* AgentEngine::allocObjectId(uint64_t persistId)
-{
- return impl->allocObjectId(persistId);
-}
-
-const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
-{
- return impl->allocObjectId(persistIdLo, persistIdHi);
-}
-
-void AgentEngine::raiseEvent(Event& event)
-{
- impl->raiseEvent(event);
-}
+AgentEngine::AgentEngine(char* label, bool internalStore) { impl = new AgentEngineImpl(label, internalStore); }
+AgentEngine::~AgentEngine() { delete impl; }
+void AgentEngine::setStoreDir(const char* path) { impl->setStoreDir(path); }
+void AgentEngine::setTransferDir(const char* path) { impl->setTransferDir(path); }
+void AgentEngine::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool AgentEngine::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void AgentEngine::popXmt() { impl->popXmt(); }
+bool AgentEngine::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
+void AgentEngine::popEvent() { impl->popEvent(); }
+void AgentEngine::newSession() { impl->newSession(); }
+void AgentEngine::startProtocol() { impl->startProtocol(); }
+void AgentEngine::heartbeat() { impl->heartbeat(); }
+void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
+void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
+void AgentEngine::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
+void AgentEngine::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
+void AgentEngine::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
+const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
+const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
+const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
+void AgentEngine::raiseEvent(Event& event) { impl->raiseEvent(event); }
Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp Mon Aug 31 20:18:48 2009
@@ -25,6 +25,8 @@
#include "qmf/ObjectIdImpl.h"
#include "qmf/QueryImpl.h"
#include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
+#include "qmf/SequenceManager.h"
#include <qpid/framing/Buffer.h>
#include <qpid/framing/Uuid.h>
#include <qpid/framing/FieldTable.h>
@@ -36,6 +38,7 @@
#include <string>
#include <deque>
#include <map>
+#include <vector>
#include <iostream>
#include <fstream>
#include <boost/shared_ptr.hpp>
@@ -92,18 +95,9 @@
BrokerEvent copy();
};
- struct BrokerProxyImpl {
+ class BrokerProxyImpl : public SequenceContext {
+ public:
typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
- mutable Mutex lock;
- BrokerProxy* envelope;
- ConsoleEngineImpl* console;
- string queueName;
- deque<MessageImpl::Ptr> xmtQueue;
- deque<BrokerEventImpl::Ptr> eventQueue;
-
- static const char* QMF_EXCHANGE;
- static const char* DIR_EXCHANGE;
- static const char* BROKER_KEY;
BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
~BrokerProxyImpl() {}
@@ -112,6 +106,7 @@
void sessionClosed();
void startProtocol();
+ void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
void handleRcvMessage(Message& message);
bool getXmtMessage(Message& item) const;
void popXmt();
@@ -119,9 +114,41 @@
bool getEvent(BrokerEvent& event) const;
void popEvent();
+ // From SequenceContext
+ void complete();
+
+ void addBinding(const string& exchange, const string& key);
+
+ private:
+ mutable Mutex lock;
+ BrokerProxy* envelope;
+ ConsoleEngineImpl* console;
+ string queueName;
+ Uuid brokerId;
+ SequenceManager seqMgr;
+ uint32_t requestsOutstanding;
+ bool topicBound;
+ deque<MessageImpl::Ptr> xmtQueue;
+ deque<BrokerEventImpl::Ptr> eventQueue;
+
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+
BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName);
BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
BrokerEventImpl::Ptr eventSetupComplete();
+
+ void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
+ void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
+ void handleCommandComplete(Buffer& inBuffer, uint32_t seq);
+ void handleClassIndication(Buffer& inBuffer, uint32_t seq);
+ void handleMethodResponse(Buffer& inBuffer, uint32_t seq);
+ void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq);
+ void handleEventIndication(Buffer& inBuffer, uint32_t seq);
+ void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
+ void handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
+ void incOutstanding();
+ void decOutstanding();
};
struct AgentProxyImpl {
@@ -139,10 +166,6 @@
ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings());
~ConsoleEngineImpl();
- void handleRcvMessage(BrokerProxy& broker, Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
-
bool getEvent(ConsoleEvent& event) const;
void popEvent();
@@ -175,17 +198,21 @@
*/
private:
+ friend class BrokerProxyImpl;
ConsoleEngine* envelope;
const ConsoleSettings& settings;
mutable Mutex lock;
deque<ConsoleEventImpl::Ptr> eventQueue;
+ vector<BrokerProxyImpl::Ptr> brokerList;
+ vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
};
}
-const char* BrokerProxyImpl::QMF_EXCHANGE = "qpid.management";
-const char* BrokerProxyImpl::DIR_EXCHANGE = "amq.direct";
-const char* BrokerProxyImpl::BROKER_KEY = "broker";
-
+namespace {
+const char* QMF_EXCHANGE = "qpid.management";
+const char* DIR_EXCHANGE = "amq.direct";
+const char* BROKER_KEY = "broker";
+}
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
@@ -250,12 +277,55 @@
void BrokerProxyImpl::startProtocol()
{
- cout << "BrokerProxyImpl::startProtocol" << endl;
-}
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
-void BrokerProxyImpl::handleRcvMessage(Message& /*message*/)
-{
- // TODO: Dispatch the messages types
+ requestsOutstanding = 1;
+ topicBound = false;
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT BrokerRequest");
+}
+
+void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+ uint32_t length = buf.getPosition();
+ MessageImpl::Ptr message(new MessageImpl);
+
+ buf.reset();
+ buf.getRawData(message->body, length);
+ message->destination = destination;
+ message->routingKey = routingKey;
+ message->replyExchange = DIR_EXCHANGE;
+ message->replyKey = queueName;
+
+ xmtQueue.push_back(message);
+}
+
+void BrokerProxyImpl::handleRcvMessage(Message& message)
+{
+ Buffer inBuffer(message.body, message.length);
+ uint8_t opcode;
+ uint32_t sequence;
+
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
+ if (opcode == Protocol::OP_BROKER_RESPONSE) handleBrokerResponse(inBuffer, sequence);
+ else if (opcode == Protocol::OP_PACKAGE_INDICATION) handlePackageIndication(inBuffer, sequence);
+ else if (opcode == Protocol::OP_COMMAND_COMPLETE) handleCommandComplete(inBuffer, sequence);
+ else if (opcode == Protocol::OP_CLASS_INDICATION) handleClassIndication(inBuffer, sequence);
+ else if (opcode == Protocol::OP_METHOD_RESPONSE) handleMethodResponse(inBuffer, sequence);
+ else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) handleHeartbeatIndication(inBuffer, sequence);
+ else if (opcode == Protocol::OP_EVENT_INDICATION) handleEventIndication(inBuffer, sequence);
+ else if (opcode == Protocol::OP_SCHEMA_RESPONSE) handleSchemaResponse(inBuffer, sequence);
+ else if (opcode == Protocol::OP_PROPERTY_INDICATION) handleObjectIndication(inBuffer, sequence, true, false);
+ else if (opcode == Protocol::OP_STATISTIC_INDICATION) handleObjectIndication(inBuffer, sequence, false, true);
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) handleObjectIndication(inBuffer, sequence, true, true);
+ else {
+ QPID_LOG(trace, "BrokerProxyImpl::handleRcvMessage invalid opcode: " << opcode);
+ break;
+ }
+ }
}
bool BrokerProxyImpl::getXmtMessage(Message& item) const
@@ -290,6 +360,16 @@
eventQueue.pop_front();
}
+void BrokerProxyImpl::complete()
+{
+ decOutstanding();
+}
+
+void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
+{
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+}
+
BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
{
BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
@@ -313,6 +393,89 @@
return event;
}
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
+ // Note that this function doesn't touch requestsOutstanding. This is because
+ // it accounts for one request completed (the BrokerRequest) and one request
+ // started (the PackageRequest) which cancel each other out.
+
+ brokerId.decode(inBuffer);
+ QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(this));
+ Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
+}
+
+void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
+{
+ string package;
+
+ inBuffer.getShortString(package);
+ QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
+}
+
+void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
+{
+ string text;
+ uint32_t code = inBuffer.getLong();
+ inBuffer.getShortString(text);
+ QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
+ seqMgr.release(seq);
+}
+
+void BrokerProxyImpl::handleClassIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleSchemaResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleObjectIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/, bool /*prop*/, bool /*stat*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::incOutstanding()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding++;
+}
+
+void BrokerProxyImpl::decOutstanding()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding--;
+ if (requestsOutstanding == 0 && !topicBound) {
+ for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
+ iter != console->bindingList.end(); iter++) {
+ string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
+ string key(iter->second);
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+ }
+ }
+}
+
MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this))
{
string text;
@@ -329,6 +492,19 @@
ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
envelope(e), settings(s)
{
+ bindingList.push_back(pair<string, string>(string(), "schema.#"));
+ if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
+ bindingList.push_back(pair<string, string>(string(), "console.#"));
+ } else {
+ if (settings.rcvObjects && !settings.userBindings)
+ bindingList.push_back(pair<string, string>(string(), "console.obj.#"));
+ else
+ bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent"));
+ if (settings.rcvEvents)
+ bindingList.push_back(pair<string, string>(string(), "console.event.#"));
+ if (settings.rcvHeartbeats)
+ bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#"));
+ }
}
ConsoleEngineImpl::~ConsoleEngineImpl()
@@ -354,72 +530,105 @@
void ConsoleEngineImpl::addConnection(BrokerProxy& /*broker*/, void* /*context*/)
{
+ // TODO
}
void ConsoleEngineImpl::delConnection(BrokerProxy& /*broker*/)
{
+ // TODO
}
uint32_t ConsoleEngineImpl::packageCount() const
{
+ // TODO
return 0;
}
const string& ConsoleEngineImpl::getPackageName(uint32_t /*idx*/) const
{
+ // TODO
static string temp;
return temp;
}
uint32_t ConsoleEngineImpl::classCount(const char* /*packageName*/) const
{
+ // TODO
return 0;
}
const SchemaClassKey* ConsoleEngineImpl::getClass(const char* /*packageName*/, uint32_t /*idx*/) const
{
+ // TODO
return 0;
}
ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey& /*key*/) const
{
+ // TODO
return CLASS_OBJECT;
}
const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey& /*key*/) const
{
+ // TODO
return 0;
}
const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey& /*key*/) const
{
+ // TODO
return 0;
}
-void ConsoleEngineImpl::bindPackage(const char* /*packageName*/)
+void ConsoleEngineImpl::bindPackage(const char* packageName)
{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
}
-void ConsoleEngineImpl::bindClass(const SchemaClassKey& /*key*/)
+void ConsoleEngineImpl::bindClass(const SchemaClassKey& classKey)
{
+ stringstream key;
+ key << "console.obj.*.*." << classKey.getPackageName() << "." << classKey.getClassName() << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
}
-void ConsoleEngineImpl::bindClass(const char* /*packageName*/, const char* /*className*/)
+void ConsoleEngineImpl::bindClass(const char* packageName, const char* className)
{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << "." << className << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl::Ptr>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
}
uint32_t ConsoleEngineImpl::agentCount() const
{
+ // TODO
return 0;
}
const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const
{
+ // TODO
return 0;
}
void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/)
{
+ // TODO
}
/*
@@ -437,7 +646,6 @@
*/
-
//==================================================================
// Wrappers
//==================================================================
Added: qpid/trunk/qpid/cpp/src/qmf/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/Protocol.cpp?rev=809728&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/Protocol.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qmf/Protocol.cpp Mon Aug 31 20:18:48 2009
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/Protocol.h"
+#include "qpid/framing/Buffer.h"
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::framing;
+
+
+bool Protocol::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+ if (buf.available() < 8)
+ return false;
+
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
+
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '3';
+}
+
+void Protocol::encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+ buf.putOctet('A');
+ buf.putOctet('M');
+ buf.putOctet('3');
+ buf.putOctet(opcode);
+ buf.putLong (seq);
+}
+
+
Added: qpid/trunk/qpid/cpp/src/qmf/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/Protocol.h?rev=809728&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/Protocol.h (added)
+++ qpid/trunk/qpid/cpp/src/qmf/Protocol.h Mon Aug 31 20:18:48 2009
@@ -0,0 +1,67 @@
+#ifndef _QmfProtocol_
+#define _QmfProtocol_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/sys/IntegerTypes.h>
+
+namespace qpid {
+ namespace framing {
+ class Buffer;
+ }
+}
+
+namespace qmf {
+
+ class Protocol {
+ public:
+ static bool checkHeader(qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ static void encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+
+ const static uint8_t OP_ATTACH_REQUEST = 'A';
+ const static uint8_t OP_ATTACH_RESPONSE = 'a';
+
+ const static uint8_t OP_BROKER_REQUEST = 'B';
+ const static uint8_t OP_BROKER_RESPONSE = 'b';
+
+ const static uint8_t OP_CONSOLE_ADDED_INDICATION = 'x';
+ const static uint8_t OP_COMMAND_COMPLETE = 'z';
+ const static uint8_t OP_HEARTBEAT_INDICATION = 'h';
+
+ const static uint8_t OP_PACKAGE_REQUEST = 'P';
+ const static uint8_t OP_PACKAGE_INDICATION = 'p';
+ const static uint8_t OP_CLASS_QUERY = 'Q';
+ const static uint8_t OP_CLASS_INDICATION = 'q';
+ const static uint8_t OP_SCHEMA_REQUEST = 'S';
+ const static uint8_t OP_SCHEMA_RESPONSE = 's';
+
+ const static uint8_t OP_METHOD_REQUEST = 'M';
+ const static uint8_t OP_METHOD_RESPONSE = 'm';
+ const static uint8_t OP_GET_QUERY = 'G';
+ const static uint8_t OP_OBJECT_INDICATION = 'g';
+ const static uint8_t OP_PROPERTY_INDICATION = 'c';
+ const static uint8_t OP_STATISTIC_INDICATION = 'i';
+ const static uint8_t OP_EVENT_INDICATION = 'e';
+ };
+
+}
+
+#endif
+
Modified: qpid/trunk/qpid/cpp/src/qmf/ResilientConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ResilientConnection.cpp?rev=809728&r1=809727&r2=809728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ResilientConnection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ResilientConnection.cpp Mon Aug 31 20:18:48 2009
@@ -318,6 +318,7 @@
while (true) {
try {
+ QPID_LOG(trace, "Trying to open connection...");
connection.open(settings.impl->getClientSettings());
{
Mutex::ScopedLock _lock(lock);
@@ -326,6 +327,7 @@
while (connected)
cond.wait(lock);
+ delay = delayMin;
while (!sessions.empty()) {
set<RCSession::Ptr>::iterator iter = sessions.begin();
@@ -334,6 +336,11 @@
EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext);
Mutex::ScopedUnlock _u(lock);
sess->stop();
+
+ // Nullify the intrusive pointer within the scoped unlock, otherwise,
+ // the reference is held until overwritted above (under lock) which causes
+ // the session destructor to be called with the lock held.
+ sess = 0;
}
EnqueueEvent(ResilientConnectionEvent::DISCONNECTED);
@@ -341,7 +348,6 @@
if (shutdown)
return;
}
- delay = delayMin;
connection.close();
} catch (exception &e) {
QPID_LOG(debug, "connection.open exception: " << e.what());
Added: qpid/trunk/qpid/cpp/src/qmf/SequenceManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/SequenceManager.cpp?rev=809728&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/SequenceManager.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qmf/SequenceManager.cpp Mon Aug 31 20:18:48 2009
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/SequenceManager.h"
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::sys;
+
+SequenceManager::SequenceManager() : nextSequence(1) {}
+
+uint32_t SequenceManager::reserve(SequenceContext* ctx)
+{
+ Mutex::ScopedLock _lock(lock);
+ uint32_t seq = nextSequence;
+ while (contextMap.find(seq) != contextMap.end())
+ seq = seq < 0xFFFFFFFF ? seq + 1 : 1;
+ nextSequence = seq < 0xFFFFFFFF ? seq + 1 : 1;
+ contextMap[seq] = ctx;
+ return seq;
+}
+
+void SequenceManager::release(uint32_t sequence)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, SequenceContext*>::iterator iter = contextMap.find(sequence);
+ if (iter != contextMap.end()) {
+ if (iter->second != 0)
+ iter->second->complete();
+ contextMap.erase(iter);
+ }
+}
+
+
Added: qpid/trunk/qpid/cpp/src/qmf/SequenceManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/SequenceManager.h?rev=809728&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/SequenceManager.h (added)
+++ qpid/trunk/qpid/cpp/src/qmf/SequenceManager.h Mon Aug 31 20:18:48 2009
@@ -0,0 +1,52 @@
+#ifndef _QmfSequenceManager_
+#define _QmfSequenceManager_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <map>
+
+namespace qmf {
+
+ class SequenceContext {
+ public:
+ SequenceContext() {}
+ virtual ~SequenceContext() {}
+
+ virtual void complete() = 0;
+ };
+
+ class SequenceManager {
+ public:
+ SequenceManager();
+
+ uint32_t reserve(SequenceContext* ctx);
+ void release(uint32_t sequence);
+
+ private:
+ mutable qpid::sys::Mutex lock;
+ uint32_t nextSequence;
+ std::map<uint32_t, SequenceContext*> contextMap;
+ };
+
+}
+
+#endif
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org