Posted to users@activemq.apache.org by Neil Pritchard <Ne...@securetrading.com> on 2010/09/14 15:17:08 UTC

Individual acknowledge in pyactivemq

Hi All,

I'm using pyactivemq as both a producer and a consumer of messages brokered by ActiveMQ 5.3.2.  In the past I used the Python stompy library to produce messages and a Java ActiveMQ client to consume them.  I need to set a prefetch policy of 1 message at a time and, more importantly, use individual acknowledge (or client acknowledge), but I can't find any example of how to acknowledge a message in pyactivemq.  Also, the ActiveMQ log (set to debug) indicates that the messages are being consumed regardless.  Can anyone tell me how to set the acknowledgement mode properly and how to acknowledge a message?  When I try to acknowledge, Python can't find the acknowledge() method.

Many thanks,

Neil

Here's my consumer:

#!/usr/bin/env python

import cPickle as pickle
import Queue
import pyactivemq
from pyactivemq import ActiveMQConnectionFactory
from pyactivemq import AcknowledgeMode

class MessageListener(pyactivemq.MessageListener):
    def __init__(self, name, queue):
        pyactivemq.MessageListener.__init__(self)
        self.name = name
        self.queue = queue

    def onMessage(self, message):
        self.queue.put('%s got: %s' % (self.name, message.text))

f = ActiveMQConnectionFactory('failover:(tcp://localhost:61616)?wireFormat=openwire')
conn = f.createConnection()
consumer_session = conn.createSession(AcknowledgeMode.INDIVIDUAL_ACKNOWLEDGE)
myqueue = consumer_session.createQueue('NOTIFICATIONS.QUEUE')
queue = Queue.Queue(0)

session = conn.createSession(AcknowledgeMode.INDIVIDUAL_ACKNOWLEDGE)
consumer = session.createConsumer(myqueue)
listener = MessageListener('consumer', queue)
consumer.messageListener = listener

conn.start()
while queue:
    message = queue.get(block=True)
    message = message[14:]
    notificationDict = pickle.loads(message)
    print notificationDict
    acknowledge()

conn.close()


RE: Individual acknowledge in pyactivemq

Posted by Timothy Bish <ta...@gmail.com>.
On Wed, 2010-09-15 at 09:53 +0100, Neil Pritchard wrote:
> 
> -----Original Message-----
> From: Timothy Bish [mailto:tabish121@gmail.com] 
> Sent: 14 September 2010 17:08
> To: users@activemq.apache.org
> Subject: RE: Individual acknowledge in pyactivemq
> 
> On Tue, 2010-09-14 at 17:00 +0100, Neil Pritchard wrote:
> > 
> > It looks as if the acknowledge method is exposed on each Message object,
> > so you should be able to ack them as they are received.  Whether or not
> > it works is another question.
> > 
> > Regards
> > 
> > 
> 
> The correct URI setting would be something like, 
> 
> cms.PrefetchPolicy.topicPrefetch=1
> cms.PrefetchPolicy.queuePrefetch=1
> etc...
> 
> The CMS prefetch policy doesn't currently support an "all" configuration
> option; you could open a new Jira issue to have it added in v3.2.4.
> 
> Regards
> 


Yes, the options are applied on the URI that is supplied to the
ConnectionFactory.

Regards

-- 
Tim Bish

Open Source Integration: http://fusesource.com

Follow me on Twitter: http://twitter.com/tabish121
My Blog: http://timbish.blogspot.com/


RE: Individual acknowledge in pyactivemq

Posted by Neil Pritchard <Ne...@securetrading.com>.

-----Original Message-----
From: Timothy Bish [mailto:tabish121@gmail.com] 
Sent: 14 September 2010 17:08
To: users@activemq.apache.org
Subject: RE: Individual acknowledge in pyactivemq

On Tue, 2010-09-14 at 17:00 +0100, Neil Pritchard wrote:
> 
> It looks as if the acknowledge method is exposed on each Message object,
> so you should be able to ack them as they are received.  Whether or not
> it works is another question.
> 
> Regards
> 
> 

The correct URI setting would be something like, 

cms.PrefetchPolicy.topicPrefetch=1
cms.PrefetchPolicy.queuePrefetch=1
etc...

The CMS prefetch policy doesn't currently support an "all" configuration
option; you could open a new Jira issue to have it added in v3.2.4.

Regards

-- 
Tim Bish




Hi Tim,

I assume the config goes here:

f = ActiveMQConnectionFactory('failover:(tcp://localhost:61616)?cms.PrefetchPolicy.queuePrefetch=1')

Cheers,

Neil


RE: Individual acknowledge in pyactivemq

Posted by Timothy Bish <ta...@gmail.com>.
On Tue, 2010-09-14 at 17:00 +0100, Neil Pritchard wrote:
> 
> It looks as if the acknowledge method is exposed on each Message object,
> so you should be able to ack them as they are received.  Whether or not
> it works is another question.
> 
> Regards
> 
> 

The correct URI settings would be something like:

cms.PrefetchPolicy.topicPrefetch=1
cms.PrefetchPolicy.queuePrefetch=1
etc...

The CMS prefetch policy doesn't currently support an "all" configuration
option; you could open a new Jira issue to have it added in v3.2.4.
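
As a rough illustration of the point above, the per-destination options can be appended to the broker URI before it is handed to the connection factory. This is only a sketch: the helper name with_prefetch and its parameters are made up for the example, and only the two option names listed above are used.

```python
def with_prefetch(broker_uri, queue_prefetch=1, topic_prefetch=1):
    """Append the CMS prefetch options to a broker URI string.

    Hypothetical helper for illustration; it is not part of pyactivemq.
    """
    options = [
        "cms.PrefetchPolicy.queuePrefetch=%d" % queue_prefetch,
        "cms.PrefetchPolicy.topicPrefetch=%d" % topic_prefetch,
    ]
    # Only look for an existing query string after the closing parenthesis
    # of a composite URI such as failover:(...), so that a '?' belonging to
    # an inner tcp:// URI is not mistaken for the outer query string.
    tail = broker_uri[broker_uri.rfind(")") + 1:]
    separator = "&" if "?" in tail else "?"
    return broker_uri + separator + "&".join(options)


uri = with_prefetch("failover:(tcp://localhost:61616)")
print(uri)
```

The resulting string would then be passed to ActiveMQConnectionFactory(...) in place of the URI used in the original script.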

Regards

-- 
Tim Bish



RE: Individual acknowledge in pyactivemq

Posted by Neil Pritchard <Ne...@securetrading.com>.

It looks as if the acknowledge method is exposed on each Message object,
so you should be able to ack them as they are received.  Whether or not
it works is another question.

Regards


-- 
Tim Bish



That's what I thought, but I've had to move the message.acknowledge() call into the onMessage method of the MessageListener class to get it to acknowledge.  The problem I now have is that the consumer takes all the messages off the queue, processes them and then acknowledges them.  I need the messages to stay on the queue so that they're available to other consumers while this one is blocked (for load balancing).  In the Java consumer that I have running, I point the consumer at a brokerURL of
"failover:(tcp://localhost:61616)?jms.prefetchPolicy.all=1"
to set a prefetch policy of 1.  I've tried adding it to the connection URL in this script
"ActiveMQConnectionFactory('failover:(tcp://localhost:61616)?wireFormat=openire&jms.prefetchPolicy.all=1')"
but it's obviously still collecting as many messages as it can.  How should I set the prefetch policy?

Re: Individual acknowledge in pyactivemq

Posted by Timothy Bish <ta...@gmail.com>.

It looks as if the acknowledge method is exposed on each Message object,
so you should be able to ack them as they are received.  Whether or not
it works is another question.
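
To make that concrete: in the original script the listener puts a formatted string on the queue, so the main loop has no message object left to acknowledge, and the bare acknowledge() call is just an undefined name. A minimal sketch of acknowledging each message as it is received might look like the following. The class names AckingListener and StubMessage are invented for the example, the stub only stands in for a received message so the sketch can run without a broker, and whether individual acknowledge behaves as expected in pyactivemq is not confirmed here.

```python
try:
    import pyactivemq
    ListenerBase = pyactivemq.MessageListener
except ImportError:
    # Fallback so the sketch can be exercised without pyactivemq installed.
    ListenerBase = object


class AckingListener(ListenerBase):
    """Process each message in onMessage, then acknowledge that message."""

    def __init__(self, handler):
        ListenerBase.__init__(self)
        self.handler = handler

    def onMessage(self, message):
        # message.text is the payload, as used in the original consumer.
        self.handler(message.text)
        # acknowledge() is a method of the message, not a global function.
        # If the handler raises, this line is never reached and the
        # message is left unacknowledged.
        message.acknowledge()


class StubMessage(object):
    """Hypothetical stand-in for a received message (not a pyactivemq class)."""

    def __init__(self, text):
        self.text = text
        self.acknowledged = False

    def acknowledge(self):
        self.acknowledged = True


seen = []
listener = AckingListener(seen.append)
msg = StubMessage("hello")
listener.onMessage(msg)
print(seen, msg.acknowledged)
```

With a real connection, the listener would be attached as in the original script (consumer.messageListener = listener) and the handler would do the unpickling and printing.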

Regards


-- 
Tim Bish
