Posted to users@qpid.apache.org by Caio Brentano <ca...@gmail.com> on 2010/03/25 15:05:15 UTC

Multiprocess Python Client

Hello all

I'm trying out the Python client for Qpid. I wrote a multiprocess
"topic_publisher" that works perfectly, populating one queue. But I can't get a
multiprocess client to subscribe to a topic queue.

What happens is that the first process subscribes to the queue and starts to get
messages, but the next processes always time out when subscribing to the queue.

Is there any way to write a multiprocess client that consumes a queue?

Thanks in advance

-- 
Caio Brentano

Re: Multiprocess Python Client

Posted by Rafael Schloming <ra...@redhat.com>.
Caio Brentano wrote:
> Ok... I promise this is the last email! :-)
> 
> I got a better solution.
> 
> Just put "random.seed()" before "bytes = [random.randint(0, 255) for i in
> xrange(16)]" in the file "qpid-0.6/python/qpid/datatypes.py" (line 296).

Nice catch! It's always disconcerting when UUIDs aren't unique. I 
actually ran into the same issue the other day except it was with 
independent processes that were simultaneously started. The problem is 
that the random number generator seeds itself with the current time by 
default, and so if the processes start simultaneously you'll get the 
same random numbers. I've updated the code on trunk to seed itself based 
on a hash of the current time, the PID, and the hostname. Hopefully that 
will provide a bit more uniqueness:

   ...
   import os, random, socket, time
   rand = random.Random()
   rand.seed((os.getpid(), time.time(), socket.gethostname()))
   def random_uuid():
     bytes = [rand.randint(0, 255) for i in xrange(16)]
     ...

--Rafael


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: Multiprocess Python Client

Posted by Caio Brentano <ca...@gmail.com>.
Ok... I promise this is the last email! :-)

I got a better solution.

Just put "random.seed()" before "bytes = [random.randint(0, 255) for i in
xrange(16)]" in the file "qpid-0.6/python/qpid/datatypes.py" (line 296).

=)

Best wishes!

On Fri, Mar 26, 2010 at 4:16 PM, Caio Brentano <ca...@gmail.com>wrote:

> I found what caused the problem!
>
> As I am using python 2.4, the module uuid can be imported by
> "datatypes.py".
>
> So, the api does this function instead of raise an exception:
>
> def random_uuid():
>     bytes = [random.randint(0, 255) for i in xrange(16)]
>
>     # From RFC4122, the version bits are set to 0100
>     bytes[7] &= 0x0F
>     bytes[7] |= 0x40
>
>     # From RFC4122, the top two bits of byte 8 get set to 01
>     bytes[8] &= 0x3F
>     bytes[8] |= 0x80
>     return "".join(map(chr, bytes))
>
>
> If you fork your process and call this function in all process, the result
> will always be the same!
>
> Does it right?
>
> Regards!
>
> Caio Brentano
>
>
> On Fri, Mar 26, 2010 at 3:40 PM, Caio Brentano <ca...@gmail.com>wrote:
>
>> Resolved!
>>
>> The problem was caused for this call:
>>
>> session = connection.session(str(uuid4()))
>>
>> This is in all python examples!
>>
>> Parent and children processes got the same session id because uuid4() was
>> returning the same uuid for both processes.
>>
>> I merged the PID with "str(uuid4())", to get different sessions id, and
>> everything worked perfect.
>>
>> Shouldn't uuid4 return random unique values?
>>
>> Thanks for your help!!
>>
>> Joshua, thanks for graphic! It's very helpful!!
>>
>> --
>> Caio Brentano
>>
>> On Fri, Mar 26, 2010 at 2:18 PM, Joshua Kramer <jo...@globalherald.net>wrote:
>>
>>>
>>> I think from the snippet of code you sent, you have a good understanding
>>> how everything works with regards to local and server queues.  I've attached
>>> the graphic that was in the article (they put the wrong graphic in the
>>> magazine article) that explains how you might have more than one consumer on
>>> a queue.  The code included with my article definitely demonstrates this.
>>>  I'll watch the mail list to see if you have any further questions.  :)
>>>
>>> On Fri, 26 Mar 2010, Caio Brentano wrote:
>>>
>>>  Date: Fri, 26 Mar 2010 12:02:32 -0300
>>>> From: Caio Brentano <ca...@gmail.com>
>>>> Reply-To: users@qpid.apache.org
>>>> To: Joshua Kramer <jo...@globalherald.net>
>>>> Cc: users@qpid.apache.org
>>>> Subject: Re: Multiprocess Python Client
>>>>
>>>>
>>>> Joshua, thanks for the Article... I'll read it carefully! =)
>>>>
>>>> Answering to Alan: It is the C++ Broker!
>>>>
>>>> About the code, what I tried to is very simple, I just want parent and
>>>> child
>>>> process reading the same queue...
>>>>
>>>>
>>>> =========================================================================
>>>> def consume_queues(server_queue_name):
>>>>
>>>>       session = connect_to_broker()
>>>>
>>>>       my_pid = str(os.getpid())
>>>>       local_queue_name = 'local_' + my_pid + server_queue_name
>>>>
>>>>       queue = session.incoming(local_queue_name)
>>>>       session.message_subscribe(queue=server_queue_name,
>>>> destination=local_queue_name)
>>>>       queue.start()
>>>>
>>>>       dump_queue(session, local_queue)
>>>>
>>>> if __name__ == "__main__":
>>>>
>>>>       queue_name = "queue_for_test"
>>>>
>>>>       try:
>>>>               pid = os.fork()
>>>>               if pid == 0:
>>>>                       # Child start to consume queues
>>>>                       consume_queues(queue_name)
>>>>                       os._exit(0)
>>>>               else:
>>>>                       consume_queues(queue_name)
>>>>                       child_pid = os.wait()
>>>>                       print 'process ' + str(child_pid) + 'has finished'
>>>>                       sys.exit(0)
>>>>       except OSError, error:
>>>>               print 'Unable to fork. Error: %d (%s)' % (error.errno,
>>>> error.strerror)
>>>>               sys.exit(-1)
>>>>
>>>> =========================================================================
>>>>
>>>> Function *dump_queue* is similar to pubsub topic publisher example.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Mar 26, 2010 at 1:18 PM, Joshua Kramer <josh@globalherald.net
>>>> >wrote:
>>>>
>>>>
>>>>>  Is there anyway to do a multiprocess client to consume a queue?
>>>>>
>>>>>>
>>>>>>>
>>>>>>  Hello Caio,
>>>>>
>>>>> I wrote a Linux Journal article last year illustrating this exact
>>>>> concept.
>>>>> You can find it here:
>>>>>
>>>>>
>>>>> http://www.linuxjournal.com/magazine/advanced-message-queuing-protocol-amqp
>>>>>
>>>>> If you'd like a copy of the graphic (they seem to have broken it) let
>>>>> me
>>>>> know.
>>>>>
>>>>> Thanks,
>>>>> -Josh
>>>>>
>>>>> --
>>>>>
>>>>> -----
>>>>> http://www.globalherald.net/jb01
>>>>> GlobalHerald.NET, the Smarter Social Network! (tm)
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>> --
>>>
>>> -----
>>> http://www.globalherald.net/jb01
>>> GlobalHerald.NET, the Smarter Social Network! (tm)
>>>
>>
>>
>>
>> --
>> Caio Brentano
>>
>
>
>
> --
> Caio Brentano
>



-- 
Caio Brentano

Re: Multiprocess Python Client

Posted by Caio Brentano <ca...@gmail.com>.
I found what caused the problem!

As I am using Python 2.4, the uuid module cannot be imported by "datatypes.py".

So, instead of raising an exception, the API falls back to this function:

def random_uuid():
    bytes = [random.randint(0, 255) for i in xrange(16)]

    # From RFC4122, the version bits are set to 0100
    bytes[7] &= 0x0F
    bytes[7] |= 0x40

    # From RFC4122, the top two bits of byte 8 get set to 01
    bytes[8] &= 0x3F
    bytes[8] |= 0x80
    return "".join(map(chr, bytes))


If you fork your process and call this function in all processes, the result
will always be the same!

Is that right?
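
A minimal standalone sketch of the effect (illustration only, not from the
qpid code): a forked child inherits the parent's random generator state, so
both processes draw the same "random" bytes unless something re-seeds:

import os, random

# The child inherits the generator state that existed at fork time, so the
# two lists printed below are identical unless one side calls random.seed().
pid = os.fork()
values = [random.randint(0, 255) for i in xrange(16)]
if pid == 0:
    print "child: ", values
    os._exit(0)
else:
    print "parent:", values
    os.wait()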

Regards!

Caio Brentano


On Fri, Mar 26, 2010 at 3:40 PM, Caio Brentano <ca...@gmail.com>wrote:

> Resolved!
>
> The problem was caused for this call:
>
> session = connection.session(str(uuid4()))
>
> This is in all python examples!
>
> Parent and children processes got the same session id because uuid4() was
> returning the same uuid for both processes.
>
> I merged the PID with "str(uuid4())", to get different sessions id, and
> everything worked perfect.
>
> Shouldn't uuid4 return random unique values?
>
> Thanks for your help!!
>
> Joshua, thanks for graphic! It's very helpful!!
>
> --
> Caio Brentano
>
> On Fri, Mar 26, 2010 at 2:18 PM, Joshua Kramer <jo...@globalherald.net>wrote:
>
>>
>> I think from the snippet of code you sent, you have a good understanding
>> how everything works with regards to local and server queues.  I've attached
>> the graphic that was in the article (they put the wrong graphic in the
>> magazine article) that explains how you might have more than one consumer on
>> a queue.  The code included with my article definitely demonstrates this.
>>  I'll watch the mail list to see if you have any further questions.  :)
>>
>> On Fri, 26 Mar 2010, Caio Brentano wrote:
>>
>>  Date: Fri, 26 Mar 2010 12:02:32 -0300
>>> From: Caio Brentano <ca...@gmail.com>
>>> Reply-To: users@qpid.apache.org
>>> To: Joshua Kramer <jo...@globalherald.net>
>>> Cc: users@qpid.apache.org
>>> Subject: Re: Multiprocess Python Client
>>>
>>>
>>> Joshua, thanks for the Article... I'll read it carefully! =)
>>>
>>> Answering to Alan: It is the C++ Broker!
>>>
>>> About the code, what I tried to is very simple, I just want parent and
>>> child
>>> process reading the same queue...
>>>
>>> =========================================================================
>>> def consume_queues(server_queue_name):
>>>
>>>       session = connect_to_broker()
>>>
>>>       my_pid = str(os.getpid())
>>>       local_queue_name = 'local_' + my_pid + server_queue_name
>>>
>>>       queue = session.incoming(local_queue_name)
>>>       session.message_subscribe(queue=server_queue_name,
>>> destination=local_queue_name)
>>>       queue.start()
>>>
>>>       dump_queue(session, local_queue)
>>>
>>> if __name__ == "__main__":
>>>
>>>       queue_name = "queue_for_test"
>>>
>>>       try:
>>>               pid = os.fork()
>>>               if pid == 0:
>>>                       # Child start to consume queues
>>>                       consume_queues(queue_name)
>>>                       os._exit(0)
>>>               else:
>>>                       consume_queues(queue_name)
>>>                       child_pid = os.wait()
>>>                       print 'process ' + str(child_pid) + 'has finished'
>>>                       sys.exit(0)
>>>       except OSError, error:
>>>               print 'Unable to fork. Error: %d (%s)' % (error.errno,
>>> error.strerror)
>>>               sys.exit(-1)
>>> =========================================================================
>>>
>>> Function *dump_queue* is similar to pubsub topic publisher example.
>>>
>>>
>>>
>>>
>>> On Fri, Mar 26, 2010 at 1:18 PM, Joshua Kramer <josh@globalherald.net
>>> >wrote:
>>>
>>>
>>>>  Is there anyway to do a multiprocess client to consume a queue?
>>>>
>>>>>
>>>>>>
>>>>>  Hello Caio,
>>>>
>>>> I wrote a Linux Journal article last year illustrating this exact
>>>> concept.
>>>> You can find it here:
>>>>
>>>>
>>>> http://www.linuxjournal.com/magazine/advanced-message-queuing-protocol-amqp
>>>>
>>>> If you'd like a copy of the graphic (they seem to have broken it) let me
>>>> know.
>>>>
>>>> Thanks,
>>>> -Josh
>>>>
>>>> --
>>>>
>>>> -----
>>>> http://www.globalherald.net/jb01
>>>> GlobalHerald.NET, the Smarter Social Network! (tm)
>>>>
>>>>
>>>
>>>
>>>
>>>
>> --
>>
>> -----
>> http://www.globalherald.net/jb01
>> GlobalHerald.NET, the Smarter Social Network! (tm)
>>
>
>
>
> --
> Caio Brentano
>



-- 
Caio Brentano

Re: Multiprocess Python Client

Posted by Caio Brentano <ca...@gmail.com>.
Resolved!

The problem was caused by this call:

session = connection.session(str(uuid4()))

This is in all the Python examples!

Parent and child processes got the same session id because uuid4() was
returning the same uuid for both processes.

I merged the PID into "str(uuid4())" to get different session ids, and
everything worked perfectly.
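
A minimal sketch of that workaround (the exact concatenation is an
illustration, not the code from this thread; it assumes an already-opened
connection and the uuid4 shipped in qpid.datatypes):

import os
from qpid.datatypes import uuid4

# Prefix the session name with the PID so a forked child never reuses the
# parent's session id, even if uuid4() happens to repeat across the fork.
session_name = str(os.getpid()) + '-' + str(uuid4())
session = connection.session(session_name)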

Shouldn't uuid4 return random unique values?

Thanks for your help!!

Joshua, thanks for the graphic! It's very helpful!!

--
Caio Brentano

On Fri, Mar 26, 2010 at 2:18 PM, Joshua Kramer <jo...@globalherald.net>wrote:

>
> I think from the snippet of code you sent, you have a good understanding
> how everything works with regards to local and server queues.  I've attached
> the graphic that was in the article (they put the wrong graphic in the
> magazine article) that explains how you might have more than one consumer on
> a queue.  The code included with my article definitely demonstrates this.
>  I'll watch the mail list to see if you have any further questions.  :)
>
> On Fri, 26 Mar 2010, Caio Brentano wrote:
>
>  Date: Fri, 26 Mar 2010 12:02:32 -0300
>> From: Caio Brentano <ca...@gmail.com>
>> Reply-To: users@qpid.apache.org
>> To: Joshua Kramer <jo...@globalherald.net>
>> Cc: users@qpid.apache.org
>> Subject: Re: Multiprocess Python Client
>>
>>
>> Joshua, thanks for the Article... I'll read it carefully! =)
>>
>> Answering to Alan: It is the C++ Broker!
>>
>> About the code, what I tried to is very simple, I just want parent and
>> child
>> process reading the same queue...
>>
>> =========================================================================
>> def consume_queues(server_queue_name):
>>
>>       session = connect_to_broker()
>>
>>       my_pid = str(os.getpid())
>>       local_queue_name = 'local_' + my_pid + server_queue_name
>>
>>       queue = session.incoming(local_queue_name)
>>       session.message_subscribe(queue=server_queue_name,
>> destination=local_queue_name)
>>       queue.start()
>>
>>       dump_queue(session, local_queue)
>>
>> if __name__ == "__main__":
>>
>>       queue_name = "queue_for_test"
>>
>>       try:
>>               pid = os.fork()
>>               if pid == 0:
>>                       # Child start to consume queues
>>                       consume_queues(queue_name)
>>                       os._exit(0)
>>               else:
>>                       consume_queues(queue_name)
>>                       child_pid = os.wait()
>>                       print 'process ' + str(child_pid) + 'has finished'
>>                       sys.exit(0)
>>       except OSError, error:
>>               print 'Unable to fork. Error: %d (%s)' % (error.errno,
>> error.strerror)
>>               sys.exit(-1)
>> =========================================================================
>>
>> Function *dump_queue* is similar to pubsub topic publisher example.
>>
>>
>>
>>
>> On Fri, Mar 26, 2010 at 1:18 PM, Joshua Kramer <josh@globalherald.net
>> >wrote:
>>
>>
>>>  Is there anyway to do a multiprocess client to consume a queue?
>>>
>>>>
>>>>>
>>>>  Hello Caio,
>>>
>>> I wrote a Linux Journal article last year illustrating this exact
>>> concept.
>>> You can find it here:
>>>
>>>
>>> http://www.linuxjournal.com/magazine/advanced-message-queuing-protocol-amqp
>>>
>>> If you'd like a copy of the graphic (they seem to have broken it) let me
>>> know.
>>>
>>> Thanks,
>>> -Josh
>>>
>>> --
>>>
>>> -----
>>> http://www.globalherald.net/jb01
>>> GlobalHerald.NET, the Smarter Social Network! (tm)
>>>
>>>
>>
>>
>>
>>
> --
>
> -----
> http://www.globalherald.net/jb01
> GlobalHerald.NET, the Smarter Social Network! (tm)
>



-- 
Caio Brentano

Re: Multiprocess Python Client

Posted by Caio Brentano <ca...@gmail.com>.
Joshua, thanks for the article... I'll read it carefully! =)

Answering Alan: it is the C++ broker!

About the code, what I tried is very simple: I just want the parent and child
processes to read from the same queue...

=========================================================================
import os
import sys

# connect_to_broker() and dump_queue() are defined elsewhere (see below).

def consume_queues(server_queue_name):

        session = connect_to_broker()

        # Give each process its own local destination name, based on its PID.
        my_pid = str(os.getpid())
        local_queue_name = 'local_' + my_pid + server_queue_name

        queue = session.incoming(local_queue_name)
        session.message_subscribe(queue=server_queue_name,
                                  destination=local_queue_name)
        queue.start()

        dump_queue(session, local_queue_name)

if __name__ == "__main__":

        queue_name = "queue_for_test"

        try:
                pid = os.fork()
                if pid == 0:
                        # Child starts to consume queues
                        consume_queues(queue_name)
                        os._exit(0)
                else:
                        # Parent consumes too, then waits for the child
                        consume_queues(queue_name)
                        child_pid, status = os.wait()
                        print 'process ' + str(child_pid) + ' has finished'
                        sys.exit(0)
        except OSError, error:
                print 'Unable to fork. Error: %d (%s)' % (error.errno,
                                                          error.strerror)
                sys.exit(-1)
=========================================================================

The *dump_queue* function is similar to the one in the pubsub topic subscriber example.
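
For completeness, a rough sketch of what such a dump_queue could look like,
modeled on the 0-10 subscriber examples (the timeout, the Empty handling, and
the message_accept call are assumptions about that example, not the actual
code used here):

from qpid.queue import Empty
from qpid.datatypes import RangedSet

def dump_queue(session, local_queue_name):

        queue = session.incoming(local_queue_name)

        while True:
                try:
                        message = queue.get(timeout=10)
                except Empty:
                        print "No more messages!"
                        break
                print "Received:", message.body
                # Acknowledge the message so the broker can dequeue it.
                session.message_accept(RangedSet(message.id))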




On Fri, Mar 26, 2010 at 1:18 PM, Joshua Kramer <jo...@globalherald.net>wrote:

>
>  Is there anyway to do a multiprocess client to consume a queue?
>>>
>>
> Hello Caio,
>
> I wrote a Linux Journal article last year illustrating this exact concept.
> You can find it here:
>
> http://www.linuxjournal.com/magazine/advanced-message-queuing-protocol-amqp
>
> If you'd like a copy of the graphic (they seem to have broken it) let me
> know.
>
> Thanks,
> -Josh
>
> --
>
> -----
> http://www.globalherald.net/jb01
> GlobalHerald.NET, the Smarter Social Network! (tm)
>



-- 
Caio Brentano

Re: Multiprocess Python Client

Posted by Joshua Kramer <jo...@globalherald.net>.
>> Is there anyway to do a multiprocess client to consume a queue?

Hello Caio,

I wrote a Linux Journal article last year illustrating this exact concept. 
You can find it here:

http://www.linuxjournal.com/magazine/advanced-message-queuing-protocol-amqp

If you'd like a copy of the graphic (they seem to have broken it) let me 
know.

Thanks,
-Josh

-- 

-----
http://www.globalherald.net/jb01
GlobalHerald.NET, the Smarter Social Network! (tm)

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: Multiprocess Python Client

Posted by Alan Conway <ac...@redhat.com>.
On 03/25/2010 10:05 AM, Caio Brentano wrote:
> Hello all
>
> I'm trying the Python Client for Qpid. I did a multiprocess
> "topic_publisher" that work perfect, populating one queue. But I can't do a
> multiprocess client to subscribe a topic queue.
>
> What happen is that the first process subscribe the queue and start to get
> messages, but the next processes always timed out on subscription the queue.
>
> Is there anyway to do a multiprocess client to consume a queue?

Is this the C++ or the Java broker?
There should be no problem with multiple processes subscribing to the same
queue. If you send your code to the list, we can perhaps figure out what's going on.
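
As the rest of the thread works out, the pattern that ends up working is for
each process to open its own connection and use a per-process session id and
destination. A minimal sketch of that pattern (host, port, and the helper name
are illustrative assumptions, not code from this thread):

import os
from qpid.connection import Connection
from qpid.datatypes import uuid4
from qpid.util import connect

def start_consumer(server_queue_name):
        # Each process opens its own connection and session; the session id
        # includes the PID so a forked child never collides with its parent.
        conn = Connection(sock=connect("localhost", 5672))
        conn.start()
        session = conn.session(str(os.getpid()) + '-' + str(uuid4()))

        # Subscribe to the shared server queue via a per-process destination.
        destination = "local_%d_%s" % (os.getpid(), server_queue_name)
        session.message_subscribe(queue=server_queue_name,
                                  destination=destination)
        incoming = session.incoming(destination)
        incoming.start()
        return session, incoming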

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org