Posted to users@qpid.apache.org by Greg Oliver <go...@microsoft.com> on 2017/01/20 23:56:08 UTC

filtering message stream

Using Python. Proton 0.16.0.

Would like to receive messages from an Azure Event Hub using proton. Need to set message filters based on offset or timestamp.

I've read about adding annotations to a request such as: x-opt-offset, x-opt-enqueued-time, x-opt-sequence-number.

But no idea how to set these values, especially using Python.

Is there a book or other resource with examples?

Thanks,
Greg

Re: filtering message stream

Posted by Gordon Sim <gs...@redhat.com>.
On 01/02/17 13:26, Robbie Gemmell wrote:
> The Python bits are not an area I'm much use with, but from prior
> discussion and giving things a look, I believe you'll be looking for
> the amqps scheme in the URL, and maybe an ssl_domain config in the
> container connect method.

Yes, amqps in the scheme is indeed the simple way to enable SSL. I would
guess that the URL you use to connect with Messenger will also work to
connect with the Container API.

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: filtering message stream

Posted by Robbie Gemmell <ro...@gmail.com>.
On 24 January 2017 at 17:11, Greg Oliver <go...@microsoft.com> wrote:
> Thanks Ken. Let me add a bit more context.
>
> I have one technique for consuming Event Hub messages working. It uses the Messenger() class. Going this route I can see no way to access the filtering mechanism that you mentioned. The essence is shown in this gist: https://gist.github.com/tomconte/e2a4667185a9bf674f59.
>
> By my read of the qpid docs and examples the mechanism for filtering or reading from an offset is enabled through Container/Connection/create_receiver. However, I haven't yet figured out how to connect to Azure Event Hubs via this mechanism. Here's the Python code as it stands. It does not throw any errors, but also does not display any messages. Once I have figured out how to get the messages, then I'll move on to filtering. I'm currently working with this document that explains how to connect: http://qpid.apache.org/releases/qpid-0.32/programming/book/connections.html#connection-url. It has some inaccuracies, such as setting connection.transport. Trying to do this throws. Not sure how to get an SSL connection. Tried using connection.url = "amqps:..." and also tried connection.url = "amqp:ssl:...".
>
> Can anyone help?

The old Qpid 0.32 document you linked to is for a different [AMQP
0-10] python client and isn't applicable to the newer Proton based
APIs you are using. Give http://qpid.apache.org/proton/index.html a
try for examples/tutorial/reference for those bits, e.g.
http://qpid.apache.org/releases/qpid-proton-0.16.0/proton/python/book/tutorial.html

The Python bits are not an area I'm much use with, but from prior
discussion and giving things a look, I believe you'll be looking for
the amqps scheme in the URL, and maybe an ssl_domain config in the
container connect method.
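
A minimal sketch of that suggestion, untested against a live Event Hub. It assumes the Container API's connect() accepts the same amqps:// URL form that works with Messenger, with the key name and key percent-encoded into the URL. The helper names event_hub_url and run_receiver are hypothetical, and the address in the usage comment is the one from the code posted in this thread (whether Event Hubs wants the leading slash is not confirmed here).

```python
from urllib.parse import quote


def event_hub_url(namespace, key_name, key):
    """Build an amqps:// URL for an Event Hubs namespace.

    Credentials are percent-encoded because shared access keys
    often contain characters such as '/', '+' or '='.
    """
    return "amqps://{}:{}@{}.servicebus.windows.net".format(
        quote(key_name, safe=""), quote(key, safe=""), namespace)


def run_receiver(url, address):
    """Connect through the Container API and print message bodies."""
    # Imported here so the URL helper above works without proton installed.
    from proton.handlers import MessagingHandler
    from proton.reactor import Container

    class Recv(MessagingHandler):
        def on_start(self, event):
            # Let the container create and open the connection from the URL,
            # rather than constructing a bare Connection() by hand.
            conn = event.container.connect(url)
            event.container.create_receiver(conn, address)

        def on_message(self, event):
            print(event.message.body)

    Container(Recv()).run()


# Usage (needs python-qpid-proton and real credentials):
# run_receiver(
#     event_hub_url("myNamespace", "RootManageSharedAccessKey", "myPassword"),
#     "/myEventHub/ConsumerGroups/$Default/Partitions/0")
```

If certificate settings need to be customised, connect() also takes an ssl_domain argument, which is the "ssl_domain config" mentioned above.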




RE: filtering message stream

Posted by Greg Oliver <go...@microsoft.com>.
Thanks Ken. Let me add a bit more context.

I have one technique for consuming Event Hub messages working. It uses the Messenger() class. Going this route I can see no way to access the filtering mechanism that you mentioned. The essence is shown in this gist: https://gist.github.com/tomconte/e2a4667185a9bf674f59. 

By my read of the qpid docs and examples the mechanism for filtering or reading from an offset is enabled through Container/Connection/create_receiver. However, I haven't yet figured out how to connect to Azure Event Hubs via this mechanism. Here's the Python code as it stands. It does not throw any errors, but also does not display any messages. Once I have figured out how to get the messages, then I'll move on to filtering. I'm currently working with this document that explains how to connect: http://qpid.apache.org/releases/qpid-0.32/programming/book/connections.html#connection-url. It has some inaccuracies, such as setting connection.transport. Trying to do this throws. Not sure how to get an SSL connection. Tried using connection.url = "amqps:..." and also tried connection.url = "amqp:ssl:...". 

Can anyone help?

from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton import Connection

class Recv(MessagingHandler):
    def __init__(self):
        super(Recv, self).__init__()
        self.expected = 10
        self.received = 0

    def on_start(self, event):
        print ("on_start")
        connection = Connection()
        connection.url = "amqp:tcp:myNamespace.servicebus.windows.net"
        connection.username = "RootManageSharedAccessKey"
        connection.password = "myPassword"
        connection.reconnect = True
        connection.open()
        
        event.container.create_receiver(connection, "/myEventHub/ConsumerGroups/$Default/Partitions/0")
    
    def on_message(self, event):
        if event.message.id and event.message.id < self.received:
            print(event.message.id)
            # ignore duplicate message
            return
        if self.expected == 0 or self.received < self.expected:
            print(event.message.body)
            self.received += 1
            if self.received == self.expected:
                event.receiver.close()
                event.connection.close()

try:
    Container(Recv()).run()
except KeyboardInterrupt: pass




Re: filtering message stream

Posted by Ken Giusti <kg...@redhat.com>.

----- Original Message -----
> From: "Greg Oliver" <go...@microsoft.com>
> To: users@qpid.apache.org
> Sent: Friday, January 20, 2017 6:56:08 PM
> Subject: filtering message stream
> 
> Using Python. Proton 0.16.0.
> 
> Would like to receive messages from an Azure Event Hub using proton. Need to
> set message filters based on offset or timestamp.
> 
> I've read about adding annotations to a request such as: x-opt-offset,
> x-opt-enqueued-time, x-opt-sequence-number.
> 
> But no idea how to set these values, especially using Python.
> 
> Is there a book or other resource with examples?
> 

Hi Greg,

In Python you can access/set the message annotations via the annotations property of the proton.Message class.

You should set this to a map which uses the 'x-opt...' values as keys before sending the message.  Example:

import proton

m = proton.Message()
m.annotations = {'x-opt-offset': '100', .... }

caveat: totally untested, just suggesting this from what I've read here:

https://amqpnetlite.codeplex.com/wikipage?title=Using%20Amqp.Net%20Lite%20with%20Azure%20Server%20Bus%20Event%20Hub

-K


> Thanks,
> Greg
> 

-- 
-K



Re: filtering message stream

Posted by Gordon Sim <gs...@redhat.com>.
On 20/01/17 23:56, Greg Oliver wrote:
> Using Python. Proton 0.16.0.
>
> Would like to receive messages from an Azure Event Hub using proton. Need to set message filters based on offset or timestamp.
>
> I've read about adding annotations to a request such as: x-opt-offset, x-opt-enqueued-time, x-opt-sequence-number.
>
> But no idea how to set these values, especially using Python.

Attached is a simple example of setting a selector filter. You can
replace the actual selector string with something that Event Hubs will
understand, e.g. from Ken's link: amqp.annotation.x-opt-offset > '100'.
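
As a rough sketch of setting a selector filter (not the attached example itself), the receiver can be created with a proton.reactor.Selector passed as a link option. The helper names offset_selector and receive_from_offset are hypothetical, the selector string follows the form quoted above, and whether Event Hubs accepts it exactly as written is untested here.

```python
def offset_selector(offset):
    """Return a selector string in the form quoted above.

    The offset is sent as a quoted string, matching the '100' example.
    """
    return "amqp.annotation.x-opt-offset > '{}'".format(offset)


def receive_from_offset(url, address, offset):
    """Attach a receiver whose source carries a selector filter."""
    # Imported here so offset_selector() works without proton installed.
    from proton.handlers import MessagingHandler
    from proton.reactor import Container, Selector

    class FilteredRecv(MessagingHandler):
        def on_start(self, event):
            conn = event.container.connect(url)
            # Selector is applied as a filter on the receiving link's source.
            event.container.create_receiver(
                conn, address, options=Selector(offset_selector(offset)))

        def on_message(self, event):
            # Annotations such as x-opt-offset arrive with each message.
            print(event.message.annotations, event.message.body)

    Container(FilteredRecv()).run()


# Usage (needs python-qpid-proton and a reachable Event Hub):
# receive_from_offset("amqps://...", "<partition address>", 100)
```

Unlike setting annotations on an outgoing Message, this asks the service to filter what it delivers on that link.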