Posted to users@qpid.apache.org by Greg Oliver <go...@microsoft.com> on 2017/01/20 23:56:08 UTC
filtering message stream
Using Python. Proton 0.16.0.
Would like to receive messages from an Azure Event Hub using proton. Need to set message filters based on offset or timestamp.
I've read about adding annotations to a request such as: x-opt-offset, x-opt-enqueued-time, x-opt-sequence-number.
But no idea how to set these values, especially using Python.
Is there a book or other resource with examples?
Thanks,
Greg
Re: filtering message stream
Posted by Gordon Sim <gs...@redhat.com>.
On 01/02/17 13:26, Robbie Gemmell wrote:
> The python bits are not an area I'm much use with, but from prior
> discussion and giving things a look, I believe you'll be looking for
> the amqps scheme in the URL, and maybe an ssldomain config in the
> container connect method.
Yes, amqps in the scheme is indeed the simple way to enable ssl. I would
guess that the url you use to connect with messenger will also work to
connect with the Container API.
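A minimal sketch of what this could look like with the Container API, assuming the same amqps URL form used with Messenger also works here. The build_url helper, the namespace, the key name and the key are hypothetical placeholders; the credentials are percent-encoded because shared access keys often contain characters such as "/", "+" and "=". This has not been tested against Event Hubs.

```python
try:
    from urllib.parse import quote   # Python 3
except ImportError:
    from urllib import quote         # Python 2


def build_url(namespace, key_name, key):
    """Return an amqps URL with percent-encoded credentials (hypothetical helper)."""
    return "amqps://%s:%s@%s.servicebus.windows.net" % (
        quote(key_name, safe=""), quote(key, safe=""), namespace)


def receive(url, address):
    """Connect through the Container API and print every message body."""
    # proton is imported here so build_url can be used without it installed.
    from proton.handlers import MessagingHandler
    from proton.reactor import Container

    class Recv(MessagingHandler):
        def on_start(self, event):
            # The amqps scheme in the URL is what requests SSL/TLS.
            conn = event.container.connect(url)
            event.container.create_receiver(conn, address)

        def on_message(self, event):
            print(event.message.body)

    Container(Recv()).run()


# Example call (placeholder values taken from the code posted in this thread):
#   receive(build_url("myNamespace", "RootManageSharedAccessKey", "myPassword"),
#           "/myEventHub/ConsumerGroups/$Default/Partitions/0")
```

Letting event.container.connect() create the connection, rather than constructing a Connection object by hand, leaves transport and SSL setup to the container.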
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org
Re: filtering message stream
Posted by Robbie Gemmell <ro...@gmail.com>.
On 24 January 2017 at 17:11, Greg Oliver <go...@microsoft.com> wrote:
> Thanks Ken. Let me add a bit more context.
>
> I have one technique for consuming Event Hub messages working. It uses the Messenger() class. Going this route I can see no way to access the filtering mechanism that you mentioned. The essence is shown in this gist: https://gist.github.com/tomconte/e2a4667185a9bf674f59.
>
> By my read of the qpid docs and examples the mechanism for filtering or reading from an offset is enabled through Container/Connection/create_receiver. However, I haven't yet figured out how to connect to Azure Event Hubs via this mechanism. Here's the Python code as it stands. It does not throw any errors, but also does not display any messages. Once I have figured out how to get the messages, then I'll move on to filtering. I'm currently working with this document that explains how to connect: http://qpid.apache.org/releases/qpid-0.32/programming/book/connections.html#connection-url. It has some inaccuracies, such as setting connection.transport. Trying to do this throws. Not sure how to get an SSL connection. Tried using connection.url = "amqps:..." and also tried connection.url = "amqp:ssl:...".
>
> Can anyone help?
The old Qpid 0.32 document you linked to is for a different [AMQP
0-10] python client and isn't applicable to the newer Proton based
APIs you are using. Give http://qpid.apache.org/proton/index.html a
try for examples/tutorial/reference for those bits, e.g.
http://qpid.apache.org/releases/qpid-proton-0.16.0/proton/python/book/tutorial.html
The python bits are not an area I'm much use with, but from prior
discussion and giving things a look, I believe you'll be looking for
the amqps scheme in the URL, and maybe an ssldomain config in the
container connect method.
RE: filtering message stream
Posted by Greg Oliver <go...@microsoft.com>.
Thanks Ken. Let me add a bit more context.
I have one technique for consuming Event Hub messages working. It uses the Messenger() class. Going this route I can see no way to access the filtering mechanism that you mentioned. The essence is shown in this gist: https://gist.github.com/tomconte/e2a4667185a9bf674f59.
By my read of the qpid docs and examples, the mechanism for filtering or reading from an offset is enabled through Container/Connection/create_receiver. However, I haven't yet figured out how to connect to Azure Event Hubs via this mechanism. Here's the Python code as it stands. It does not throw any errors, but also does not display any messages. Once I have figured out how to get the messages, I'll move on to filtering. I'm currently working with this document that explains how to connect: http://qpid.apache.org/releases/qpid-0.32/programming/book/connections.html#connection-url. It has some inaccuracies, such as setting connection.transport; trying to do this throws an exception. I'm not sure how to get an SSL connection. I tried using connection.url = "amqps:..." and also tried connection.url = "amqp:ssl:...".
Can anyone help?
from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton import Connection

class Recv(MessagingHandler):
    def __init__(self):
        super(Recv, self).__init__()
        self.expected = 10
        self.received = 0

    def on_start(self, event):
        print("on_start")
        connection = Connection()
        connection.url = "amqp:tcp:myNamespace.servicebus.windows.net"
        connection.username = "RootManageSharedAccessKey"
        connection.password = "myPassword"
        connection.reconnect = True
        connection.open()

        event.container.create_receiver(connection, "/myEventHub/ConsumerGroups/$Default/Partitions/0")

    def on_message(self, event):
        if event.message.id and event.message.id < self.received:
            print(event.message.id)
            # ignore duplicate message
            return
        if self.expected == 0 or self.received < self.expected:
            print(event.message.body)
            self.received += 1
            if self.received == self.expected:
                event.receiver.close()
                event.connection.close()

try:
    Container(Recv()).run()
except KeyboardInterrupt: pass
Re: filtering message stream
Posted by Ken Giusti <kg...@redhat.com>.
----- Original Message -----
> From: "Greg Oliver" <go...@microsoft.com>
> To: users@qpid.apache.org
> Sent: Friday, January 20, 2017 6:56:08 PM
> Subject: filtering message stream
>
> Using Python. Proton 0.16.0.
>
> Would like to receive messages from an Azure Event Hub using proton. Need to
> set message filters based on offset or timestamp.
>
> I've read about adding annotations to a request such as: x-opt-offset,
> x-opt-enqueued-time, x-opt-sequence-number.
>
> But no idea how to set these values, especially using Python.
>
> Is there a book or other resource with examples?
>
Hi Greg,
In python you can access/set the message annotations via the annotations property of the proton.Message class.
You should set this to a map which uses the 'x-opt...' values as keys before sending the message. Example:
import proton
m = proton.Message()
m.annotations = {'x-opt-offset': '100', .... }
caveat: totally untested, just suggesting this from what I've read here:
https://amqpnetlite.codeplex.com/wikipage?title=Using%20Amqp.Net%20Lite%20with%20Azure%20Server%20Bus%20Event%20Hub
-K
> Thanks,
> Greg
>
--
-K
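On the receiving side, the same annotations property can be read on each delivered message. A minimal sketch, assuming Event Hubs attaches the annotation names mentioned in this thread to the messages it delivers; position_of is a hypothetical helper and the sample values are made up.

```python
# Annotation names mentioned in this thread.
POSITION_KEYS = ("x-opt-offset", "x-opt-sequence-number", "x-opt-enqueued-time")


def position_of(annotations):
    """Pick the position-related entries out of a message's annotations map."""
    annotations = annotations or {}   # the annotations property may be None
    return {k: annotations[k] for k in POSITION_KEYS if k in annotations}


# In a MessagingHandler this would be called as:
#     def on_message(self, event):
#         print(position_of(event.message.annotations))
sample = {"x-opt-offset": "100", "x-opt-sequence-number": 7, "other": 1}
print(position_of(sample))
```

Recording the last offset seen this way gives a value that can later be fed into an offset-based filter when the receiver is re-created.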
Re: filtering message stream
Posted by Gordon Sim <gs...@redhat.com>.
On 20/01/17 23:56, Greg Oliver wrote:
> Using Python. Proton 0.16.0.
>
> Would like to receive messages from an Azure Event Hub using proton. Need to set message filters based on offset or timestamp.
>
> I've read about adding annotations to a request such as: x-opt-offset, x-opt-enqueued-time, x-opt-sequence-number.
>
> But no idea how to set these values, especially using Python.
Attached is a simple example of setting a selector filter. You can
replace the actual selector string with something that Event Hubs will
understand e.g. from Ken's link, amqp.annotation.x-opt-offset > '100'.
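
The attachment is not reproduced in this plain-text copy. A minimal sketch of a receiver with a selector filter, using the Selector link option from proton.reactor, might look like the following. The offset_selector helper, the URL and the address are hypothetical placeholders, and whether Event Hubs accepts this exact selector string has not been tested here.

```python
def offset_selector(offset):
    """Build the selector string quoted in this thread for a given offset."""
    return u"amqp.annotation.x-opt-offset > '%s'" % offset


def receive_filtered(url, address, selector):
    """Attach a receiver whose source carries a selector filter."""
    # proton is imported here so offset_selector can be used without it installed.
    from proton.handlers import MessagingHandler
    from proton.reactor import Container, Selector

    class Recv(MessagingHandler):
        def on_start(self, event):
            conn = event.container.connect(url)
            # Selector adds the filter to the link's source when it is opened.
            event.container.create_receiver(conn, address,
                                            options=Selector(selector))

        def on_message(self, event):
            print(event.message.body)

    Container(Recv()).run()


# Example call (placeholder URL; address as posted earlier in the thread):
#   receive_filtered("amqps://user:password@myNamespace.servicebus.windows.net",
#                    "/myEventHub/ConsumerGroups/$Default/Partitions/0",
#                    offset_selector(100))
```

The filter is applied by the service at the link source, so only messages matching the selector should be delivered to on_message.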