You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/09/17 21:27:53 UTC
svn commit: r816345 [1/2] - in /qpid/trunk/qpid/cpp: bindings/qmf/ruby/
bindings/qmf/tests/ src/ src/qmf/
Author: tross
Date: Thu Sep 17 19:27:52 2009
New Revision: 816345
URL: http://svn.apache.org/viewvc?rev=816345&view=rev
Log:
QMF Console
- Added implementation for method invocation
- Added metaprogramming hooks in Ruby for attribute and method access
- Refactored file structure
Added:
qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.h
qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.cpp
- copied, changed from r815416, qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp
qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.h
Removed:
qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp
Modified:
qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb
qpid/trunk/qpid/cpp/bindings/qmf/tests/agent_ruby.rb
qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb
qpid/trunk/qpid/cpp/src/qmf.mk
qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.h
qpid/trunk/qpid/cpp/src/qmf/Object.h
qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.cpp
qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.h
qpid/trunk/qpid/cpp/src/qmf/Value.h
qpid/trunk/qpid/cpp/src/qmf/ValueImpl.cpp
qpid/trunk/qpid/cpp/src/qmf/ValueImpl.h
Modified: qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/ruby/qmf.rb Thu Sep 17 19:27:52 2009
@@ -189,8 +189,16 @@
##==============================================================================
class QmfObject
+ include MonitorMixin
attr_reader :impl, :object_class
def initialize(cls, kwargs={})
+ super()
+ @cv = new_cond
+ @sync_count = 0
+ @sync_result = nil
+ @allow_sets = :false
+ @broker = kwargs[:broker] if kwargs.include?(:broker)
+
if cls:
@object_class = cls
@impl = Qmfengine::Object.new(@object_class.impl)
@@ -264,6 +272,103 @@
set_attr(name, get_attr(name) - by)
end
+ def method_missing(name_in, *args)
+ #
+ # Convert the name to a string and determine if it represents an
+ # attribute assignment (i.e. "attr=")
+ #
+ name = name_in.to_s
+ attr_set = (name[name.length - 1] == 61)
+ name = name[0..name.length - 2] if attr_set
+ raise "Sets not permitted on this object" if attr_set && !@allow_sets
+
+ #
+ # If the name matches a property name, set or return the value of the property.
+ #
+ @object_class.properties.each do |prop|
+ if prop.name == name
+ if attr_set
+ return set_attr(name, args[0])
+ else
+ return get_attr(name)
+ end
+ end
+ end
+
+ #
+ # Do the same for statistics
+ #
+ @object_class.statistics.each do |stat|
+ if stat.name == name
+ if attr_set
+ return set_attr(name, args[0])
+ else
+ return get_attr(name)
+ end
+ end
+ end
+
+ #
+ # If we still haven't found a match for the name, check to see if
+ # it matches a method name. If so, marshall the arguments and invoke
+ # the method.
+ #
+ @object_class.methods.each do |method|
+ if method.name == name
+ raise "Sets not permitted on methods" if attr_set
+ timeout = 30
+ synchronize do
+ @sync_count = 1
+ @impl.invokeMethod(name, _marshall(method, args), self)
+ @broker.conn.kick if @broker
+ unless @cv.wait(timeout) { @sync_count == 0 }
+ raise "Timed out waiting for response"
+ end
+ end
+
+ return @sync_result
+ end
+ end
+
+ #
+ # This name means nothing to us, pass it up the line to the parent
+ # class's handler.
+ #
+ super.method_missing(name_in, args)
+ end
+
+ #
+ # Deliver the result of a method invocation to the thread blocked in
+ # method_missing: store the result, decrement the outstanding-call counter
+ # and signal the condition variable, all under the object's monitor.
+ #
+ def _method_result(result)
+ synchronize do
+ @sync_result = result
+ @sync_count -= 1
+ @cv.signal
+ end
+ end
+
+ #
+ # Convert a Ruby array of arguments (positional) into a Value object of type "map".
+ #
+ private
+ def _marshall(schema, args)
+ map = Qmfengine::Value.new(TYPE_MAP)
+ schema.arguments.each do |arg|
+ if arg.direction == DIR_IN || arg.direction == DIR_IN_OUT
+ map.insert(arg.name, Qmfengine::Value.new(arg.typecode))
+ end
+ end
+
+ marshalled = Arguments.new(map)
+ idx = 0
+ schema.arguments.each do |arg|
+ if arg.direction == DIR_IN || arg.direction == DIR_IN_OUT
+ marshalled[arg.name] = args[idx] unless args[idx] == nil
+ idx += 1
+ end
+ end
+
+ return marshalled.map
+ end
+
private
def value(name)
val = @impl.getValue(name.to_s)
@@ -277,6 +382,7 @@
class AgentObject < QmfObject
def initialize(cls, kwargs={})
super(cls, kwargs)
+ # Agent-side objects accept "attr=" assignments through method_missing.
+ # Use a real boolean: a symbol such as :true (or :false) is always truthy,
+ # so it cannot express "not allowed" in a plain truthiness test.
+ @allow_sets = true
end
def destroy
@@ -307,9 +413,6 @@
def index()
end
-
- def method_missing(name, *args)
- end
end
class ObjectId
@@ -407,6 +510,22 @@
end
end
+ class MethodResponse
+ def initialize(impl)
+ puts "start copying..."
+ @impl = Qmfengine::MethodResponse.new(impl)
+ puts "done copying..."
+ end
+
+ def status
+ @impl.getStatus
+ end
+
+ def exception
+ @impl.getException
+ end
+ end
+
##==============================================================================
## QUERY
##==============================================================================
@@ -470,6 +589,14 @@
def name
@impl.getName
end
+
+ # Direction of this argument as reported by the engine object
+ # (compared against DIR_IN / DIR_IN_OUT when marshalling method calls).
+ def direction
+ @impl.getDirection
+ end
+
+ # Type code of this argument as reported by the engine object (getType);
+ # used to construct a Qmfengine::Value of the matching type.
+ def typecode
+ @impl.getType
+ end
end
class SchemaMethod
@@ -802,7 +929,7 @@
class Broker < ConnectionHandler
include MonitorMixin
- attr_reader :impl
+ attr_reader :impl, :conn
def initialize(console, conn)
super()
@@ -871,9 +998,12 @@
when Qmfengine::BrokerEvent::QUERY_COMPLETE
result = []
for idx in 0...@event.queryResponse.getObjectCount
- result << ConsoleObject.new(nil, :impl => @event.queryResponse.getObject(idx))
+ result << ConsoleObject.new(nil, :impl => @event.queryResponse.getObject(idx), :broker => self)
end
@console._get_result(result, @event.context)
+ when Qmfengine::BrokerEvent::METHOD_RESPONSE
+ obj = @event.context
+ obj._method_result(MethodResponse.new(@event.methodResponse))
end
@impl.popEvent
valid = @impl.getEvent(@event)
Modified: qpid/trunk/qpid/cpp/bindings/qmf/tests/agent_ruby.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/tests/agent_ruby.rb?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/tests/agent_ruby.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/tests/agent_ruby.rb Thu Sep 17 19:27:52 2009
@@ -92,37 +92,37 @@
retText = "OK"
if args['test'] == "big"
- @parent.set_attr("uint64val", 0x9494949449494949)
- @parent.set_attr("uint32val", 0xa5a55a5a)
- @parent.set_attr("uint16val", 0xb66b)
- @parent.set_attr("uint8val", 0xc7)
-
- @parent.set_attr("int64val", 1000000000000000000)
- @parent.set_attr("int32val", 1000000000)
- @parent.set_attr("int16val", 10000)
- @parent.set_attr("int8val", 100)
+ @parent.uint64val = 0x9494949449494949
+ @parent.uint32val = 0xa5a55a5a
+ @parent.uint16val = 0xb66b
+ @parent.uint8val = 0xc7
+
+ @parent.int64val = 1000000000000000000
+ @parent.int32val = 1000000000
+ @parent.int16val = 10000
+ @parent.int8val = 100
elsif args['test'] == "small"
- @parent.set_attr("uint64val", 4)
- @parent.set_attr("uint32val", 5)
- @parent.set_attr("uint16val", 6)
- @parent.set_attr("uint8val", 7)
-
- @parent.set_attr("int64val", 8)
- @parent.set_attr("int32val", 9)
- @parent.set_attr("int16val", 10)
- @parent.set_attr("int8val", 11)
+ @parent.uint64val = 4
+ @parent.uint32val = 5
+ @parent.uint16val = 6
+ @parent.uint8val = 7
+
+ @parent.int64val = 8
+ @parent.int32val = 9
+ @parent.int16val = 10
+ @parent.int8val = 11
elsif args['test'] == "negative"
- @parent.set_attr("uint64val", 0)
- @parent.set_attr("uint32val", 0)
- @parent.set_attr("uint16val", 0)
- @parent.set_attr("uint8val", 0)
-
- @parent.set_attr("int64val", -10000000000)
- @parent.set_attr("int32val", -100000)
- @parent.set_attr("int16val", -1000)
- @parent.set_attr("int8val", -100)
+ @parent.uint64val = 0
+ @parent.uint32val = 0
+ @parent.uint16val = 0
+ @parent.uint8val = 0
+
+ @parent.int64val = -10000000000
+ @parent.int32val = -100000
+ @parent.int16val = -1000
+ @parent.int8val = -100
else
retCode = 1
@@ -135,7 +135,7 @@
oid = @agent.alloc_object_id(2)
args['child_ref'] = oid
@child = Qmf::AgentObject.new(@model.child_class)
- @child.set_attr("name", args.by_key("child_name"))
+ @child.name = args.by_key("child_name")
@child.set_object_id(oid)
@agent.method_response(context, 0, "OK", args)
@@ -161,18 +161,18 @@
@agent.set_connection(@connection)
@parent = Qmf::AgentObject.new(@model.parent_class)
- @parent.set_attr("name", "Parent One")
- @parent.set_attr("state", "OPERATIONAL")
+ @parent.name = "Parent One"
+ @parent.state = "OPERATIONAL"
- @parent.set_attr("uint64val", 0)
- @parent.set_attr("uint32val", 0)
- @parent.set_attr("uint16val", 0)
- @parent.set_attr("uint8val", 0)
-
- @parent.set_attr("int64val", 0)
- @parent.set_attr("int32val", 0)
- @parent.set_attr("int16val", 0)
- @parent.set_attr("int8val", 0)
+ @parent.uint64val = 0
+ @parent.uint32val = 0
+ @parent.uint16val = 0
+ @parent.uint8val = 0
+
+ @parent.int64val = 0
+ @parent.int32val = 0
+ @parent.int16val = 0
+ @parent.int8val = 0
@parent_oid = @agent.alloc_object_id(1)
@parent.set_object_id(@parent_oid)
Modified: qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console.rb Thu Sep 17 19:27:52 2009
@@ -96,9 +96,24 @@
puts "---- Brokers ----"
blist.each do |b|
puts " ---- Broker ----"
- puts " systemRef: #{b.get_attr('systemRef')}"
- puts " port : #{b.get_attr('port')}"
- puts " uptime : #{b.get_attr('uptime') / 1000000000}"
+ puts " systemRef: #{b.systemRef}"
+ puts " port : #{b.port}"
+ puts " uptime : #{b.uptime / 1000000000}"
+
+ for rep in 0...0
+ puts " Pinging..."
+ ret = b.echo(45, 'text string')
+ puts " ret=#{ret}"
+ end
+ end
+ puts "----"
+
+ qlist = @qmfc.get_objects(Qmf::Query.new(:package => "org.apache.qpid.broker",
+ :class => "queue"))
+ puts "---- Queues ----"
+ qlist.each do |q|
+ puts " ---- Queue ----"
+ puts " name : #{q.name}"
end
puts "----"
sleep(5)
Modified: qpid/trunk/qpid/cpp/src/qmf.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf.mk?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf.mk (original)
+++ qpid/trunk/qpid/cpp/src/qmf.mk Thu Sep 17 19:27:52 2009
@@ -35,9 +35,12 @@
../include/qmf/AgentObject.h
libqmfcommon_la_SOURCES = \
+ qmf/BrokerProxyImpl.cpp \
+ qmf/BrokerProxyImpl.h \
qmf/ConnectionSettingsImpl.cpp \
qmf/ConnectionSettingsImpl.h \
- qmf/ConsoleEngine.cpp \
+ qmf/ConsoleEngineImpl.cpp \
+ qmf/ConsoleEngineImpl.h \
qmf/ConsoleEngine.h \
qmf/Event.h \
qmf/Message.h \
Added: qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.cpp?rev=816345&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.cpp Thu Sep 17 19:27:52 2009
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/BrokerProxyImpl.h"
+#include "qmf/ConsoleEngineImpl.h"
+#include "qmf/Protocol.h"
+#include "qpid/Address.h"
+#include "qpid/sys/SystemInfo.h"
+#include <qpid/log/Statement.h>
+#include <string.h>
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+// File-local exchange names, routing keys and schema names used by the
+// broker proxy when talking to the broker.
+namespace {
+ const char* QMF_EXCHANGE = "qpid.management"; // destination of outgoing requests
+ const char* DIR_EXCHANGE = "amq.direct"; // exchange the reply queue is bound to
+ const char* BROKER_KEY = "broker"; // routing key for broker/package/class/schema requests
+ const char* BROKER_PACKAGE = "org.apache.qpid.broker"; // package of the agent class queried at start-up
+ const char* AGENT_CLASS = "agent"; // class queried at start-up
+ const char* BROKER_AGENT_KEY = "agent.1.0"; // routing key for the start-up agent query
+}
+
+// Return the envelope of the idx-th result object, or 0 when idx is out of
+// range.  The bounds check is done up front: the previous iterator walk only
+// tested for end() before advancing, so idx == size (or any idx on an empty
+// result set) dereferenced the end iterator.
+const Object* QueryResponseImpl::getObject(uint32_t idx) const
+{
+    if (idx >= results.size())
+        return 0;
+
+    return results[idx]->envelope;
+}
+
+// Point item.<s> at the character data of the member string <s>, but only
+// when that string is non-empty (the field otherwise keeps its zeroed value).
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+// Produce the plain BrokerEvent struct handed to the API user.
+// The struct is zero-filled first; string fields and the response pointers
+// refer to storage owned by this BrokerEventImpl (c_str() data and response
+// envelopes) rather than to copies.
+BrokerEvent BrokerEventImpl::copy()
+{
+ BrokerEvent item;
+
+ ::memset(&item, 0, sizeof(BrokerEvent));
+ item.kind = kind;
+
+ STRING_REF(name);
+ STRING_REF(exchange);
+ STRING_REF(bindingKey);
+ item.context = context;
+ item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
+ item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0;
+
+ return item;
+}
+
+// Construct the proxy for one broker.
+// Builds the reply queue name "qmfc-<process name>-<local address>-<pid>" and
+// installs a StaticContext as the sequence manager's unsolicited-message
+// context.
+BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
+ envelope(e), console(_console.impl)
+{
+ stringstream qn;
+ qpid::TcpAddress addr;
+
+ SystemInfo::getLocalHostname(addr);
+ qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
+ queueName = qn.str();
+
+ seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
+}
+
+// Reset all per-session state and queue the events that ask the application
+// to declare the reply queue, bind it to the direct exchange under its own
+// name, and then report that setup is complete.  The session handle is
+// currently unused.
+void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
+{
+ Mutex::ScopedLock _lock(lock);
+ agentList.clear();
+ eventQueue.clear();
+ xmtQueue.clear();
+ eventQueue.push_back(eventDeclareQueue(queueName));
+ eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
+ eventQueue.push_back(eventSetupComplete());
+
+ // TODO: Store session handle
+}
+
+// Drop the agent list and any pending events and outgoing messages, under
+// the proxy lock.
+void BrokerProxyImpl::sessionClosed()
+{
+ Mutex::ScopedLock _lock(lock);
+ agentList.clear();
+ eventQueue.clear();
+ xmtQueue.clear();
+}
+
+// Begin the QMF handshake: register a proxy for the broker's embedded agent,
+// reset the outstanding-request bookkeeping (one request in flight, topic
+// bindings not yet issued) and queue a BrokerRequest message.
+void BrokerProxyImpl::startProtocol()
+{
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ // NOTE(review): the third argument (0) is presumably the agent bank - confirm against AgentProxyImpl.
+ agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
+
+ requestsOutstanding = 1;
+ topicBound = false;
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+}
+
+// Wrap the bytes written to buf so far in a message addressed to
+// destination/routingKey, with replies directed to this proxy's queue on the
+// direct exchange, and append it to the transmit queue.
+// Every caller in this file holds `lock` when calling (the "LH" suffix
+// presumably stands for "lock held").
+void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+ uint32_t length = buf.getPosition();
+ MessageImpl::Ptr message(new MessageImpl);
+
+ // Rewind so the encoded bytes can be read back out from the start.
+ buf.reset();
+ buf.getRawData(message->body, length);
+ message->destination = destination;
+ message->routingKey = routingKey;
+ message->replyExchange = DIR_EXCHANGE;
+ message->replyKey = queueName;
+
+ xmtQueue.push_back(message);
+}
+
+// Decode an incoming message body: for as long as a valid protocol header
+// can be read from the buffer, hand the opcode, sequence and buffer to the
+// sequence manager for dispatch.
+void BrokerProxyImpl::handleRcvMessage(Message& message)
+{
+ Buffer inBuffer(message.body, message.length);
+ uint8_t opcode;
+ uint32_t sequence;
+
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
+ seqMgr.dispatch(opcode, sequence, inBuffer);
+}
+
+// Peek at the next outgoing message.  Fills `item` from the head of the
+// transmit queue and returns true, or returns false (leaving `item`
+// untouched) when nothing is queued.  The head is not removed; see popXmt().
+bool BrokerProxyImpl::getXmtMessage(Message& item) const
+{
+    Mutex::ScopedLock _lock(lock);
+    const bool available = !xmtQueue.empty();
+    if (available)
+        item = xmtQueue.front()->copy();
+    return available;
+}
+
+// Discard the head of the transmit queue; a no-op when the queue is empty.
+void BrokerProxyImpl::popXmt()
+{
+    Mutex::ScopedLock _lock(lock);
+    if (xmtQueue.empty())
+        return;
+    xmtQueue.pop_front();
+}
+
+// Peek at the next pending event.  Fills `event` from the head of the event
+// queue and returns true, or returns false (leaving `event` untouched) when
+// nothing is queued.  The head is not removed; see popEvent().
+bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
+{
+    Mutex::ScopedLock _lock(lock);
+    const bool available = !eventQueue.empty();
+    if (available)
+        event = eventQueue.front()->copy();
+    return available;
+}
+
+// Discard the head of the event queue; a no-op when the queue is empty.
+void BrokerProxyImpl::popEvent()
+{
+    Mutex::ScopedLock _lock(lock);
+    if (eventQueue.empty())
+        return;
+    eventQueue.pop_front();
+}
+
+// Number of agent proxies currently known, read under the proxy lock.
+uint32_t BrokerProxyImpl::agentCount() const
+{
+    Mutex::ScopedLock _lock(lock);
+    uint32_t known = agentList.size();
+    return known;
+}
+
+// Return the envelope of the idx-th known agent, or 0 when idx is past the
+// end of the agent list.
+const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
+{
+    Mutex::ScopedLock _lock(lock);
+    if (idx < agentList.size())
+        return agentList[idx]->envelope;
+    return 0;
+}
+
+// Issue a get-query.  With a specific agent the request goes to that agent
+// only; with agent == 0 it is sent to every known agent.  All requests share
+// one QueryContext carrying the caller's context pointer.
+void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
+{
+    SequenceContext::Ptr queryContext(new QueryContext(*this, context));
+    Mutex::ScopedLock _lock(lock);
+
+    if (agent != 0) {
+        sendGetRequestLH(queryContext, query, agent->impl);
+        return;
+    }
+
+    // TODO (optimization) only send queries to agents that have the requested class+package
+    vector<AgentProxyImpl::Ptr>::const_iterator target;
+    for (target = agentList.begin(); target != agentList.end(); ++target)
+        sendGetRequestLH(queryContext, query, target->get());
+}
+
+// Encode one GetQuery for `agent` into the shared output buffer, reserving a
+// sequence number tied to queryContext, and queue it with routing key
+// "agent.1.<agent bank>".  Called with `lock` held by sendQuery().
+void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
+{
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(queryContext));
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ query.impl->encode(outBuffer);
+ key << "agent.1." << agent->agentBank;
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
+}
+
+// Encode the input arguments of a method call into `buffer`, in schema order.
+// For each IN / IN_OUT argument the value from `argmap` is encoded when the
+// key is present (its type must match the schema); otherwise a freshly
+// constructed Value of the schema type is encoded in its place.
+// Returns an empty string on success, or a description of the problem.
+// Note that on a type mismatch earlier arguments have already been written
+// to `buffer`.
+string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer)
+{
+ int argCount = schema->getArgumentCount();
+
+ if (argmap == 0 || !argmap->isMap())
+ return string("Arguments must be in a map value");
+
+ for (int aIdx = 0; aIdx < argCount; aIdx++) {
+ const SchemaArgument* arg(schema->getArgument(aIdx));
+ if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) {
+ if (argmap->keyInMap(arg->getName())) {
+ const Value* argVal(argmap->byKey(arg->getName()));
+ if (argVal->getType() != arg->getType())
+ return string("Argument is the wrong type: ") + arg->getName();
+ argVal->impl->encode(buffer);
+ } else {
+ Value defaultValue(arg->getType());
+ defaultValue.impl->encode(buffer);
+ }
+ }
+ }
+
+ return string();
+}
+
+// Send a method request for object `oid` of class `cls`.
+// The method is looked up by name in the class schema.  On a match the
+// request (header, object id, class key, method name, arguments) is encoded
+// and queued for routing key "agent.1.<agent bank of oid>".  If the arguments
+// cannot be encoded, or no method of that name exists, a METHOD_RESPONSE
+// event with status 1 and an explanatory text is queued for userContext
+// instead of sending anything.
+void BrokerProxyImpl::sendMethodRequest(ObjectIdImpl* oid, const SchemaObjectClass* cls,
+ const string& methodName, const Value* args, void* userContext)
+{
+ int methodCount = cls->getMethodCount();
+ int idx;
+ for (idx = 0; idx < methodCount; idx++) {
+ const SchemaMethod* method = cls->getMethod(idx);
+ if (string(method->getName()) == methodName) {
+ Mutex::ScopedLock _lock(lock);
+ SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method->impl));
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ // NOTE(review): the sequence (and its MethodContext) is reserved before the
+ // arguments are validated; on the argument-error path below nothing visible
+ // here hands it back to seqMgr - confirm how SequenceManager reclaims it.
+ uint32_t sequence(seqMgr.reserve(methodContext));
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence);
+ oid->encode(outBuffer);
+ cls->getClassKey()->impl->encode(outBuffer);
+ outBuffer.putShortString(methodName);
+
+ string argErrorString = encodeMethodArguments(method, args, outBuffer);
+ if (argErrorString.empty()) {
+ key << "agent.1." << oid->getAgentBank();
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str());
+ } else {
+ MethodResponseImpl::Ptr argError(new MethodResponseImpl(1, argErrorString));
+ eventQueue.push_back(eventMethodResponse(userContext, argError));
+ }
+ return;
+ }
+ }
+
+ // No method of that name in the schema: report the failure as an event.
+ MethodResponseImpl::Ptr error(new MethodResponseImpl(1, string("Unknown method: ") + methodName));
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(eventMethodResponse(userContext, error));
+}
+
+// Queue a BIND event asking the application to bind this proxy's queue to
+// `exchange` with binding key `key`.
+void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
+{
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+}
+
+// Build a DECLARE_QUEUE event naming the queue to be declared.
+BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
+{
+    BrokerEventImpl::Ptr declare(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
+    declare->name = queueName;
+    return declare;
+}
+
+// Build a BIND event describing a queue/exchange/binding-key triple.
+BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
+{
+    BrokerEventImpl::Ptr binding(new BrokerEventImpl(BrokerEvent::BIND));
+    binding->name = queue;
+    binding->exchange = exchange;
+    binding->bindingKey = key;
+    return binding;
+}
+
+// Build a SETUP_COMPLETE event (carries no payload).
+BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
+{
+    return BrokerEventImpl::Ptr(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
+}
+
+// Build a STABLE event (carries no payload).
+BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
+{
+    return BrokerEventImpl::Ptr(new BrokerEventImpl(BrokerEvent::STABLE));
+}
+
+// Build a QUERY_COMPLETE event carrying the caller's context and the
+// accumulated query response.
+BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
+{
+    BrokerEventImpl::Ptr completed(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
+    completed->context = context;
+    completed->queryResponse = response;
+    return completed;
+}
+
+// Build a METHOD_RESPONSE event carrying the caller's context and the
+// method response.
+BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponseImpl::Ptr response)
+{
+    BrokerEventImpl::Ptr answered(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE));
+    answered->context = context;
+    answered->methodResponse = response;
+    return answered;
+}
+
+// Handle a BrokerResponse: record the broker id decoded from the buffer,
+// then queue a PackageRequest and count it as outstanding.
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
+ brokerId.decode(inBuffer);
+ QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
+ Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
+}
+
+// Handle a PackageIndication: tell the console about the package name read
+// from the buffer, then queue a ClassQuery for that package and count it as
+// outstanding.
+void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
+{
+ string package;
+
+ inBuffer.getShortString(package);
+ QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
+ console->learnPackage(package);
+
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
+ Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
+ outBuffer.putShortString(package);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
+}
+
+// Handle a CommandComplete: consume the completion code and text from the
+// buffer and log them.  No other state is changed here.
+void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
+{
+ string text;
+ uint32_t code = inBuffer.getLong();
+ inBuffer.getShortString(text);
+ QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
+}
+
+// Handle a ClassIndication: decode the class kind and key, and if the
+// console does not already have that class, queue a SchemaRequest for it and
+// count the request as outstanding.
+void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
+{
+ uint8_t kind = inBuffer.getOctet();
+ SchemaClassKeyImpl classKey(inBuffer);
+
+ QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
+
+ if (!console->haveClass(classKey)) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
+ classKey.encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str());
+ }
+}
+
+// Decode a MethodResponse from the buffer against the method's schema, log
+// its status and exception text, and return it to the caller.
+MethodResponseImpl::Ptr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, SchemaMethodImpl* schema)
+{
+ MethodResponseImpl::Ptr response(new MethodResponseImpl(inBuffer, schema));
+
+ QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" <<
+ response->getException()->asString());
+
+ return response;
+}
+
+// Heartbeat indications are not processed yet: the message is ignored and
+// nothing is read from the buffer.
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+// Event indications are not processed yet: the message is ignored and
+// nothing is read from the buffer.
+void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+// Handle a SchemaResponse: decode the object or event class described in the
+// buffer and teach it to the console.  Learning the broker's "agent" class
+// additionally triggers a GetQuery for the current agents.  An unrecognised
+// class kind is logged as an error and otherwise ignored.
+void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
+{
+ SchemaObjectClassImpl::Ptr oClassPtr;
+ SchemaEventClassImpl::Ptr eClassPtr;
+ uint8_t kind = inBuffer.getOctet();
+ // Assigned in the object/event branches only; not used in the error branch.
+ const SchemaClassKeyImpl* key;
+ if (kind == CLASS_OBJECT) {
+ oClassPtr.reset(new SchemaObjectClassImpl(inBuffer));
+ console->learnClass(oClassPtr);
+ key = oClassPtr->getClassKey()->impl;
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
+
+ //
+ // If we have just learned about the org.apache.qpid.broker:agent class, send a get
+ // request for the current list of agents so we can have it on-hand before we declare
+ // this session "stable".
+ //
+ if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ FieldTable ft;
+ ft.setString("_class", AGENT_CLASS);
+ ft.setString("_package", BROKER_PACKAGE);
+ ft.encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
+ }
+ } else if (kind == CLASS_EVENT) {
+ eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
+ console->learnClass(eClassPtr);
+ key = eClassPtr->getClassKey()->impl;
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str());
+ }
+ else {
+ QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
+ }
+}
+
+// Decode an object indication.  The class key is read first; if the console
+// has no schema for it an empty pointer is returned (the rest of the buffer
+// is left unread), otherwise a new ObjectImpl is built from the buffer.
+// `prop` and `stat` are passed through to the ObjectImpl constructor.
+ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
+{
+ SchemaClassKeyImpl classKey(inBuffer);
+ QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
+
+ SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
+ if (schema.get() == 0) {
+ QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
+ return ObjectImpl::Ptr();
+ }
+
+ return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, this, inBuffer, prop, stat, true));
+}
+
+// Count one more start-up request as outstanding.  Takes no lock itself;
+// the callers in this file hold `lock`.
+void BrokerProxyImpl::incOutstandingLH()
+{
+    ++requestsOutstanding;
+}
+
+// Count one start-up request as finished.  When the count reaches zero for
+// the first time (topic bindings not yet issued), queue a BIND event for
+// every binding registered with the console, followed by a STABLE event.
+void BrokerProxyImpl::decOutstanding()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding--;
+ if (requestsOutstanding == 0 && !topicBound) {
+ topicBound = true;
+ for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
+ iter != console->bindingList.end(); iter++) {
+ // An empty exchange name in the binding list stands for the QMF exchange.
+ string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
+ string key(iter->second);
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+ }
+ eventQueue.push_back(eventStable());
+ }
+}
+
+// Copy constructor: status and schema are copied, exception and arguments
+// are duplicated into new Value objects when present.
+// NOTE(review): the envelope pointer is shared with `from`, whereas the other
+// constructors allocate their own envelope and the destructor declared in
+// BrokerProxyImpl.h deletes it - destroying both the source and the copy
+// would delete the same MethodResponse twice.  Confirm intended ownership.
+MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) :
+ envelope(from.envelope), // !!!! TODO !!!! this is not right
+ status(from.status), schema(from.schema),
+ exception(from.exception.get() ? new Value(*from.exception) : 0),
+ arguments(from.arguments.get() ? new Value(*from.arguments) : 0)
+{
+}
+
+// Decode a method response from the wire.  The status code and exception
+// text are always read; the output arguments (OUT and IN_OUT, in schema
+// order) are decoded into a map only when the status is zero, so `arguments`
+// stays unset for a failed call.
+MethodResponseImpl::MethodResponseImpl(Buffer& buf, SchemaMethodImpl* s) :
+ envelope(new MethodResponse(this)), schema(s)
+{
+ string text;
+
+ status = buf.getLong();
+ buf.getMediumString(text);
+ exception.reset(new Value(TYPE_LSTR));
+ exception->setString(text.c_str());
+
+ if (status != 0)
+ return;
+
+ arguments.reset(new Value(TYPE_MAP));
+ int argCount(schema->getArgumentCount());
+ for (int idx = 0; idx < argCount; idx++) {
+ const SchemaArgument* arg = schema->getArgument(idx);
+ if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) {
+ // NOTE(review): ownership of the new ValueImpl presumably passes to the map via insert() - confirm.
+ ValueImpl* value(new ValueImpl(arg->getType(), buf));
+ arguments->insert(arg->getName(), value->envelope);
+ }
+ }
+}
+
+// Build a locally generated response from a status code and a text, used in
+// this file to report errors without contacting an agent.  No schema and no
+// arguments are attached.
+MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : envelope(new MethodResponse(this)), schema(0)
+{
+ status = s;
+ exception.reset(new Value(TYPE_LSTR));
+ exception->setString(text.c_str());
+}
+
+// Dispatch a message received on the static (start-up / unsolicited) context
+// to the matching BrokerProxyImpl handler.
+// Returns true when the message completes its request (broker response,
+// command complete, schema response, or an unrecognised opcode) and false
+// for indications, which may be followed by further messages.
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+    bool completeContext = false;
+    if (opcode == Protocol::OP_BROKER_RESPONSE) {
+        broker.handleBrokerResponse(buffer, sequence);
+        completeContext = true;
+    }
+    else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+        broker.handleCommandComplete(buffer, sequence);
+        completeContext = true;
+    }
+    else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
+        broker.handleSchemaResponse(buffer, sequence);
+        completeContext = true;
+    }
+    else if (opcode == Protocol::OP_PACKAGE_INDICATION)
+        broker.handlePackageIndication(buffer, sequence);
+    else if (opcode == Protocol::OP_CLASS_INDICATION)
+        broker.handleClassIndication(buffer, sequence);
+    else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
+        broker.handleHeartbeatIndication(buffer, sequence);
+    else if (opcode == Protocol::OP_EVENT_INDICATION)
+        broker.handleEventIndication(buffer, sequence);
+    else if (opcode == Protocol::OP_PROPERTY_INDICATION)
+        broker.handleObjectIndication(buffer, sequence, true, false);
+    else if (opcode == Protocol::OP_STATISTIC_INDICATION)
+        broker.handleObjectIndication(buffer, sequence, false, true);
+    else if (opcode == Protocol::OP_OBJECT_INDICATION)
+        broker.handleObjectIndication(buffer, sequence, true, true);
+    else {
+        // Cast so the opcode is logged as a number; a uint8_t streams as a character.
+        QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << (int) opcode);
+        completeContext = true;
+    }
+
+    return completeContext;
+}
+
+// Record one more request issued under this query context.
+void QueryContext::reserve()
+{
+    Mutex::ScopedLock _lock(lock);
+    ++requestsOutstanding;
+}
+
+// Record that one request issued under this context has finished.  When the
+// last one finishes, a QUERY_COMPLETE event carrying the collected results is
+// queued on the broker proxy.
+void QueryContext::release()
+{
+ {
+ // The context's own lock is released before the broker's lock is taken below.
+ Mutex::ScopedLock _lock(lock);
+ if (--requestsOutstanding > 0)
+ return;
+ }
+
+ Mutex::ScopedLock _block(broker.lock);
+ broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
+}
+
+// Handle a message received in reply to a query.
+// Object indications are decoded and, when a schema was available for them,
+// appended to the query's result list (returns false: more may follow).
+// A CommandComplete, or any unexpected opcode, ends this request (returns
+// true).
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+    bool completeContext = false;
+    ObjectImpl::Ptr object;
+
+    if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+        broker.handleCommandComplete(buffer, sequence);
+        completeContext = true;
+    }
+    else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+        object = broker.handleObjectIndication(buffer, sequence, true, true);
+        if (object.get() != 0)
+            queryResponse->results.push_back(object);
+    }
+    else {
+        // Cast so the opcode is logged as a number; a uint8_t streams as a character.
+        QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << (int) opcode);
+        completeContext = true;
+    }
+
+    return completeContext;
+}
+
+// Queue a METHOD_RESPONSE event on the broker proxy carrying this context's
+// user context and whatever response has been stored in methodResponse.
+void MethodContext::release()
+{
+ Mutex::ScopedLock _block(broker.lock);
+ broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse));
+}
+
+// Handle the reply to a method request.  A MethodResponse is decoded and
+// stored; any other opcode is only logged.  Either way the request is
+// finished, so this always returns true.
+bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+{
+    if (opcode == Protocol::OP_METHOD_RESPONSE)
+        methodResponse = broker.handleMethodResponse(buffer, sequence, schema);
+    else
+        // Name the right class in the log, and cast the opcode so it is
+        // printed as a number (a uint8_t streams as a character).
+        QPID_LOG(trace, "MethodContext::handleMessage invalid opcode: " << (int) opcode);
+
+    return true;
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+// AgentProxy: public wrapper that owns its AgentProxyImpl (deleted in the destructor).
+AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
+AgentProxy::~AgentProxy() { delete impl; }
+// NOTE(review): the returned pointer is only valid if getLabel() returns a reference to a stored string - confirm.
+const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
+
+// BrokerProxy: public wrapper that creates and owns a BrokerProxyImpl and
+// forwards every call to it unchanged.
+BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
+BrokerProxy::~BrokerProxy() { delete impl; }
+void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
+void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
+void BrokerProxy::startProtocol() { impl->startProtocol(); }
+void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void BrokerProxy::popXmt() { impl->popXmt(); }
+bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
+void BrokerProxy::popEvent() { impl->popEvent(); }
+uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
+const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
+void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
+
+// MethodResponse: public wrapper around MethodResponseImpl.
+// Copying allocates a new impl copied from the source's impl; the destructor
+// is empty, so the wrapper never deletes its impl.
+MethodResponse::MethodResponse(const MethodResponse& from) : impl(new MethodResponseImpl(*(from.impl))) {}
+MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
+MethodResponse::~MethodResponse() {}
+uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
+const Value* MethodResponse::getException() const { return impl->getException(); }
+const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
+
+// QueryResponse: public wrapper around QueryResponseImpl; all accessors
+// forward to the impl.  The destructor is empty, so the wrapper never deletes
+// its impl.
+QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
+QueryResponse::~QueryResponse() {}
+uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
+const Value* QueryResponse::getException() const { return impl->getException(); }
+uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
+const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
+
Added: qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.h?rev=816345&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qmf/BrokerProxyImpl.h Thu Sep 17 19:27:52 2009
@@ -0,0 +1,222 @@
+#ifndef _QmfBrokerProxyImpl_
+#define _QmfBrokerProxyImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/ConsoleEngine.h"
+#include "qmf/ObjectImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/ValueImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/SequenceManager.h"
+#include "qmf/MessageImpl.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "boost/shared_ptr.hpp"
+#include <memory>
+#include <string>
+#include <deque>
+#include <map>
+#include <vector>
+
+namespace qmf {
+
+ struct MethodResponseImpl { // Implementation state behind the public MethodResponse wrapper.
+ typedef boost::shared_ptr<MethodResponseImpl> Ptr;
+ MethodResponse* envelope; // Public-facing wrapper object; deleted by the destructor below.
+ uint32_t status; // Returned verbatim by getStatus().
+ SchemaMethodImpl* schema; // Raw pointer; the destructor below does not free it.
+ boost::shared_ptr<Value> exception; // Exposed as a raw pointer through getException().
+ boost::shared_ptr<Value> arguments; // Exposed as a raw pointer through getArgs().
+
+ MethodResponseImpl(const MethodResponseImpl& from); // Defined out of line.
+ MethodResponseImpl(qpid::framing::Buffer& buf, SchemaMethodImpl* schema); // Defined out of line; presumably decodes the response from buf -- confirm.
+ MethodResponseImpl(uint32_t status, const std::string& text); // Defined out of line; presumably builds a response from a status code and message -- confirm.
+ ~MethodResponseImpl() { delete envelope; }
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); } // Null when the shared_ptr is empty.
+ const Value* getArgs() const { return arguments.get(); } // Null when the shared_ptr is empty.
+ };
+
+ struct QueryResponseImpl { // Implementation state behind the public QueryResponse wrapper.
+ typedef boost::shared_ptr<QueryResponseImpl> Ptr;
+ QueryResponse *envelope; // Public-facing wrapper; created in the constructor, deleted in the destructor.
+ uint32_t status; // Initialized to 0 by the constructor.
+ std::auto_ptr<Value> exception; // Exposed as a raw pointer through getException().
+ std::vector<ObjectImpl::Ptr> results; // Objects collected for this query; size is reported by getObjectCount().
+
+ QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {}
+ ~QueryResponseImpl() { delete envelope; }
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); } // Null when no exception has been set.
+ uint32_t getObjectCount() const { return results.size(); } // size() is implicitly converted to uint32_t.
+ const Object* getObject(uint32_t idx) const; // Defined out of line.
+ };
+
+ struct BrokerEventImpl { // Queued form of a BrokerEvent; copy() produces the public BrokerEvent struct.
+ typedef boost::shared_ptr<BrokerEventImpl> Ptr;
+ BrokerEvent::EventKind kind; // Set from the constructor argument.
+ std::string name;
+ std::string exchange;
+ std::string bindingKey;
+ void* context; // Opaque pointer; initialized to 0 by the constructor.
+ QueryResponseImpl::Ptr queryResponse; // Empty unless assigned after construction.
+ MethodResponseImpl::Ptr methodResponse; // Empty unless assigned after construction.
+
+ BrokerEventImpl(BrokerEvent::EventKind k) : kind(k), context(0) {}
+ ~BrokerEventImpl() {}
+ BrokerEvent copy(); // Defined out of line.
+ };
+
+ struct AgentProxyImpl { // Implementation state behind the public AgentProxy wrapper.
+ typedef boost::shared_ptr<AgentProxyImpl> Ptr;
+ AgentProxy* envelope; // Public-facing wrapper; allocated in the constructor below.
+ ConsoleEngineImpl* console; // Raw pointer supplied by the caller; not freed here.
+ BrokerProxyImpl* broker; // Raw pointer supplied by the caller; not freed here.
+ uint32_t agentBank;
+ std::string label; // Returned by reference from getLabel().
+
+ AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const std::string& l) :
+ envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {}
+ ~AgentProxyImpl() {} // NOTE(review): envelope is not deleted here; presumably AgentProxy owns and deletes its impl instead -- confirm.
+ const std::string& getLabel() const { return label; }
+ };
+
+ class BrokerProxyImpl { // Implementation behind BrokerProxy: holds the transmit queue, event queue and agent list for one broker.
+ public:
+ typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
+
+ BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
+ ~BrokerProxyImpl() {}
+
+ void sessionOpened(SessionHandle& sh);
+ void sessionClosed();
+ void startProtocol();
+
+ void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey); // "LH" suffix presumably means the caller must already hold `lock` -- confirm.
+ void handleRcvMessage(Message& message);
+ bool getXmtMessage(Message& item) const; // Presumably peeks at the front of xmtQueue without removing it (popXmt removes) -- confirm in BrokerProxyImpl.cpp.
+ void popXmt();
+
+ bool getEvent(BrokerEvent& event) const; // Presumably peeks at the front of eventQueue without removing it (popEvent removes) -- confirm in BrokerProxyImpl.cpp.
+ void popEvent();
+
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+ void sendQuery(const Query& query, void* context, const AgentProxy* agent);
+ void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent); // "LH": see sendBufferLH.
+ std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer);
+ void sendMethodRequest(ObjectIdImpl* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context);
+
+ void addBinding(const std::string& exchange, const std::string& key);
+ void staticRelease() { decOutstanding(); } // Public entry point to the private decOutstanding(); StaticContext::release() calls this.
+
+ private:
+ friend class StaticContext; // The three friends are defined with the `struct` keyword later in this header; the class/struct key mismatch is legal C++.
+ friend class QueryContext;
+ friend class MethodContext;
+ mutable qpid::sys::Mutex lock; // mutable so that const member functions can use it.
+ BrokerProxy* envelope;
+ ConsoleEngineImpl* console;
+ std::string queueName;
+ qpid::framing::Uuid brokerId;
+ SequenceManager seqMgr;
+ uint32_t requestsOutstanding;
+ bool topicBound;
+ std::vector<AgentProxyImpl::Ptr> agentList;
+ std::deque<MessageImpl::Ptr> xmtQueue;
+ std::deque<BrokerEventImpl::Ptr> eventQueue;
+
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE]; // MA_BUFFER_SIZE is a preprocessor macro, so it is not scoped to this class even though it is defined inside it.
+
+ BrokerEventImpl::Ptr eventDeclareQueue(const std::string& queueName); // The event* helpers each return a new BrokerEventImpl handle; bodies are defined out of line.
+ BrokerEventImpl::Ptr eventBind(const std::string& exchange, const std::string& queue, const std::string& key);
+ BrokerEventImpl::Ptr eventSetupComplete();
+ BrokerEventImpl::Ptr eventStable();
+ BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response);
+ BrokerEventImpl::Ptr eventMethodResponse(void* context, MethodResponseImpl::Ptr response);
+
+ void handleBrokerResponse(qpid::framing::Buffer& inBuffer, uint32_t seq); // The handle* members take the inbound buffer and the message sequence number; bodies are defined out of line.
+ void handlePackageIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleCommandComplete(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleClassIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ MethodResponseImpl::Ptr handleMethodResponse(qpid::framing::Buffer& inBuffer, uint32_t seq, SchemaMethodImpl* schema);
+ void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleEventIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleSchemaResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ ObjectImpl::Ptr handleObjectIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
+ void incOutstandingLH(); // "LH": see sendBufferLH.
+ void decOutstanding(); // No "LH" suffix; presumably acquires `lock` itself -- confirm.
+ };
+
+ //
+ // StaticContext is used to handle:
+ //
+ // 1) Responses to console-level requests (for schema info, etc.)
+ // 2) Unsolicited messages from agents (events, published updates, etc.)
+ //
+ struct StaticContext : public SequenceContext { // SequenceContext bound to one BrokerProxyImpl.
+ StaticContext(BrokerProxyImpl& b) : broker(b) {}
+ virtual ~StaticContext() {}
+ void reserve() {} // Intentionally a no-op.
+ void release() { broker.staticRelease(); } // Forwards to the broker, which in turn calls its decOutstanding().
+ bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer); // Defined out of line.
+ BrokerProxyImpl& broker; // Held by reference, so the broker must outlive this context.
+ };
+
+ //
+ // QueryContext is used to track and handle responses associated with a single Get Query
+ //
+ struct QueryContext : public SequenceContext { // Per-query SequenceContext carrying the caller's context pointer and the accumulating response.
+ QueryContext(BrokerProxyImpl& b, void* u) :
+ broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {} // Starts with zero outstanding requests and a fresh, empty response.
+ virtual ~QueryContext() {}
+ void reserve(); // Defined out of line.
+ void release(); // Defined out of line.
+ bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer); // Defined out of line.
+
+ mutable qpid::sys::Mutex lock; // Separate from BrokerProxyImpl's own lock.
+ BrokerProxyImpl& broker; // Held by reference, so the broker must outlive this context.
+ void* userContext; // Opaque pointer supplied by the caller; stored unchanged.
+ uint32_t requestsOutstanding;
+ QueryResponseImpl::Ptr queryResponse;
+ };
+
+ struct MethodContext : public SequenceContext { // Per-method-call SequenceContext carrying the caller's context pointer and the method schema.
+ MethodContext(BrokerProxyImpl& b, void* u, SchemaMethodImpl* s) : broker(b), userContext(u), schema(s) {} // methodResponse is not in the initializer list, so it starts empty.
+ virtual ~MethodContext() {}
+ void reserve() {} // Intentionally a no-op.
+ void release(); // Defined out of line.
+ bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer); // Defined out of line.
+
+ BrokerProxyImpl& broker; // Held by reference, so the broker must outlive this context.
+ void* userContext; // Opaque pointer supplied by the caller; stored unchanged.
+ SchemaMethodImpl* schema; // Raw pointer; this struct does not free it.
+ MethodResponseImpl::Ptr methodResponse;
+ };
+
+
+
+}
+
+#endif
+
Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.h?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.h Thu Sep 17 19:27:52 2009
@@ -46,6 +46,7 @@
class MethodResponse {
public:
MethodResponse(MethodResponseImpl* impl);
+ MethodResponse(const MethodResponse& from);
~MethodResponse();
uint32_t getStatus() const;
const Value* getException() const;
@@ -84,8 +85,7 @@
NEW_CLASS = 4,
OBJECT_UPDATE = 5,
EVENT_RECEIVED = 7,
- AGENT_HEARTBEAT = 8,
- METHOD_RESPONSE = 9
+ AGENT_HEARTBEAT = 8
};
EventKind kind;
@@ -96,8 +96,6 @@
void* context; // (OBJECT_UPDATE)
Event* event; // (EVENT_RECEIVED)
uint64_t timestamp; // (AGENT_HEARTBEAT)
- uint32_t methodHandle; // (METHOD_RESPONSE)
- MethodResponse* methodResponse; // (METHOD_RESPONSE)
QueryResponse* queryResponse; // (QUERY_COMPLETE)
};
@@ -113,15 +111,17 @@
UNBIND = 14,
SETUP_COMPLETE = 15,
STABLE = 16,
- QUERY_COMPLETE = 17
+ QUERY_COMPLETE = 17,
+ METHOD_RESPONSE = 18
};
EventKind kind;
- char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
- char* exchange; // ([UN]BIND)
- char* bindingKey; // ([UN]BIND)
- void* context; // (QUERY_COMPLETE)
- QueryResponse* queryResponse; // (QUERY_COMPLETE)
+ char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
+ char* exchange; // ([UN]BIND)
+ char* bindingKey; // ([UN]BIND)
+ void* context; // (QUERY_COMPLETE, METHOD_RESPONSE)
+ QueryResponse* queryResponse; // (QUERY_COMPLETE)
+ MethodResponse* methodResponse; // (METHOD_RESPONSE)
};
/**
Copied: qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.cpp (from r815416, qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.cpp?p2=qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.cpp&p1=qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp&r1=815416&r2=816345&rev=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleEngine.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.cpp Thu Sep 17 19:27:52 2009
@@ -17,7 +17,7 @@
* under the License.
*/
-#include "qmf/ConsoleEngine.h"
+#include "qmf/ConsoleEngineImpl.h"
#include "qmf/MessageImpl.h"
#include "qmf/SchemaImpl.h"
#include "qmf/Typecode.h"
@@ -27,278 +27,25 @@
#include "qmf/ValueImpl.h"
#include "qmf/Protocol.h"
#include "qmf/SequenceManager.h"
+#include "qmf/BrokerProxyImpl.h"
#include <qpid/framing/Buffer.h>
#include <qpid/framing/Uuid.h>
#include <qpid/framing/FieldTable.h>
#include <qpid/framing/FieldValue.h>
-#include <qpid/sys/Mutex.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/SystemInfo.h>
#include <string.h>
-#include <string>
-#include <deque>
-#include <map>
-#include <vector>
#include <iostream>
#include <fstream>
-#include <boost/shared_ptr.hpp>
using namespace std;
using namespace qmf;
using namespace qpid::framing;
using namespace qpid::sys;
-namespace qmf {
-
- struct MethodResponseImpl {
- typedef boost::shared_ptr<MethodResponseImpl> Ptr;
- MethodResponse* envelope;
- uint32_t status;
- auto_ptr<Value> exception;
- auto_ptr<Value> arguments;
-
- MethodResponseImpl(Buffer& buf);
- ~MethodResponseImpl() { delete envelope; }
- uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
- const Value* getArgs() const { return arguments.get(); }
- };
-
- struct QueryResponseImpl {
- typedef boost::shared_ptr<QueryResponseImpl> Ptr;
- QueryResponse *envelope;
- uint32_t status;
- auto_ptr<Value> exception;
- vector<ObjectImpl::Ptr> results;
-
- QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {}
- ~QueryResponseImpl() { delete envelope; }
- uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
- uint32_t getObjectCount() const { return results.size(); }
- const Object* getObject(uint32_t idx) const;
- };
-
- struct ConsoleEventImpl {
- typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
- ConsoleEvent::EventKind kind;
- boost::shared_ptr<AgentProxyImpl> agent;
- string name;
- boost::shared_ptr<SchemaClassKey> classKey;
- Object* object;
- void* context;
- Event* event;
- uint64_t timestamp;
- uint32_t methodHandle;
- MethodResponseImpl::Ptr methodResponse;
-
- ConsoleEventImpl(ConsoleEvent::EventKind k) :
- kind(k), object(0), context(0), event(0), timestamp(0), methodHandle(0) {}
- ~ConsoleEventImpl() {}
- ConsoleEvent copy();
- };
-
- struct BrokerEventImpl {
- typedef boost::shared_ptr<BrokerEventImpl> Ptr;
- BrokerEvent::EventKind kind;
- string name;
- string exchange;
- string bindingKey;
- void* context;
- QueryResponseImpl::Ptr queryResponse;
-
- BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {}
- ~BrokerEventImpl() {}
- BrokerEvent copy();
- };
-
- struct AgentProxyImpl {
- typedef boost::shared_ptr<AgentProxyImpl> Ptr;
- AgentProxy* envelope;
- ConsoleEngineImpl* console;
- BrokerProxyImpl* broker;
- uint32_t agentBank;
- string label;
-
- AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) :
- envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {}
- ~AgentProxyImpl() {}
- const string& getLabel() const { return label; }
- };
-
- class BrokerProxyImpl {
- public:
- typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
-
- BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
- ~BrokerProxyImpl() {}
-
- void sessionOpened(SessionHandle& sh);
- void sessionClosed();
- void startProtocol();
-
- void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
-
- bool getEvent(BrokerEvent& event) const;
- void popEvent();
-
- uint32_t agentCount() const;
- const AgentProxy* getAgent(uint32_t idx) const;
- void sendQuery(const Query& query, void* context, const AgentProxy* agent);
- void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent);
-
- void addBinding(const string& exchange, const string& key);
- void staticRelease() { decOutstanding(); }
-
- private:
- friend class StaticContext;
- friend class QueryContext;
- mutable Mutex lock;
- BrokerProxy* envelope;
- ConsoleEngineImpl* console;
- string queueName;
- Uuid brokerId;
- SequenceManager seqMgr;
- uint32_t requestsOutstanding;
- bool topicBound;
- vector<AgentProxyImpl::Ptr> agentList;
- deque<MessageImpl::Ptr> xmtQueue;
- deque<BrokerEventImpl::Ptr> eventQueue;
-
-# define MA_BUFFER_SIZE 65536
- char outputBuffer[MA_BUFFER_SIZE];
-
- BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName);
- BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
- BrokerEventImpl::Ptr eventSetupComplete();
- BrokerEventImpl::Ptr eventStable();
- BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response);
-
- void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
- void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
- void handleCommandComplete(Buffer& inBuffer, uint32_t seq);
- void handleClassIndication(Buffer& inBuffer, uint32_t seq);
- void handleMethodResponse(Buffer& inBuffer, uint32_t seq);
- void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq);
- void handleEventIndication(Buffer& inBuffer, uint32_t seq);
- void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
- ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
- void incOutstandingLH();
- void decOutstanding();
- };
-
- struct StaticContext : public SequenceContext {
- StaticContext(BrokerProxyImpl& b) : broker(b) {}
- ~StaticContext() {}
- void reserve() {}
- void release() { broker.staticRelease(); }
- bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
- BrokerProxyImpl& broker;
- };
-
- struct QueryContext : public SequenceContext {
- QueryContext(BrokerProxyImpl& b, void* u) :
- broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {}
- ~QueryContext() {}
- void reserve();
- void release();
- bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
-
- mutable Mutex lock;
- BrokerProxyImpl& broker;
- void* userContext;
- uint32_t requestsOutstanding;
- QueryResponseImpl::Ptr queryResponse;
- };
-
- class ConsoleEngineImpl {
- public:
- ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings());
- ~ConsoleEngineImpl();
-
- bool getEvent(ConsoleEvent& event) const;
- void popEvent();
-
- void addConnection(BrokerProxy& broker, void* context);
- void delConnection(BrokerProxy& broker);
-
- uint32_t packageCount() const;
- const string& getPackageName(uint32_t idx) const;
-
- uint32_t classCount(const char* packageName) const;
- const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
-
- ClassKind getClassKind(const SchemaClassKey* key) const;
- const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
- const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
-
- void bindPackage(const char* packageName);
- void bindClass(const SchemaClassKey* key);
- void bindClass(const char* packageName, const char* className);
-
- /*
- void startSync(const Query& query, void* context, SyncQuery& sync);
- void touchSync(SyncQuery& sync);
- void endSync(SyncQuery& sync);
- */
-
- private:
- friend class BrokerProxyImpl;
- ConsoleEngine* envelope;
- const ConsoleSettings& settings;
- mutable Mutex lock;
- deque<ConsoleEventImpl::Ptr> eventQueue;
- vector<BrokerProxyImpl*> brokerList;
- vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
-
- // Declare a compare class for the class maps that compares the dereferenced
- // class key pointers. The default behavior would be to compare the pointer
- // addresses themselves.
- struct KeyCompare {
- bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const {
- return *left < *right;
- }
- };
-
- typedef map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList;
- typedef map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList;
- typedef map<string, pair<ObjectClassList, EventClassList> > PackageList;
-
- PackageList packages;
-
- void learnPackage(const string& packageName);
- void learnClass(SchemaObjectClassImpl::Ptr cls);
- void learnClass(SchemaEventClassImpl::Ptr cls);
- bool haveClass(const SchemaClassKeyImpl& key) const;
- SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const;
- };
-}
-
namespace {
- const char* QMF_EXCHANGE = "qpid.management";
- const char* DIR_EXCHANGE = "amq.direct";
- const char* BROKER_KEY = "broker";
- const char* BROKER_PACKAGE = "org.apache.qpid.broker";
- const char* AGENT_CLASS = "agent";
- const char* BROKER_AGENT_KEY = "agent.1.0";
-}
-
-const Object* QueryResponseImpl::getObject(uint32_t idx) const
-{
- vector<ObjectImpl::Ptr>::const_iterator iter = results.begin();
-
- while (idx > 0) {
- if (iter == results.end())
- return 0;
- iter++;
- idx--;
- }
-
- return (*iter)->envelope;
+ const char* QMF_EXCHANGE = "qpid.management";
}
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
@@ -308,465 +55,19 @@
ConsoleEvent item;
::memset(&item, 0, sizeof(ConsoleEvent));
- item.kind = kind;
- item.agent = agent.get() ? agent->envelope : 0;
- item.classKey = classKey.get();
- item.object = object;
- item.context = context;
- item.event = event;
- item.timestamp = timestamp;
- item.methodHandle = methodHandle;
- item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0;
-
- STRING_REF(name);
-
- return item;
-}
-
-BrokerEvent BrokerEventImpl::copy()
-{
- BrokerEvent item;
-
- ::memset(&item, 0, sizeof(BrokerEvent));
- item.kind = kind;
+ item.kind = kind;
+ item.agent = agent.get() ? agent->envelope : 0;
+ item.classKey = classKey.get();
+ item.object = object;
+ item.context = context;
+ item.event = event;
+ item.timestamp = timestamp;
STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
- item.context = context;
- item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
return item;
}
-BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
- envelope(e), console(_console.impl)
-{
- stringstream qn;
- qpid::TcpAddress addr;
-
- SystemInfo::getLocalHostname(addr);
- qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
- queueName = qn.str();
-
- seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
-}
-
-void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-
- // TODO: Store session handle
-}
-
-void BrokerProxyImpl::sessionClosed()
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
-}
-
-void BrokerProxyImpl::startProtocol()
-{
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
-
- requestsOutstanding = 1;
- topicBound = false;
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
-}
-
-void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
-{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = DIR_EXCHANGE;
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
-}
-
-void BrokerProxyImpl::handleRcvMessage(Message& message)
-{
- Buffer inBuffer(message.body, message.length);
- uint8_t opcode;
- uint32_t sequence;
-
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
- seqMgr.dispatch(opcode, sequence, inBuffer);
-}
-
-bool BrokerProxyImpl::getXmtMessage(Message& item) const
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
-
-bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-uint32_t BrokerProxyImpl::agentCount() const
-{
- Mutex::ScopedLock _lock(lock);
- return agentList.size();
-}
-
-const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
-{
- Mutex::ScopedLock _lock(lock);
- for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
- iter != agentList.end(); iter++)
- if (idx-- == 0)
- return (*iter)->envelope;
- return 0;
-}
-
-void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
-{
- SequenceContext::Ptr queryContext(new QueryContext(*this, context));
- Mutex::ScopedLock _lock(lock);
- if (agent != 0) {
- sendGetRequestLH(queryContext, query, agent->impl);
- } else {
- // TODO (optimization) only send queries to agents that have the requested class+package
- for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
- iter != agentList.end(); iter++) {
- sendGetRequestLH(queryContext, query, (*iter).get());
- }
- }
-}
-
-void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
-{
- stringstream key;
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(queryContext));
-
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- query.impl->encode(outBuffer);
- key << "agent.1." << agent->agentBank;
- sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
-}
-
-void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
-{
- eventQueue.push_back(eventBind(exchange, queueName, key));
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
- event->name = queueName;
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
- event->name = queue;
- event->exchange = exchange;
- event->bindingKey = key;
-
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
- event->context = context;
- event->queryResponse = response;
- return event;
-}
-
-void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
-{
- brokerId.decode(inBuffer);
- QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
- Mutex::ScopedLock _lock(lock);
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- incOutstandingLH();
- Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
-}
-
-void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
-{
- string package;
-
- inBuffer.getShortString(package);
- QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
- console->learnPackage(package);
-
- Mutex::ScopedLock _lock(lock);
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- incOutstandingLH();
- Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
- outBuffer.putShortString(package);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
-}
-
-void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
-{
- string text;
- uint32_t code = inBuffer.getLong();
- inBuffer.getShortString(text);
- QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
-}
-
-void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
-{
- uint8_t kind = inBuffer.getOctet();
- SchemaClassKeyImpl classKey(inBuffer);
-
- QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
-
- if (!console->haveClass(classKey)) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
- classKey.encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str());
- }
-}
-
-void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
- // TODO
-}
-
-void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
- // TODO
-}
-
-void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
- // TODO
-}
-
-void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
-{
- SchemaObjectClassImpl::Ptr oClassPtr;
- SchemaEventClassImpl::Ptr eClassPtr;
- uint8_t kind = inBuffer.getOctet();
- const SchemaClassKeyImpl* key;
- if (kind == CLASS_OBJECT) {
- oClassPtr.reset(new SchemaObjectClassImpl(inBuffer));
- console->learnClass(oClassPtr);
- key = oClassPtr->getClassKey()->impl;
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
-
- //
- // If we have just learned about the org.apache.qpid.broker:agent class, send a get
- // request for the current list of agents so we can have it on-hand before we declare
- // this session "stable".
- //
- if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- FieldTable ft;
- ft.setString("_class", AGENT_CLASS);
- ft.setString("_package", BROKER_PACKAGE);
- ft.encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
- }
- } else if (kind == CLASS_EVENT) {
- eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
- console->learnClass(eClassPtr);
- key = eClassPtr->getClassKey()->impl;
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str());
- }
- else {
- QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
- }
-}
-
-ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
-{
- SchemaClassKeyImpl classKey(inBuffer);
- QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
-
- SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
- if (schema.get() == 0) {
- QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
- return ObjectImpl::Ptr();
- }
-
- return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true));
-}
-
-void BrokerProxyImpl::incOutstandingLH()
-{
- requestsOutstanding++;
-}
-
-void BrokerProxyImpl::decOutstanding()
-{
- Mutex::ScopedLock _lock(lock);
- requestsOutstanding--;
- if (requestsOutstanding == 0 && !topicBound) {
- topicBound = true;
- for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
- iter != console->bindingList.end(); iter++) {
- string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
- string key(iter->second);
- eventQueue.push_back(eventBind(exchange, queueName, key));
- }
- eventQueue.push_back(eventStable());
- }
-}
-
-MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this))
-{
- string text;
-
- status = buf.getLong();
- buf.getMediumString(text);
- exception.reset(new Value(TYPE_LSTR));
- exception->setString(text.c_str());
-
- // TODO: Parse schema-specific output arguments.
- arguments.reset(new Value(TYPE_MAP));
-}
-
-bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
-{
- bool completeContext = false;
- if (opcode == Protocol::OP_BROKER_RESPONSE) {
- broker.handleBrokerResponse(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
- broker.handleCommandComplete(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
- broker.handleSchemaResponse(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_PACKAGE_INDICATION)
- broker.handlePackageIndication(buffer, sequence);
- else if (opcode == Protocol::OP_CLASS_INDICATION)
- broker.handleClassIndication(buffer, sequence);
- else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
- broker.handleHeartbeatIndication(buffer, sequence);
- else if (opcode == Protocol::OP_EVENT_INDICATION)
- broker.handleEventIndication(buffer, sequence);
- else if (opcode == Protocol::OP_PROPERTY_INDICATION)
- broker.handleObjectIndication(buffer, sequence, true, false);
- else if (opcode == Protocol::OP_STATISTIC_INDICATION)
- broker.handleObjectIndication(buffer, sequence, false, true);
- else if (opcode == Protocol::OP_OBJECT_INDICATION)
- broker.handleObjectIndication(buffer, sequence, true, true);
- else {
- QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
- completeContext = true;
- }
-
- return completeContext;
-}
-
-void QueryContext::reserve()
-{
- Mutex::ScopedLock _lock(lock);
- requestsOutstanding++;
-}
-
-void QueryContext::release()
-{
- Mutex::ScopedLock _lock(lock);
- if (--requestsOutstanding == 0) {
- broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
- }
-}
-
-bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
-{
- bool completeContext = false;
- ObjectImpl::Ptr object;
-
- if (opcode == Protocol::OP_COMMAND_COMPLETE) {
- broker.handleCommandComplete(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_OBJECT_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, true, true);
- if (object.get() != 0)
- queryResponse->results.push_back(object);
- }
- else {
- QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
- completeContext = true;
- }
-
- return completeContext;
-}
-
ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
envelope(e), settings(s)
{
@@ -1037,37 +338,6 @@
// Wrappers
//==================================================================
-AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
-AgentProxy::~AgentProxy() { delete impl; }
-const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
-
-BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
-BrokerProxy::~BrokerProxy() { delete impl; }
-void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
-void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
-void BrokerProxy::startProtocol() { impl->startProtocol(); }
-void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void BrokerProxy::popXmt() { impl->popXmt(); }
-bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
-void BrokerProxy::popEvent() { impl->popEvent(); }
-uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
-const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
-void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
-
-MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
-MethodResponse::~MethodResponse() {}
-uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
-const Value* MethodResponse::getException() const { return impl->getException(); }
-const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
-
-QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
-QueryResponse::~QueryResponse() {}
-uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
-const Value* QueryResponse::getException() const { return impl->getException(); }
-uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
-const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
-
ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {}
ConsoleEngine::~ConsoleEngine() { delete impl; }
bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
Added: qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.h?rev=816345&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleEngineImpl.h Thu Sep 17 19:27:52 2009
@@ -0,0 +1,133 @@
+#ifndef _QmfConsoleEngineImpl_
+#define _QmfConsoleEngineImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/ConsoleEngine.h"
+#include "qmf/MessageImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/Typecode.h"
+#include "qmf/ObjectImpl.h"
+#include "qmf/ObjectIdImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
+#include "qmf/SequenceManager.h"
+#include "qmf/BrokerProxyImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
+#include <string.h>
+#include <string>
+#include <deque>
+#include <map>
+#include <vector>
+#include <iostream>
+#include <fstream>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+
+ struct ConsoleEventImpl { // Internal record for one console event; ConsoleEngineImpl queues these by shared pointer in its eventQueue.
+ typedef boost::shared_ptr<ConsoleEventImpl> Ptr; // Shared-ownership handle used wherever these records are stored.
+ ConsoleEvent::EventKind kind; // Discriminator supplied to the constructor; which of the fields below are meaningful presumably depends on it — confirm in the .cpp.
+ boost::shared_ptr<AgentProxyImpl> agent; // Default-constructed (empty) unless assigned after construction.
+ std::string name; // Default-constructed (empty) unless assigned after construction.
+ boost::shared_ptr<SchemaClassKey> classKey; // Default-constructed (empty) unless assigned after construction.
+ Object* object; // Raw pointer, null after construction; ownership is not visible in this header.
+ void* context; // Opaque pointer, null after construction; presumably the caller-supplied context — confirm.
+ Event* event; // Raw pointer, null after construction; ownership is not visible in this header.
+ uint64_t timestamp; // Zero after construction; units are not stated in this header.
+
+ ConsoleEventImpl(ConsoleEvent::EventKind k) : // Records the kind and zeroes every raw pointer and the timestamp.
+ kind(k), object(0), context(0), event(0), timestamp(0) {}
+ ~ConsoleEventImpl() {} // Empty body: the raw pointers above are not deleted here.
+ ConsoleEvent copy(); // Returns a ConsoleEvent by value; defined elsewhere — presumably built from the fields above, confirm in the .cpp.
+ };
+
+ class ConsoleEngineImpl { // Implementation object behind the public ConsoleEngine envelope, which creates it with new, forwards calls through its impl pointer and deletes it in its destructor.
+ public:
+ ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings()); // e becomes the envelope back-pointer; NOTE(review): the default argument is a temporary — see the settings member below.
+ ~ConsoleEngineImpl();
+
+ bool getEvent(ConsoleEvent& event) const; // Writes into event and reports the outcome as a bool; presumably peeks at the head of eventQueue without removing it — confirm in the .cpp.
+ void popEvent(); // Presumably discards the event last exposed by getEvent — confirm in the .cpp.
+
+ void addConnection(BrokerProxy& broker, void* context); // Presumably registers the broker in brokerList; context is an opaque caller pointer — confirm in the .cpp.
+ void delConnection(BrokerProxy& broker); // Presumably the inverse of addConnection — confirm in the .cpp.
+
+ uint32_t packageCount() const; // Count for index-based enumeration together with getPackageName.
+ const std::string& getPackageName(uint32_t idx) const; // Returns a reference; behaviour for an out-of-range idx is not visible in this header.
+
+ uint32_t classCount(const char* packageName) const; // Count for index-based enumeration together with getClass.
+ const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const; // Returns a pointer; whether it can be null for unknown names or indexes is not visible here.
+
+ ClassKind getClassKind(const SchemaClassKey* key) const; // Look-up by class key.
+ const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const; // Look-up by class key; result for an unknown key is not visible in this header.
+ const SchemaEventClass* getEventClass(const SchemaClassKey* key) const; // Look-up by class key; result for an unknown key is not visible in this header.
+
+ void bindPackage(const char* packageName); // Presumably records a binding in bindingList — confirm in the .cpp.
+ void bindClass(const SchemaClassKey* key); // Overload taking a class key.
+ void bindClass(const char* packageName, const char* className); // Overload taking package and class names.
+
+ /*
+ void startSync(const Query& query, void* context, SyncQuery& sync);
+ void touchSync(SyncQuery& sync);
+ void endSync(SyncQuery& sync);
+ */
+
+ private:
+ friend class BrokerProxyImpl; // Grants BrokerProxyImpl access to the private state and helpers below.
+ ConsoleEngine* envelope; // Back-pointer to the public wrapper, set from the constructor's first argument.
+ const ConsoleSettings& settings; // NOTE(review): stored as a reference, so the referenced object must outlive this instance; if the constructor's default argument (a temporary) were ever used this would dangle — consider storing by value.
+ mutable qpid::sys::Mutex lock; // mutable so that const member functions can take the lock.
+ std::deque<ConsoleEventImpl::Ptr> eventQueue; // Pending console events, held by shared pointer.
+ std::vector<BrokerProxyImpl*> brokerList; // Raw pointers; ownership is not visible in this header.
+ std::vector<std::pair<std::string, std::string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
+
+ // Declare a compare class for the class maps that compares the dereferenced
+ // class key pointers. The default behavior would be to compare the pointer
+ // addresses themselves.
+ struct KeyCompare {
+ bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const {
+ return *left < *right; // Relies on operator< for SchemaClassKeyImpl; both pointers must be non-null.
+ }
+ };
+
+ typedef std::map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList; // Object classes ordered by key value rather than by pointer address.
+ typedef std::map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList; // Event classes ordered by key value rather than by pointer address.
+ typedef std::map<std::string, std::pair<ObjectClassList, EventClassList> > PackageList; // Keyed by a string (presumably the package name); value is (object classes, event classes).
+
+ PackageList packages;
+
+ void learnPackage(const std::string& packageName); // Presumably adds an entry to packages if absent — confirm in the .cpp.
+ void learnClass(SchemaObjectClassImpl::Ptr cls); // Overload for object classes.
+ void learnClass(SchemaEventClassImpl::Ptr cls); // Overload for event classes.
+ bool haveClass(const SchemaClassKeyImpl& key) const; // Membership test by class key.
+ SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const; // Returns a shared pointer; presumably empty when the key is unknown — confirm in the .cpp.
+ };
+}
+
+#endif
+
Modified: qpid/trunk/qpid/cpp/src/qmf/Object.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/Object.h?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/Object.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/Object.h Thu Sep 17 19:27:52 2009
@@ -39,6 +39,7 @@
void setObjectId(ObjectId* oid);
const SchemaObjectClass* getClass() const;
Value* getValue(char* key) const;
+ void invokeMethod(const char* methodName, const Value* inArgs, void* context) const;
ObjectImpl* impl;
};
Modified: qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.cpp?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.cpp Thu Sep 17 19:27:52 2009
@@ -19,6 +19,7 @@
#include "qmf/ObjectImpl.h"
#include "qmf/ValueImpl.h"
+#include "qmf/BrokerProxyImpl.h"
#include <qpid/sys/Time.h>
using namespace std;
@@ -27,7 +28,7 @@
using qpid::framing::Buffer;
ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) :
- envelope(e), objectClass(type), createTime(uint64_t(Duration(now()))),
+ envelope(e), objectClass(type), broker(0), createTime(uint64_t(Duration(now()))),
destroyTime(0), lastUpdatedTime(createTime)
{
int propCount = objectClass->getPropertyCount();
@@ -45,8 +46,8 @@
}
}
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, bool stat, bool managed) :
- envelope(new Object(this)), objectClass(type), createTime(0), destroyTime(0), lastUpdatedTime(0)
+ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) :
+ envelope(new Object(this)), objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0)
{
int idx;
@@ -107,6 +108,12 @@
return 0;
}
+void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const // Forwards a method request for this object to its broker proxy; returns nothing, so the caller gets no indication of whether a request was sent.
+{
+ if (broker != 0 && objectId.get() != 0) // Silently does nothing when there is no broker (the envelope-based constructor sets broker to 0) or no object id has been set.
+ broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context); // inArgs and context are passed through unchanged.
+}
+
void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
{
int propCount = objectClass->getPropertyCount();
@@ -205,4 +212,5 @@
void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
Value* Object::getValue(char* key) const { return impl->getValue(key); }
+void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); } // Envelope forwarder: m is implicitly converted to the std::string parameter of ObjectImpl::invokeMethod; NOTE(review): a null m would construct a std::string from a null pointer — confirm callers never pass null.
Modified: qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.h?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ObjectImpl.h Thu Sep 17 19:27:52 2009
@@ -31,11 +31,14 @@
namespace qmf {
+ class BrokerProxyImpl;
+
struct ObjectImpl {
typedef boost::shared_ptr<ObjectImpl> Ptr;
typedef boost::shared_ptr<Value> ValuePtr;
Object* envelope;
const SchemaObjectClass* objectClass;
+ BrokerProxyImpl* broker;
boost::shared_ptr<ObjectIdImpl> objectId;
uint64_t createTime;
uint64_t destroyTime;
@@ -44,7 +47,8 @@
mutable std::map<std::string, ValuePtr> statistics;
ObjectImpl(Object* e, const SchemaObjectClass* type);
- ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer, bool prop, bool stat, bool managed);
+ ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
+ bool prop, bool stat, bool managed);
~ObjectImpl();
void destroy();
@@ -52,6 +56,7 @@
void setObjectId(ObjectId* oid) { objectId.reset(oid->impl); }
const SchemaObjectClass* getClass() const { return objectClass; }
Value* getValue(const std::string& key) const;
+ void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const;
void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
Modified: qpid/trunk/qpid/cpp/src/qmf/Value.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/Value.h?rev=816345&r1=816344&r2=816345&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/Value.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/Value.h Thu Sep 17 19:27:52 2009
@@ -31,6 +31,7 @@
class Value {
public:
// Value();
+ Value(const Value& from);
Value(Typecode t, Typecode arrayType = TYPE_UINT8);
Value(ValueImpl* impl);
~Value();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org