You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by sunil chaudhari <su...@gmail.com> on 2020/06/19 11:13:58 UTC

Duplicate records on consumer side.

Hi,
I am using kafka as a broker in my event data pipeline.
Filebeat as producer
Logstash as consumer.


Filebeat simply pushes to Kafka.
Logstash has 3 instances.
Each instance has a consumer group say consumer_mytopic which reads from
mytopic.

mytopic has 3 partitions and 2 replica.

As per my understanding, each consumer group can have threads equal to
number of partitions so i kept 3 threads for each consumer.

Here I am considering one logstash instance as a one consumer which is part
of consumer_mytopic.
Similar consumer running on some other server which has group_id same as
above. Note that 3 servers has client Id different so that they wont read
duplicate data.
So 3 instances of logstash running with group_id as consumer_mytopic with 3
threads each, and diff client id. Means 9 threads total.

My understanding is each consumer(instance) can read with 3 threads from 3
partitions. And another consumer with 3 threads.

Is this good design?
Can it create duplicate?
This thread and partitions trade-off is related to client_id or Consumer
group Id?
I hope because of diff client_id 3 instances wont read duplicate data even
if group_id is same.
I am getting duplicate data in my consumer side.
Please help in this.

Regards,
Sunil.

Re: Duplicate records on consumer side.

Posted by Ricardo Ferreira <ri...@riferrei.com>.
Hi Sunil,

No worries for the mix up in the email. I totally understand!

"So Now if I already started 3 instances on 3 servers with 3 threads 
each, then To better utilize it, i have to increase partitions. Right?" 
-- Yes, you got that right. To ease up your understanding... always 
think in terms of threads. Forget about instances and servers. 3 
instances on 3 servers might not mean much if each instance is running 
on its own server. They will essentially count as 3 threads. Now if each 
instance has 3 threads and you have 3 instances then your total number 
of threads will be 9 and that is the minimum number of partitions that 
you should have.

"What is impact on Existing topics, if i increase number of partitions 
for all topics and reatart cluster?" -- increasing partitions causes 
rebalancing of partitions among the consumer groups as well as a higher 
number of replicas created if each topic has replication factor set to 
higher than 1. Thus, the cluster will invariably become busier. Whether 
if this becomes a performance problem or not depends on the size of the 
cluster versus the number of partitions created.

"Or I can do that from CLI or Confluent control Center without 
restarting cluster?" -- you are not required to restart the cluster in 
order to increase the number of partitions. This is totally optional. 
Though some companies might treat changes like this as a planned 
downtime change that would require a restart for best practice purposes. 
But it is optional. Trust me, Kafka can handle it =)

Thanks,

-- Ricardo

