You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by Luca Giovannini <Lu...@dedagroup.it> on 2020/11/12 10:56:10 UTC

Consuming a dynamically variable number of AMQP queues

Hi All!

I have to implement a flow with NiFi that involves consuming different AMQP queues whose number may change in time.
One of the requirements is that when a new queue is available (of an old one is no more available) the NiFi flow adjusts to the new situation without the need for any manual adjustments.
In order to implement this I thought of having a simple configuration table in a postgres DB, listing the names of the active queues (that an operator would update when needed), and then use NiFi to go check periodically the queue names list and feed it parametrically to a single ConsumeAMQP processor.
However, I realized that my plan is not applicable because:

  1.  ConsumeAMQP does not allow incoming relationships
  2.  Its "Queue" property does not allow values in Expression Language (so putting a variable value like ${queue_name} wouldn't work, right?)

Do you have an alternative, doable approach in NiFi to suggest me?

Thank you very much,

Luca


Luca Giovannini
Information Systems Analyst
Dedagroup Public Services
www.linkedin.com/in/lucagio/<http://www.linkedin.com/in/lucagio/>
T +39.051.278.928 | M +39.347.799.3183 | VoIP 951.128
Dedagroup Public Services Srl - Sede di Casalecchio di Reno, Via del Lavoro 67
www.dedagroup.it/public-services<http://www.dedagroup.it/public-services>

[cid:image001.jpg@01D6B8E7.9C7E08F0]<http://www.dedagroup.it/home>

Le informazioni contenute in questo messaggio di posta elettronica sono riservate e confidenziali e ne e' vietata la diffusione in qualsiasi modo o forma. Qualora Lei non fosse la persona destinataria del presente messaggio, La invitiamo a non diffonderlo e ad eliminarlo, dandone gentilmente comunicazione al mittente.

The information included in this e-mail and any attachments are confidential and may also be privileged. If you are not the correct recipient, you are kindly requested to notify the sender immediately, to cancel it and not to disclose the contents to any other person.

R: Consuming a dynamically variable number of AMQP queues

Posted by Luca Giovannini <Lu...@dedagroup.it>.
Thank you, Bryan!

Luca


Da: Bryan Bende <bb...@gmail.com>
Inviato: giovedì 12 novembre 2020 17:14
A: users@nifi.apache.org
Oggetto: Re: Consuming a dynamically variable number of AMQP queues

**ATTENZIONE** Questo messaggio proviene da un ACCOUNT ESTERNO, presta attenzione ad eventuali link o allegati al suo interno.

Hello,

This idea/problem has come up a few times. The issue is that if you allow incoming connections with dynamic queue/topic on these "consume" processors, then there is potentially an unbounded number of topics/queues to consume from. How does the processor ever know to stop consuming from one of them?

I would suggest implementing your own custom processor which can make more assumptions for your specific use case.

Another option would be to have some type of script that used NiFi's REST API to add/remove ConsumeAMQP processors based on some external knowledge.

-Bryan


On Thu, Nov 12, 2020 at 5:56 AM Luca Giovannini <Lu...@dedagroup.it>> wrote:

Hi All!

I have to implement a flow with NiFi that involves consuming different AMQP queues whose number may change in time.
One of the requirements is that when a new queue is available (of an old one is no more available) the NiFi flow adjusts to the new situation without the need for any manual adjustments.
In order to implement this I thought of having a simple configuration table in a postgres DB, listing the names of the active queues (that an operator would update when needed), and then use NiFi to go check periodically the queue names list and feed it parametrically to a single ConsumeAMQP processor.
However, I realized that my plan is not applicable because:

1.       ConsumeAMQP does not allow incoming relationships

2.       Its “Queue” property does not allow values in Expression Language (so putting a variable value like ${queue_name} wouldn’t work, right?)

Do you have an alternative, doable approach in NiFi to suggest me?

Thank you very much,

Luca


Luca Giovannini
Information Systems Analyst
Dedagroup Public Services
www.linkedin.com/in/lucagio/<https://eur01.safelinks.protection.outlook.com/?url=http%3A%2F%2Fwww.linkedin.com%2Fin%2Flucagio%2F&data=04%7C01%7CLuca.Giovannini%40dedagroup.it%7C283d86afb05448c8fcbb08d887261276%7Cbbf156d433fa4fee86f62cfcb1359ef0%7C0%7C0%7C637407944884240018%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=XL71PpPWJNZkgaV9HXrta%2BlVVSGmBCGTTgw%2BEfx4%2FO4%3D&reserved=0>
T +39.051.278.928 | M +39.347.799.3183 | VoIP 951.128
Dedagroup Public Services Srl – Sede di Casalecchio di Reno, Via del Lavoro 67
www.dedagroup.it/public-services<https://eur01.safelinks.protection.outlook.com/?url=http%3A%2F%2Fwww.dedagroup.it%2Fpublic-services&data=04%7C01%7CLuca.Giovannini%40dedagroup.it%7C283d86afb05448c8fcbb08d887261276%7Cbbf156d433fa4fee86f62cfcb1359ef0%7C0%7C0%7C637407944884249969%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=yYuilwfCK%2BpxRa%2FhfZKnCy%2BFNJFS3q9Efl1xM9cKflk%3D&reserved=0>

[cid:image001.jpg@01D6BF30.132D0020]<https://eur01.safelinks.protection.outlook.com/?url=http%3A%2F%2Fwww.dedagroup.it%2Fhome&data=04%7C01%7CLuca.Giovannini%40dedagroup.it%7C283d86afb05448c8fcbb08d887261276%7Cbbf156d433fa4fee86f62cfcb1359ef0%7C0%7C0%7C637407944884249969%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=IgiOSfdM2X%2Fy0qcRH2Hzb7LemFeXfwrUei9yyTP0zAU%3D&reserved=0>

Le informazioni contenute in questo messaggio di posta elettronica sono riservate e confidenziali e ne e' vietata la diffusione in qualsiasi modo o forma. Qualora Lei non fosse la persona destinataria del presente messaggio, La invitiamo a non diffonderlo e ad eliminarlo, dandone gentilmente comunicazione al mittente.

The information included in this e-mail and any attachments are confidential and may also be privileged. If you are not the correct recipient, you are kindly requested to notify the sender immediately, to cancel it and not to disclose the contents to any other person.

Re: Consuming a dynamically variable number of AMQP queues

Posted by Bryan Bende <bb...@gmail.com>.
Hello,

This idea/problem has come up a few times. The issue is that if you allow
incoming connections with dynamic queue/topic on these "consume"
processors, then there is potentially an unbounded number of topics/queues
to consume from. How does the processor ever know to stop consuming from
one of them?

I would suggest implementing your own custom processor which can make more
assumptions for your specific use case.

Another option would be to have some type of script that used NiFi's REST
API to add/remove ConsumeAMQP processors based on some external knowledge.

-Bryan


On Thu, Nov 12, 2020 at 5:56 AM Luca Giovannini <
Luca.Giovannini@dedagroup.it> wrote:

>
>
> Hi All!
>
>
>
> I have to implement a flow with NiFi that involves consuming different
> AMQP queues whose number may change in time.
>
> One of the requirements is that when a new queue is available (of an old
> one is no more available) the NiFi flow adjusts to the new situation
> without the need for any manual adjustments.
>
> In order to implement this I thought of having a simple configuration
> table in a postgres DB, listing the names of the active queues (that an
> operator would update when needed), and then use NiFi to go check
> periodically the queue names list and feed it parametrically to a single
> ConsumeAMQP processor.
>
> However, I realized that my plan is not applicable because:
>
>    1. ConsumeAMQP does not allow incoming relationships
>    2. Its “Queue” property does not allow values in Expression Language
>    (so putting a variable value like ${queue_name} wouldn’t work, right?)
>
>
>
> Do you have an alternative, doable approach in NiFi to suggest me?
>
>
>
> Thank you very much,
>
>
>
> Luca
>
>
>
>
>
> *Luca Giovannini*
> Information Systems Analyst
> *Dedagroup Public Services*
>
> www.linkedin.com/in/lucagio/
>
> T +39.051.278.928 | M +39.347.799.3183 | VoIP 951.128
> Dedagroup Public Services Srl – Sede di Casalecchio di Reno, Via del
> Lavoro 67
>
> www.dedagroup.it/public-services
>
>
>
> <http://www.dedagroup.it/home>
>
>
> Le informazioni contenute in questo messaggio di posta elettronica sono
> riservate e confidenziali e ne e' vietata la diffusione in qualsiasi modo o
> forma. Qualora Lei non fosse la persona destinataria del presente
> messaggio, La invitiamo a non diffonderlo e ad eliminarlo, dandone
> gentilmente comunicazione al mittente.
>
> The information included in this e-mail and any attachments are
> confidential and may also be privileged. If you are not the correct
> recipient, you are kindly requested to notify the sender immediately, to
> cancel it and not to disclose the contents to any other person.
>