You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by MartiN Beneš <ma...@gmail.com> on 2012/03/21 12:26:59 UTC
python QMF synchronization
Hi,
I cannot find information about qmf synchronization. Or is every call
synchronous?
For instance if I purge a queue: queue.purge(0)
how can i tell it was done?
Re: python QMF synchronization
Posted by Fraser Adams <fr...@blueyonder.co.uk>.
Hi Martin
If you are using the python QMF2 API implementation which is the one
that lives in qpid/extras/qmf/src/py/qmf2-prototype and implements the
QMF2 API specified here
https://cwiki.apache.org/qpid/qmfv2-api-proposal.html then what you
would normally do would be to use getObjects to return a queue object
which would be a QmfConsoleData then call invoke_method on that
"invoke_method(name, inArgs{}, [[reply-handle] | [timeout]]): invoke the
named method on this instance." the invoke_method call should return a
MethodResponse object (see the InvokingMethods subsection in the page I
linked) so *if* the method in question has a sensible return value (or
needs to deliver an error/exception) it should be contained in that.
class MethodResult:
<constructor>( QmfData<exception> |<map of properties> )
.succeeded(): returns True if the method call executed without error.
.get_exception(): returns the QmfData error object if method fails, else None
.get_arguments(): returns a map of "name"=<value> pairs of all returned arguments.
.get_argument(<name>): returns value of argument named "name".
so get_arguments should give you what you need if it's actually present
(and I don't actually know what purge returns).
I actually wrote a little QMF2 application using the purge method which
is a "fuse" that intercepts queueThresholdExceeded Events and purges the
queue in question of ~10% of the messages. It's actually written in Java
(it's one of the demos I included in my Java QMF2 API implementation).
I've just looked at it and unfortunately I didn't bother to do anything
with the return value :-) but hopefully it'll give you some ideas
(though do bear in mind that the program is intended to be just a demo
so the exception handling stuff is a bit noddy).
/**
* Look up a queue object with the given name and if it's not a
ring queue invoke the queue's purge method.
* @param queueName the name of the queue to purge
* @param msgDepth the number of messages on the queue, used to
determine how many messages to purge.
*/
private void purgeQueue(final String queueName, long msgDepth)
{
QmfConsoleData queue = _queueCache.get(queueName);
if (queue == null)
{
System.out.printf("%s ERROR QueueFuse.disconnectQueue() %s
reference couldn't be found\n",
new Date().toString(), queueName);
}
else
{ // If we've found a queue called queueName we then find the
bindings that reference it.
Map args = (Map)queue.getValue("arguments");
String policyType = (String)args.get("qpid.policy_type");
if (policyType != null && policyType.equals("ring"))
{ // If qpid.policy_type=ring we return.
return;
}
try
{
QmfData arguments = new QmfData();
arguments.setValue("request", (long)(_purge*msgDepth));
queue.invokeMethod("purge", arguments);
}
catch (QmfException e)
{
System.out.println(e.getMessage());
}
}
}
HTH
Frase
On 21/03/12 11:50, MartiN Beneš wrote:
> Interesting.
> If I understand correctly, you are manually sending qmf2 commands to the
> broker.
> I would very much prefer to use existing qmf library instead of writing a
> new one. Specifically to use the library bundled with the qpidd (mrg)
> broker.
>
> On Wed, Mar 21, 2012 at 12:35, Pavel Moravec<pm...@redhat.com> wrote:
>
>> .. and now also for python client (that I overlooked in email subject):
>>
>>
>> conn = Connection(broker)
>> try:
>> conn.open()
>> ssn = conn.session()
>> snd = ssn.sender("qmf.default.direct/broker")
>> reply_to = "reply-queue; {create:always,
>> node:{x-declare:{auto-delete:true}}}"
>> rcv = ssn.receiver(reply_to)
>>
>> content = {
>> "_object_id": {"_object_name":
>> "org.apache.qpid.broker:broker:amqp-broker"},
>> "_method_name": "delete",
>> "_arguments": {"type":"queue", "name":queue_name}
>> }
>> request = Message(reply_to=reply_to, content=content)
>> request.properties["x-amqp-0-10.app-id"] = "qmf2"
>> request.properties["qmf.opcode"] = "_method_request"
>> snd.send(request)
>>
>> try:
>> response = rcv.fetch(timeout=30)
>> if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
>> if response.properties['qmf.opcode'] == '_method_response':
>> print "Response:"
>> print response.content['_arguments']
>> elif response.properties['qmf.opcode'] == '_exception':
>> raise Exception("Error: %s" % response.content['_values'])
>> else: raise Exception("Invalid response received, unexpected opcode:
>> %s" % m)
>> else: raise Exception("Invalid response received, not a qmfv2 method:
>> %s" % m)
>> except Empty:
>> print "No response received!"
>> except Exception, e:
>> print e
>> except ReceiverError, e:
>> print e
>> except KeyboardInterrupt:
>> pass
>> conn.close()
>>
>>
>>
>> ----- Original Message -----
>>> From: "Pavel Moravec"<pm...@redhat.com>
>>> To: users@qpid.apache.org
>>> Sent: Wednesday, March 21, 2012 12:33:28 PM
>>> Subject: Re: python QMF synchronization
>>>
>>> Hi Martin,
>>> you can set up ReplyTo address where qpid shall send its response.
>>> I.e. something like C++ code below (that invokes queue deletion and
>>> fetches response in 30seconds limit):
>>>
>>> Connection connection(url);
>>> try {
>>> connection.open();
>>> Session session = connection.createSession();
>>> Sender sender =
>>> session.createSender("qmf.default.direct/broker");
>>> Address responseQueue("#reply-queue; {create:always,
>>> node:{x-declare:{auto-delete:true}}}");
>>> Receiver receiver = session.createReceiver(responseQueue);
>>>
>>> Message message;
>>> Variant::Map content;
>>> Variant::Map OID;
>>> Variant::Map arguments;
>>> OID["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
>>> arguments["type"] = "queue";
>>> arguments["name"] = queue_name;
>>>
>>> content["_object_id"] = OID;
>>> content["_method_name"] = "delete";
>>> content["_arguments"] = arguments;
>>>
>>> encode(content, message);
>>> message.setReplyTo(responseQueue);
>>> message.setProperty("x-amqp-0-10.app-id", "qmf2");
>>> message.setProperty("qmf.opcode", "_method_request");
>>>
>>> sender.send(message, true);
>>>
>>> Message response;
>>> if (receiver.fetch(response,qpid::messaging::Duration(30000)) ==
>>> true)
>>> {
>>> qpid::types::Variant::Map recv_props =
>> response.getProperties();
>>> if (recv_props["x-amqp-0-10.app-id"] == "qmf2")
>>> if (recv_props["qmf.opcode"] == "_method_response")
>>> std::cout<< "Response: OK"<< std::endl;
>>> else if (recv_props["qmf.opcode"] == "_exception")
>>> std::cerr<< "Error: "<<
>> response.getContent()<< std::endl;
>>> else
>>> std::cerr<< "Invalid response received!"
>> << std::endl;
>>> else
>>> std::cerr<< "Invalid response not of qmf2 type
>> received!"<<
>>> std::endl;
>>> }
>>> else
>>> std::cout<< "Timeout: No response received within 30
>> seconds!"<<
>>> std::endl;
>>>
>>> connection.close();
>>> return 0;
>>> } catch(const std::exception& error) {
>>> std::cout<< error.what()<< std::endl;
>>> connection.close();
>>> }
>>>
>>> Kind regards,
>>> Pavel
>>>
>>>
>>>
>>> ----- Original Message -----
>>>> From: "MartiN Beneš"<ma...@gmail.com>
>>>> To: "users"<us...@qpid.apache.org>
>>>> Sent: Wednesday, March 21, 2012 12:26:59 PM
>>>> Subject: python QMF synchronization
>>>>
>>>> Hi,
>>>> I cannot find information about qmf synchronization. Or is every
>>>> call
>>>> synchronous?
>>>> For instance if I purge a queue: queue.purge(0)
>>>> how can i tell it was done?
>>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
>>> For additional commands, e-mail: users-help@qpid.apache.org
>>>
>>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
>> For additional commands, e-mail: users-help@qpid.apache.org
>>
>>
Re: python QMF synchronization
Posted by MartiN Beneš <ma...@gmail.com>.
Interesting.
If I understand correctly, you are manually sending qmf2 commands to the
broker.
I would very much prefer to use existing qmf library instead of writing a
new one. Specifically to use the library bundled with the qpidd (mrg)
broker.
On Wed, Mar 21, 2012 at 12:35, Pavel Moravec <pm...@redhat.com> wrote:
> .. and now also for python client (that I overlooked in email subject):
>
>
> conn = Connection(broker)
> try:
> conn.open()
> ssn = conn.session()
> snd = ssn.sender("qmf.default.direct/broker")
> reply_to = "reply-queue; {create:always,
> node:{x-declare:{auto-delete:true}}}"
> rcv = ssn.receiver(reply_to)
>
> content = {
> "_object_id": {"_object_name":
> "org.apache.qpid.broker:broker:amqp-broker"},
> "_method_name": "delete",
> "_arguments": {"type":"queue", "name":queue_name}
> }
> request = Message(reply_to=reply_to, content=content)
> request.properties["x-amqp-0-10.app-id"] = "qmf2"
> request.properties["qmf.opcode"] = "_method_request"
> snd.send(request)
>
> try:
> response = rcv.fetch(timeout=30)
> if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
> if response.properties['qmf.opcode'] == '_method_response':
> print "Response:"
> print response.content['_arguments']
> elif response.properties['qmf.opcode'] == '_exception':
> raise Exception("Error: %s" % response.content['_values'])
> else: raise Exception("Invalid response received, unexpected opcode:
> %s" % m)
> else: raise Exception("Invalid response received, not a qmfv2 method:
> %s" % m)
> except Empty:
> print "No response received!"
> except Exception, e:
> print e
> except ReceiverError, e:
> print e
> except KeyboardInterrupt:
> pass
> conn.close()
>
>
>
> ----- Original Message -----
> > From: "Pavel Moravec" <pm...@redhat.com>
> > To: users@qpid.apache.org
> > Sent: Wednesday, March 21, 2012 12:33:28 PM
> > Subject: Re: python QMF synchronization
> >
> > Hi Martin,
> > you can set up ReplyTo address where qpid shall send its response.
> > I.e. something like C++ code below (that invokes queue deletion and
> > fetches response in 30seconds limit):
> >
> > Connection connection(url);
> > try {
> > connection.open();
> > Session session = connection.createSession();
> > Sender sender =
> > session.createSender("qmf.default.direct/broker");
> > Address responseQueue("#reply-queue; {create:always,
> > node:{x-declare:{auto-delete:true}}}");
> > Receiver receiver = session.createReceiver(responseQueue);
> >
> > Message message;
> > Variant::Map content;
> > Variant::Map OID;
> > Variant::Map arguments;
> > OID["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
> > arguments["type"] = "queue";
> > arguments["name"] = queue_name;
> >
> > content["_object_id"] = OID;
> > content["_method_name"] = "delete";
> > content["_arguments"] = arguments;
> >
> > encode(content, message);
> > message.setReplyTo(responseQueue);
> > message.setProperty("x-amqp-0-10.app-id", "qmf2");
> > message.setProperty("qmf.opcode", "_method_request");
> >
> > sender.send(message, true);
> >
> > Message response;
> > if (receiver.fetch(response,qpid::messaging::Duration(30000)) ==
> > true)
> > {
> > qpid::types::Variant::Map recv_props =
> response.getProperties();
> > if (recv_props["x-amqp-0-10.app-id"] == "qmf2")
> > if (recv_props["qmf.opcode"] == "_method_response")
> > std::cout << "Response: OK" << std::endl;
> > else if (recv_props["qmf.opcode"] == "_exception")
> > std::cerr << "Error: " <<
> response.getContent() << std::endl;
> > else
> > std::cerr << "Invalid response received!"
> << std::endl;
> > else
> > std::cerr << "Invalid response not of qmf2 type
> received!" <<
> > std::endl;
> > }
> > else
> > std::cout << "Timeout: No response received within 30
> seconds!" <<
> > std::endl;
> >
> > connection.close();
> > return 0;
> > } catch(const std::exception& error) {
> > std::cout << error.what() << std::endl;
> > connection.close();
> > }
> >
> > Kind regards,
> > Pavel
> >
> >
> >
> > ----- Original Message -----
> > > From: "MartiN Beneš" <ma...@gmail.com>
> > > To: "users" <us...@qpid.apache.org>
> > > Sent: Wednesday, March 21, 2012 12:26:59 PM
> > > Subject: python QMF synchronization
> > >
> > > Hi,
> > > I cannot find information about qmf synchronization. Or is every
> > > call
> > > synchronous?
> > > For instance if I purge a queue: queue.purge(0)
> > > how can i tell it was done?
> > >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> > For additional commands, e-mail: users-help@qpid.apache.org
> >
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>
>
Re: python QMF synchronization
Posted by Pavel Moravec <pm...@redhat.com>.
.. and now also for python client (that I overlooked in email subject):
conn = Connection(broker)
try:
conn.open()
ssn = conn.session()
snd = ssn.sender("qmf.default.direct/broker")
reply_to = "reply-queue; {create:always, node:{x-declare:{auto-delete:true}}}"
rcv = ssn.receiver(reply_to)
content = {
"_object_id": {"_object_name": "org.apache.qpid.broker:broker:amqp-broker"},
"_method_name": "delete",
"_arguments": {"type":"queue", "name":queue_name}
}
request = Message(reply_to=reply_to, content=content)
request.properties["x-amqp-0-10.app-id"] = "qmf2"
request.properties["qmf.opcode"] = "_method_request"
snd.send(request)
try:
response = rcv.fetch(timeout=30)
if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
if response.properties['qmf.opcode'] == '_method_response':
print "Response:"
print response.content['_arguments']
elif response.properties['qmf.opcode'] == '_exception':
raise Exception("Error: %s" % response.content['_values'])
else: raise Exception("Invalid response received, unexpected opcode: %s" % m)
else: raise Exception("Invalid response received, not a qmfv2 method: %s" % m)
except Empty:
print "No response received!"
except Exception, e:
print e
except ReceiverError, e:
print e
except KeyboardInterrupt:
pass
conn.close()
----- Original Message -----
> From: "Pavel Moravec" <pm...@redhat.com>
> To: users@qpid.apache.org
> Sent: Wednesday, March 21, 2012 12:33:28 PM
> Subject: Re: python QMF synchronization
>
> Hi Martin,
> you can set up ReplyTo address where qpid shall send its response.
> I.e. something like C++ code below (that invokes queue deletion and
> fetches response in 30seconds limit):
>
> Connection connection(url);
> try {
> connection.open();
> Session session = connection.createSession();
> Sender sender =
> session.createSender("qmf.default.direct/broker");
> Address responseQueue("#reply-queue; {create:always,
> node:{x-declare:{auto-delete:true}}}");
> Receiver receiver = session.createReceiver(responseQueue);
>
> Message message;
> Variant::Map content;
> Variant::Map OID;
> Variant::Map arguments;
> OID["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
> arguments["type"] = "queue";
> arguments["name"] = queue_name;
>
> content["_object_id"] = OID;
> content["_method_name"] = "delete";
> content["_arguments"] = arguments;
>
> encode(content, message);
> message.setReplyTo(responseQueue);
> message.setProperty("x-amqp-0-10.app-id", "qmf2");
> message.setProperty("qmf.opcode", "_method_request");
>
> sender.send(message, true);
>
> Message response;
> if (receiver.fetch(response,qpid::messaging::Duration(30000)) ==
> true)
> {
> qpid::types::Variant::Map recv_props = response.getProperties();
> if (recv_props["x-amqp-0-10.app-id"] == "qmf2")
> if (recv_props["qmf.opcode"] == "_method_response")
> std::cout << "Response: OK" << std::endl;
> else if (recv_props["qmf.opcode"] == "_exception")
> std::cerr << "Error: " << response.getContent() << std::endl;
> else
> std::cerr << "Invalid response received!" << std::endl;
> else
> std::cerr << "Invalid response not of qmf2 type received!" <<
> std::endl;
> }
> else
> std::cout << "Timeout: No response received within 30 seconds!" <<
> std::endl;
>
> connection.close();
> return 0;
> } catch(const std::exception& error) {
> std::cout << error.what() << std::endl;
> connection.close();
> }
>
> Kind regards,
> Pavel
>
>
>
> ----- Original Message -----
> > From: "MartiN Beneš" <ma...@gmail.com>
> > To: "users" <us...@qpid.apache.org>
> > Sent: Wednesday, March 21, 2012 12:26:59 PM
> > Subject: python QMF synchronization
> >
> > Hi,
> > I cannot find information about qmf synchronization. Or is every
> > call
> > synchronous?
> > For instance if I purge a queue: queue.purge(0)
> > how can i tell it was done?
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org
Re: python QMF synchronization
Posted by Pavel Moravec <pm...@redhat.com>.
Hi Martin,
you can set up ReplyTo address where qpid shall send its response. I.e. something like C++ code below (that invokes queue deletion and fetches response in 30seconds limit):
Connection connection(url);
try {
connection.open();
Session session = connection.createSession();
Sender sender = session.createSender("qmf.default.direct/broker");
Address responseQueue("#reply-queue; {create:always, node:{x-declare:{auto-delete:true}}}");
Receiver receiver = session.createReceiver(responseQueue);
Message message;
Variant::Map content;
Variant::Map OID;
Variant::Map arguments;
OID["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
arguments["type"] = "queue";
arguments["name"] = queue_name;
content["_object_id"] = OID;
content["_method_name"] = "delete";
content["_arguments"] = arguments;
encode(content, message);
message.setReplyTo(responseQueue);
message.setProperty("x-amqp-0-10.app-id", "qmf2");
message.setProperty("qmf.opcode", "_method_request");
sender.send(message, true);
Message response;
if (receiver.fetch(response,qpid::messaging::Duration(30000)) == true)
{
qpid::types::Variant::Map recv_props = response.getProperties();
if (recv_props["x-amqp-0-10.app-id"] == "qmf2")
if (recv_props["qmf.opcode"] == "_method_response")
std::cout << "Response: OK" << std::endl;
else if (recv_props["qmf.opcode"] == "_exception")
std::cerr << "Error: " << response.getContent() << std::endl;
else
std::cerr << "Invalid response received!" << std::endl;
else
std::cerr << "Invalid response not of qmf2 type received!" << std::endl;
}
else
std::cout << "Timeout: No response received within 30 seconds!" << std::endl;
connection.close();
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
connection.close();
}
Kind regards,
Pavel
----- Original Message -----
> From: "MartiN Beneš" <ma...@gmail.com>
> To: "users" <us...@qpid.apache.org>
> Sent: Wednesday, March 21, 2012 12:26:59 PM
> Subject: python QMF synchronization
>
> Hi,
> I cannot find information about qmf synchronization. Or is every call
> synchronous?
> For instance if I purge a queue: queue.purge(0)
> how can i tell it was done?
>
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org