On 6/19/20 12:24 PM, sunil chaudhari wrote:
> Hi,
> Thanks for the clarification.
> This means, for “A“ consumer group, Running one Consumer instance with 
> 3 threads on one server is equal to running 3 different instances with 
> one thread each on 3 different servers.
>
> So Now if I already started 3 instances on 3 servers with 3 threads 
> each, then To better utilise it, i have to increase partitions. Right?
>
> What is impact on Existing topics, if i increase number of partitions 
> for all topics and reatart cluster?
>
> Or I can do that from CLI or Confluent control Center without 
> restarting cluster?
>
>
> About duplicate records, it seems problem of max.poll.records and 
> polling interval. I am working on that.
> Offset commit is failing before next poll for a consumer group. Thats 
> the problem.
> Now I dont know what is default value in cluster for above 2 
> parameters and what value should I set in logstash kafka input?
>
> Sorry to mixup so many things in one mail😃
>
>
> Regards,
> Sunil.
>
>
> On Fri, 19 Jun 2020 at 7:59 PM, Ricardo Ferreira 
> <riferrei@riferrei.com <ma...@riferrei.com>> wrote:
>
>     Sunil,
>
>     Kafka ensures that each partition is read by one given thread only
>     from a consumer group. Since your topic has three partitions, the
>     rationale is that at least three threads from the consumer group
>     will be properly served.
>
>     However, though your calculation is correct (3 instances, each one
>     of 3 threads will total 9 threads) the design and usage is
>     incorrect. As stated above only three threads will be served and
>     the remaining six other threads will be kept waiting -- likely to
>     starve if all of them belong to the consumer group that the other
>     three threads belong.
>
>     Please note that the `client-id` property has nothing to do with
>     this thread group management. This property is used internally by
>     Kafka to correlate events sent from the same machine in order to
>     better adjust quota management. So the only property taking place
>     where is the `group-id` in the matter of partition assignment.
>
>     Regarding duplicated data, this is another problem that would
>     require a better investigation of your topology, how Logstash
>     connect to Kafka, and how the code is implemented.
>
>     Thanks,
>
>     -- Ricardo
>
>     On 6/19/20 7:13 AM, sunil chaudhari wrote:
>>     Hi,
>>     I am using kafka as a broker in my event data pipeline.
>>     Filebeat as producer
>>     Logstash as consumer.
>>
>>
>>     Filebeat simply pushes to Kafka.
>>     Logstash has 3 instances.
>>     Each instance has a consumer group say consumer_mytopic which reads from
>>     mytopic.
>>
>>     mytopic has 3 partitions and 2 replica.
>>
>>     As per my understanding, each consumer group can have threads equal to
>>     number of partitions so i kept 3 threads for each consumer.
>>
>>     Here I am considering one logstash instance as a one consumer which is part
>>     of consumer_mytopic.
>>     Similar consumer running on some other server which has group_id same as
>>     above. Note that 3 servers has client Id different so that they wont read
>>     duplicate data.
>>     So 3 instances of logstash running with group_id as consumer_mytopic with 3
>>     threads each, and diff client id. Means 9 threads total.
>>
>>     My understanding is each consumer(instance) can read with 3 threads from 3
>>     partitions. And another consumer with 3 threads.
>>
>>     Is this good design?
>>     Can it create duplicate?
>>     This thread and partitions trade-off is related to client_id or Consumer
>>     group Id?
>>     I hope because of diff client_id 3 instances wont read duplicate data even
>>     if group_id is same.
>>     I am getting duplicate data in my consumer side.
>>     Please help in this.
>>
>>     Regards,
>>     Sunil.
>>

Re: Duplicate records on consumer side.

Posted by sunil chaudhari <su...@gmail.com>.
Hi,
Thanks for the clarification.
This means, for “A“ consumer group, Running one Consumer instance with 3
threads on one server is equal to running 3 different instances with one
thread each on 3 different servers.

So Now if I already started 3 instances on 3 servers with 3 threads each,
then To better utilise it, i have to increase partitions. Right?

What is impact on Existing topics, if i increase number of partitions for
all topics and reatart cluster?

Or I can do that from CLI or Confluent control Center without restarting
cluster?


About duplicate records, it seems problem of max.poll.records and polling
interval. I am working on that.
Offset commit is failing before next poll for a consumer group. Thats the
problem.
Now I dont know what is default value in cluster for above 2 parameters and
what value should I set in logstash kafka input?

Sorry to mixup so many things in one mail😃


Regards,
Sunil.


On Fri, 19 Jun 2020 at 7:59 PM, Ricardo Ferreira <ri...@riferrei.com>
wrote:

