Posted to users@nifi.apache.org by Boris Tyukin <bo...@boristyukin.com> on 2019/01/18 15:02:53 UTC

NiFi consumers concurrency

Hi all, Happy Friday!

I wonder if you have any ideas on how to improve concurrency with the NiFi
Kafka consumer processors.

We have 3 NiFi consumer processors in the flow, each listening to 250
topics. Each topic has 1 partition (it is critical for us to preserve
order). The list of topic names is given as a comma-delimited list.

We set concurrency to 6 for each consumer processor, hoping that NiFi
would spawn 6 consumers per processor and that they would work
concurrently, feeding data from 6 topics at once. And since we have 3
consumer processors, that should give us 18 concurrent feeds in total.

[image: image.png]

Apparently, it does not work like that. NiFi does create 6 consumers per
processor (if I set concurrency to 6), but for some reason only one
consumer is reading from Kafka, one topic at a time, while the other 5 sit
idle. Because of that, total throughput is not very good.
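
For context, this is roughly the model we assumed each processor follows
internally - a plain Kafka client sketch, not NiFi's actual code, and the
broker address, group id, and topic names below are made up:

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AssumedConsumerModel {
        public static void main(String[] args) {
            int concurrency = 6; // what we set on the processor
            ExecutorService pool = Executors.newFixedThreadPool(concurrency);
            for (int i = 0; i < concurrency; i++) {
                pool.submit(() -> {
                    Properties props = new Properties();
                    props.put("bootstrap.servers", "broker:9092");       // placeholder
                    props.put("group.id", "nifi-consumer-group");        // same group for every thread
                    props.put("key.deserializer",
                            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                    props.put("value.deserializer",
                            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                    // KafkaConsumer is not thread-safe, so each thread owns its own instance
                    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                        // every consumer gets the SAME topic list; the group coordinator on the
                        // broker side decides which member is actually assigned which partitions
                        consumer.subscribe(Arrays.asList("topic.a", "topic.b", "topic.c"));
                        while (true) {
                            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                            records.forEach(r -> System.out.println(r.topic() + ": " + r.value().length + " bytes"));
                        }
                    }
                });
            }
        }
    }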

We are looking at the source code, but I am hoping for some quick tips /
direction.

We could create 750 NiFi consumer processors instead but we do not really
like that idea.

Boris

Re: NiFi consumers concurrency

Posted by Boris Tyukin <bo...@boristyukin.com>.
TL;DR
Kafka consumer concurrency does nothing for you if you listen to a list of
topics that each have only one partition.

The long explanation:

I think we understand why this happens. As I mentioned, we have a bunch of
single-partition topics. After staring at the source code of the NiFi Kafka
consumer processor, we noticed that NiFi creates multiple threads/consumers
(up to the number defined by the concurrency property on the processor).
It then subscribes each consumer to the same list of topics - it does not
divide the topics between the threads.

This post
https://dzone.com/articles/dont-use-apache-kafka-consumer-groups-the-wrong-wa
has a good explanation of how one consumer in a consumer group is assigned
some topic partitions while other consumers in the same group are assigned
the remaining partitions. In our case, each topic has only one partition,
so the other consumers have nothing assigned to them and sit idle. This
also explains my earlier screenshot, where the same consumer ID is doing
all the work.
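
For anyone who wants to see the assignment for themselves, here is a minimal
sketch with the plain Java client (broker address, group id, and topic names
are made up) that joins a group and prints which topic-partitions this member
ended up with. Run a few copies of it at the same time against
single-partition topics and, depending on the configured
partition.assignment.strategy, you may well see most of the partitions pile
up on one member while the others print nothing:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    public class ShowAssignment {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");      // placeholder
            props.put("group.id", "assignment-check");           // throwaway group, not the real one
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("topic.a", "topic.b", "topic.c"));
                consumer.poll(Duration.ofSeconds(5));             // joining the group happens inside poll()
                for (TopicPartition tp : consumer.assignment()) {
                    System.out.println("assigned: " + tp.topic() + "-" + tp.partition());
                }
            }
        }
    }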

At this point, we came up with two options:
1) split the topics manually into "buckets" by estimated throughput
2) create a new processor, or modify the existing one, to split the list of
topics so that each NiFi thread subscribes to a different subset (see the
sketch below)
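
To illustrate what we mean by option 2, here is a rough sketch of splitting
a comma-delimited topic list into per-thread buckets (round-robin by
position; option 1 would instead group the topics by estimated throughput):

    import java.util.ArrayList;
    import java.util.List;

    public class TopicBuckets {

        // Splits a comma-delimited topic list into the given number of round-robin buckets.
        static List<List<String>> split(String topicList, int buckets) {
            List<List<String>> out = new ArrayList<>();
            for (int i = 0; i < buckets; i++) {
                out.add(new ArrayList<>());
            }
            String[] topics = topicList.split(",");
            for (int i = 0; i < topics.length; i++) {
                out.get(i % buckets).add(topics[i].trim());
            }
            return out;
        }

        public static void main(String[] args) {
            // each consumer thread would then subscribe only to its own bucket
            List<List<String>> buckets = split("topic.a,topic.b,topic.c,topic.d,topic.e", 3);
            for (int i = 0; i < buckets.size(); i++) {
                System.out.println("thread " + i + " -> " + buckets.get(i));
            }
        }
    }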





Re: NiFi consumers concurrency

Posted by Boris Tyukin <bo...@boristyukin.com>.
Thanks Mark, but it did not help. The other 3 consumer IDs are still not
pulling messages from the topics; only the very first one is.

But if I set up 9 different NiFi Kafka Consumer processors, each listening
to a single topic, all 9 work in parallel, initiating 9 different consumer
IDs (but the same consumer group).


Re: NiFi consumers concurrency

Posted by Mark Payne <ma...@hotmail.com>.
Boris,

On the Settings tab, have you changed the value of the "Yield Duration"? The default, I believe, is 1 second.
I would recommend that you change that to "0 sec" and that may do the trick.

Thanks
-Mark



Re: NiFi consumers concurrency

Posted by Boris Tyukin <bo...@boristyukin.com>.
Any ideas?

We've added another 7 topics per Kafka Consumer processor (so 9 topics
total), and with concurrency set to 4 it still pulls in one thread, using
the same consumer ID. The other 3 sit idle.

Based on a quick review of the source code, the processor should spin up
multiple Kafka consumers, up to the concurrency number defined on the
processor, but clearly this is not happening.


Re: NiFi consumers concurrency

Posted by Boris Tyukin <bo...@boristyukin.com>.
Hi Joe and Charlie,

Thanks for the quick response; it is good to hear that it should work the
way we expect.

We already bumped the controller thread count to 50, and we also use a
demarcator to group messages coming out of the Kafka processor (this
improved performance quite a bit).

We also tried playing with the last two properties (Max Poll Records and
Max Uncommitted Time).

[image: image.png]

[image: image.png]

Check the screenshot below - you can see that the first two topics are
consumed by the same consumer ID, the next two by another, and the next two
by yet another, but then there are a bunch of consumer IDs doing nothing.
[image: image.png]



Re: NiFi consumers concurrency

Posted by Joe Witt <jo...@gmail.com>.
Boris

You should check the total number of threads the flow controller allows.
But also, your description of how you'd like the processor to work is
indeed how it should work.  I'd also mention that you want to make sure
you're taking advantage of Kafka's and NiFi's ability to operate on batches
efficiently.  When you poll for records/messages from Kafka, often far more
than one is made available.  However, the default mode of ConsumeKafka is a
single message per FlowFile.  ConsumeKafka allows you to set a demarcator
value, which means it will take what Kafka gives us, put a delimiter
between each record, and put all of that in a single FlowFile.  This is
great for CSVs, for instance.  Alternatively, and more recommended, take a
look at ConsumeKafkaRecord, which inherently handles this demarcation logic
and does so using the appropriate mechanism for the given format/schema.
Depending on the use case/scenario, you might need to update your scripted
processor to operate on many records in a single FlowFile, or you can
restructure it to use the record-oriented scripted processors/controller
services, which is often done for maximum performance and control.
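
To make that concrete, here is a rough sketch (plain Java client, not NiFi
code; the broker address, group id, and topic name are placeholders) of what
demarcation amounts to: instead of one record per FlowFile, one poll's worth
of records is concatenated into a single payload with a delimiter between
the record values:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.StringJoiner;

    public class DemarcatorSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");   // placeholder
            props.put("group.id", "demo-group");             // placeholder
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("some.topic"));
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));

                // demarcated mode: the whole batch becomes ONE payload,
                // with a newline between the individual record values
                StringJoiner demarcated = new StringJoiner("\n");
                for (ConsumerRecord<String, String> record : batch) {
                    demarcated.add(record.value());
                }
                byte[] singlePayload = demarcated.toString().getBytes(StandardCharsets.UTF_8);
                System.out.println("records in batch: " + batch.count()
                        + ", payload bytes: " + singlePayload.length);
            }
        }
    }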

I'm not sure how well the Kafka client will behave when it is looking at
nearly 250 topics with relatively few threads, but in any case the way you
expected NiFi to behave is how it should behave.  That the other threads are
not active is certainly interesting.  Check the max thread count on the
controller, and please share the details of the settings on the Kafka
processors.

Thanks


Re: NiFi consumers concurrency

Posted by Charlie Meyer <ch...@civitaslearning.com>.
Hi Boris,

I have seen behavior similar to this before on other flows I have run. In
your Controller settings (in the hamburger menu at the top right of the
UI), have you adjusted the Maximum Timer Driven Thread Count?
