Posted to users@qpid.apache.org by MartiN Beneš <ma...@gmail.com> on 2012/03/21 12:26:59 UTC

python QMF synchronization

Hi,
I cannot find any information about QMF synchronization. Is every call
synchronous?
For instance, if I purge a queue with queue.purge(0),
how can I tell when it is done?

Re: python QMF synchronization

Posted by Fraser Adams <fr...@blueyonder.co.uk>.
Hi Martin,
If you are using the Python QMF2 API implementation (the one that lives
in qpid/extras/qmf/src/py/qmf2-prototype and implements the QMF2 API
specified at https://cwiki.apache.org/qpid/qmfv2-api-proposal.html), you
would normally use getObjects to return a queue object, which would be a
QmfConsoleData, and then call invoke_method on it:

"invoke_method(name, inArgs{}, [[reply-handle] | [timeout]]): invoke the
named method on this instance."

The invoke_method call should return a MethodResult object (see the
InvokingMethods subsection of the page I linked), so *if* the method in
question has a sensible return value (or needs to deliver an
error/exception) it should be contained in that.

class MethodResult:
       <constructor>( QmfData<exception>  |<map of properties>  )
       .succeeded(): returns True if the method call executed without error.
       .get_exception(): returns the QmfData error object if method fails, else None
       .get_arguments(): returns a map of "name"=<value>  pairs of all returned arguments.
       .get_argument(<name>): returns value of argument named "name".

so get_arguments should give you what you need if it's actually present 
(and I don't actually know what purge returns).
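
A minimal sketch of how such a result could be checked, in Python. The MethodResult class here is only a stand-in that mirrors the interface listed above so that the sketch runs without a broker; it is not the qmf2-prototype class. The summarize helper, the error_text key, and treating None as "no reply" are all assumptions made for illustration.

```python
class MethodResult(object):
    """Stand-in mirroring the MethodResult interface quoted above (hypothetical, not the real class)."""

    def __init__(self, exception=None, arguments=None):
        self._exception = exception
        self._arguments = arguments or {}

    def succeeded(self):
        # True if the method call executed without error.
        return self._exception is None

    def get_exception(self):
        return self._exception

    def get_arguments(self):
        return self._arguments

    def get_argument(self, name):
        return self._arguments.get(name)


def summarize(result):
    """Return (ok, detail) for a MethodResult-like object.

    Assumption: None stands for "no reply arrived"; the real API may signal
    a timeout differently.
    """
    if result is None:
        return (False, "no reply (timed out?)")
    if not result.succeeded():
        return (False, result.get_exception())
    return (True, result.get_arguments())


# Example values standing in for what invoke_method("purge", ...) might hand back.
ok_result = MethodResult(arguments={})
failed_result = MethodResult(exception={"error_text": "unknown method"})
```

With a real QmfConsoleData the same check would presumably be applied to whatever invoke_method returns, so a successful purge shows up as succeeded() being True even when the argument map is empty.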

I actually wrote a little QMF2 application using the purge method which 
is a "fuse" that intercepts queueThresholdExceeded Events and purges the 
queue in question of ~10% of the messages. It's actually written in Java 
(it's one of the demos I included in my Java QMF2 API implementation). 
I've just looked at it and unfortunately I didn't bother to do anything 
with the return value :-) but hopefully it'll give you some ideas 
(though do bear in mind that the program is intended to be just a demo 
so the exception handling stuff is a bit noddy).


     /**
      * Look up a queue object with the given name and, if it's not a ring
      * queue, invoke the queue's purge method.
      * @param queueName the name of the queue to purge
      * @param msgDepth the number of messages on the queue, used to
      *        determine how many messages to purge.
      */
     private void purgeQueue(final String queueName, long msgDepth)
     {
         QmfConsoleData queue = _queueCache.get(queueName);

         if (queue == null)
         {
             System.out.printf("%s ERROR QueueFuse.purgeQueue() %s reference couldn't be found\n",
                               new Date().toString(), queueName);
         }
         else
         { // If we've found a queue called queueName, check its policy type before purging.

             Map args = (Map)queue.getValue("arguments");
             String policyType = (String)args.get("qpid.policy_type");
             if (policyType != null && policyType.equals("ring"))
             {  // If qpid.policy_type=ring we return.
                 return;
             }

             try
             {
                 QmfData arguments = new QmfData();
                 arguments.setValue("request", (long)(_purge*msgDepth));
                 queue.invokeMethod("purge", arguments);
             }
             catch (QmfException e)
             {
                 System.out.println(e.getMessage());
             }
         }
     }


HTH
Frase



Re: python QMF synchronization

Posted by MartiN Beneš <ma...@gmail.com>.
Interesting.
If I understand correctly, you are manually sending QMF2 commands to the
broker.
I would very much prefer to use an existing QMF library instead of writing a
new one, specifically the library bundled with the qpidd (MRG)
broker.


Re: python QMF synchronization

Posted by Pavel Moravec <pm...@redhat.com>.
... and now the same for the Python client (I overlooked "python" in the email subject):


# Assumes qpid.messaging is installed; broker and queue_name must be set by the caller.
from qpid.messaging import Connection, Message, Empty, ReceiverError

conn = Connection(broker)
try:
  conn.open()
  ssn = conn.session()
  snd = ssn.sender("qmf.default.direct/broker")
  reply_to = "reply-queue; {create:always, node:{x-declare:{auto-delete:true}}}"
  rcv = ssn.receiver(reply_to)

  # QMF2 method request: ask the broker object to delete the named queue.
  content = {
             "_object_id": {"_object_name": "org.apache.qpid.broker:broker:amqp-broker"},
             "_method_name": "delete",
             "_arguments": {"type":"queue", "name":queue_name}
            }
  request = Message(reply_to=reply_to, content=content)
  request.properties["x-amqp-0-10.app-id"] = "qmf2"
  request.properties["qmf.opcode"] = "_method_request"
  snd.send(request)

  try:
    # Wait up to 30 seconds for the broker's reply.
    response = rcv.fetch(timeout=30)
    if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
      if response.properties['qmf.opcode'] == '_method_response':
        print("Response:")
        print(response.content['_arguments'])
      elif response.properties['qmf.opcode'] == '_exception':
        raise Exception("Error: %s" % response.content['_values'])
      else: raise Exception("Invalid response received, unexpected opcode: %s" % response)
    else: raise Exception("Invalid response received, not a qmfv2 method: %s" % response)
  except Empty:
    print("No response received!")
  except Exception as e:
    print(e)
except ReceiverError as e:
  print(e)
except KeyboardInterrupt:
  pass
conn.close()
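
For the purge case asked about at the start of this thread, the request map would presumably differ from the delete request above only in its target object and method. The following is a sketch under assumptions, not a confirmed recipe: the object name pattern "org.apache.qpid.broker:queue:<name>" is modelled on the broker object name used above, the "request" argument name is taken from the Java example earlier in the thread, and build_purge_request and request_properties are hypothetical helper names.

```python
def build_purge_request(queue_name, count=0):
    """Build a QMF2 method-request body that asks a queue to purge messages.

    Assumptions: the queue is addressed as "org.apache.qpid.broker:queue:<name>"
    and the method takes a single "request" argument giving the message count.
    """
    return {
        "_object_id": {"_object_name": "org.apache.qpid.broker:queue:%s" % queue_name},
        "_method_name": "purge",
        "_arguments": {"request": count},
    }


def request_properties():
    """Message properties that mark the message as a QMF2 method request."""
    return {"x-amqp-0-10.app-id": "qmf2", "qmf.opcode": "_method_request"}


# Example: the body that would be passed as Message(content=...) in the code above.
purge_all = build_purge_request("my-queue")
```

The returned map would take the place of content in the snippet above, and the reply would be fetched and checked in the same way.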




---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: python QMF synchronization

Posted by Pavel Moravec <pm...@redhat.com>.
Hi Martin,
you can set a ReplyTo address to which qpid sends its response. For example, the C++ code below invokes queue deletion and fetches the response with a 30-second timeout:

    #include <qpid/messaging/Address.h>
    #include <qpid/messaging/Connection.h>
    #include <qpid/messaging/Duration.h>
    #include <qpid/messaging/Message.h>
    #include <qpid/messaging/Receiver.h>
    #include <qpid/messaging/Sender.h>
    #include <qpid/messaging/Session.h>
    #include <qpid/types/Variant.h>
    #include <iostream>
    #include <string>

    using namespace qpid::messaging;
    using qpid::types::Variant;

    int main(int argc, char** argv) {
        // Assumption: the broker URL and queue name come from the command line;
        // the defaults are placeholders only.
        std::string url = argc > 1 ? argv[1] : "localhost:5672";
        std::string queue_name = argc > 2 ? argv[2] : "my-queue";

        Connection connection(url);
        try {
            connection.open();
            Session session = connection.createSession();
            Sender sender = session.createSender("qmf.default.direct/broker");
            Address responseQueue("#reply-queue; {create:always, node:{x-declare:{auto-delete:true}}}");
            Receiver receiver = session.createReceiver(responseQueue);

            // Build the QMF2 method request: delete the named queue.
            Variant::Map OID;
            Variant::Map arguments;
            OID["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
            arguments["type"] = "queue";
            arguments["name"] = queue_name;

            Variant::Map content;
            content["_object_id"] = OID;
            content["_method_name"] = "delete";
            content["_arguments"] = arguments;

            Message message;
            encode(content, message);
            message.setReplyTo(responseQueue);
            message.setProperty("x-amqp-0-10.app-id", "qmf2");
            message.setProperty("qmf.opcode", "_method_request");

            sender.send(message, true);

            // Wait up to 30 seconds for the broker's reply.
            Message response;
            if (receiver.fetch(response, Duration(30000))) {
                Variant::Map recv_props = response.getProperties();
                if (recv_props["x-amqp-0-10.app-id"] == "qmf2") {
                    if (recv_props["qmf.opcode"] == "_method_response")
                        std::cout << "Response: OK" << std::endl;
                    else if (recv_props["qmf.opcode"] == "_exception")
                        std::cerr << "Error: " << response.getContent() << std::endl;
                    else
                        std::cerr << "Invalid response received!" << std::endl;
                } else {
                    std::cerr << "Invalid response not of qmf2 type received!" << std::endl;
                }
            } else {
                std::cout << "Timeout: No response received within 30 seconds!" << std::endl;
            }

            connection.close();
            return 0;
        } catch (const std::exception& error) {
            std::cout << error.what() << std::endl;
            connection.close();
            return 1;
        }
    }
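
The branching on app-id and opcode in the code above can also be written as a small pure function, which makes it possible to exercise without a broker. This is a sketch in Python, not part of any qpid library: classify_response and its status strings are hypothetical, and it assumes the reply's properties and content are available as plain dictionaries, as they are in the Python client.

```python
def classify_response(properties, content):
    """Return (status, payload), where status is 'ok', 'error' or 'invalid'.

    properties: the reply message's property map.
    content:    the decoded reply body (a map), possibly empty.
    """
    if properties.get("x-amqp-0-10.app-id") != "qmf2":
        return ("invalid", "not a QMF2 message")
    opcode = properties.get("qmf.opcode")
    if opcode == "_method_response":
        # The method completed; any output arguments are in "_arguments".
        return ("ok", content.get("_arguments", {}))
    if opcode == "_exception":
        # The broker rejected the call; details are in "_values".
        return ("error", content.get("_values", {}))
    return ("invalid", "unexpected opcode: %r" % (opcode,))
```

A caller would treat 'ok' as confirmation that the method has been carried out and a fetch timeout as "unknown", which is essentially the answer to the original synchronization question.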

Kind regards,
Pavel