> Sunil,
>
> Kafka ensures that each partition is read by one given thread only from a
> consumer group. Since your topic has three partitions, the rationale is
> that at least three threads from the consumer group will be properly served.
>
> However, though your calculation is correct (3 instances, each one of 3
> threads will total 9 threads) the design and usage is incorrect. As stated
> above only three threads will be served and the remaining six other threads
> will be kept waiting -- likely to starve if all of them belong to the
> consumer group that the other three threads belong.
>
> Please note that the `client-id` property has nothing to do with this
> thread group management. This property is used internally by Kafka to
> correlate events sent from the same machine in order to better adjust quota
> management. So the only property taking place where is the `group-id` in
> the matter of partition assignment.
>
> Regarding duplicated data, this is another problem that would require a
> better investigation of your topology, how Logstash connect to Kafka, and
> how the code is implemented.
>
> Thanks,
>
> -- Ricardo
> On 6/19/20 7:13 AM, sunil chaudhari wrote:
>
> Hi,
> I am using kafka as a broker in my event data pipeline.
> Filebeat as producer
> Logstash as consumer.
>
>
> Filebeat simply pushes to Kafka.
> Logstash has 3 instances.
> Each instance has a consumer group say consumer_mytopic which reads from
> mytopic.
>
> mytopic has 3 partitions and 2 replica.
>
> As per my understanding, each consumer group can have threads equal to
> number of partitions so i kept 3 threads for each consumer.
>
> Here I am considering one logstash instance as a one consumer which is part
> of consumer_mytopic.
> Similar consumer running on some other server which has group_id same as
> above. Note that 3 servers has client Id different so that they wont read
> duplicate data.
> So 3 instances of logstash running with group_id as consumer_mytopic with 3
> threads each, and diff client id. Means 9 threads total.
>
> My understanding is each consumer(instance) can read with 3 threads from 3
> partitions. And another consumer with 3 threads.
>
> Is this good design?
> Can it create duplicate?
> This thread and partitions trade-off is related to client_id or Consumer
> group Id?
> I hope because of diff client_id 3 instances wont read duplicate data even
> if group_id is same.
> I am getting duplicate data in my consumer side.
> Please help in this.
>
> Regards,
> Sunil.
>
>
>

Re: Duplicate records on consumer side.

Posted by Ricardo Ferreira <ri...@riferrei.com>.
Sunil,

Kafka ensures that each partition is read by one given thread only from 
a consumer group. Since your topic has three partitions, the rationale 
is that at least three threads from the consumer group will be properly 
served.

However, though your calculation is correct (3 instances, each one of 3 
threads will total 9 threads) the design and usage is incorrect. As 
stated above only three threads will be served and the remaining six 
other threads will be kept waiting -- likely to starve if all of them 
belong to the consumer group that the other three threads belong.

Please note that the `client-id` property has nothing to do with this 
thread group management. This property is used internally by Kafka to 
correlate events sent from the same machine in order to better adjust 
quota management. So the only property taking place where is the 
`group-id` in the matter of partition assignment.

Regarding duplicated data, this is another problem that would require a 
better investigation of your topology, how Logstash connect to Kafka, 
and how the code is implemented.

Thanks,

-- Ricardo

On 6/19/20 7:13 AM, sunil chaudhari wrote:
> Hi,
> I am using kafka as a broker in my event data pipeline.
> Filebeat as producer
> Logstash as consumer.
>
>
> Filebeat simply pushes to Kafka.
> Logstash has 3 instances.
> Each instance has a consumer group say consumer_mytopic which reads from
> mytopic.
>
> mytopic has 3 partitions and 2 replica.
>
> As per my understanding, each consumer group can have threads equal to
> number of partitions so i kept 3 threads for each consumer.
>
> Here I am considering one logstash instance as a one consumer which is part
> of consumer_mytopic.
> Similar consumer running on some other server which has group_id same as
> above. Note that 3 servers has client Id different so that they wont read
> duplicate data.
> So 3 instances of logstash running with group_id as consumer_mytopic with 3
> threads each, and diff client id. Means 9 threads total.
>
> My understanding is each consumer(instance) can read with 3 threads from 3
> partitions. And another consumer with 3 threads.
>
> Is this good design?
> Can it create duplicate?
> This thread and partitions trade-off is related to client_id or Consumer
> group Id?
> I hope because of diff client_id 3 instances wont read duplicate data even
> if group_id is same.
> I am getting duplicate data in my consumer side.
> Please help in this.
>
> Regards,
> Sunil.
>