Posted to users@kafka.apache.org by sunil chaudhari <su...@gmail.com> on 2020/06/19 11:13:58 UTC
Duplicate records on consumer side.
Hi,
I am using Kafka as a broker in my event data pipeline:
Filebeat as the producer,
Logstash as the consumer.
Filebeat simply pushes to Kafka.
Logstash has 3 instances.
Each instance uses a consumer group, say consumer_mytopic, which reads from
mytopic.
mytopic has 3 partitions and a replication factor of 2.
As per my understanding, each consumer group can have as many threads as
there are partitions, so I configured 3 threads for each consumer.
Here I am considering one Logstash instance as one consumer which is part
of consumer_mytopic.
A similar consumer runs on the other servers with the same group_id as
above. Note that the 3 servers have different client IDs so that they won't read
duplicate data.
So there are 3 instances of Logstash running with group_id consumer_mytopic, with 3
threads each and different client IDs -- 9 threads in total.
My understanding is that each consumer (instance) can read with 3 threads from 3
partitions, and likewise for the other consumers with 3 threads each.
Is this a good design?
Can it create duplicates?
Is this thread/partition trade-off related to the client_id or to the consumer
group ID?
I hope that because of the different client_ids the 3 instances won't read
duplicate data even if the group_id is the same.
I am getting duplicate data on my consumer side.
Please help with this.
Regards,
Sunil.
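For context, the setup described above corresponds roughly to this kafka input block on each of the 3 Logstash instances (a sketch; the broker address and the client_id value are placeholders, and the option names should be checked against your logstash-input-kafka plugin version):

```
input {
  kafka {
    bootstrap_servers => "broker1:9092"     # placeholder
    topics            => ["mytopic"]
    group_id          => "consumer_mytopic" # same on all 3 instances
    client_id         => "logstash-1"       # unique per instance
    consumer_threads  => 3
  }
}
```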
Re: Duplicate records on consumer side.
Posted by Ricardo Ferreira <ri...@riferrei.com>.
Hi Sunil,
No worries about the mix-up in the email. I totally understand!
"So now if I already started 3 instances on 3 servers with 3 threads
each, then to better utilize it, I have to increase partitions. Right?"
-- Yes, you got that right. To make this easier to reason about, always
think in terms of threads; forget about instances and servers. 3
instances on 3 servers doesn't mean much by itself: if each instance ran
a single thread, they would still count as just 3 threads. Now, if each
instance has 3 threads and you have 3 instances, then your total number
of threads is 9, and that is the minimum number of partitions you
should have to keep them all busy.
"What is the impact on existing topics, if I increase the number of
partitions for all topics and restart the cluster?" -- increasing
partitions causes rebalancing of partitions among the consumer groups,
as well as more replicas being created if a topic's replication factor
is set higher than 1. Thus, the cluster will invariably become busier.
Whether this becomes a performance problem depends on the size of the
cluster versus the number of partitions created.
"Or can I do that from the CLI or Confluent Control Center without
restarting the cluster?" -- you are not required to restart the cluster
in order to increase the number of partitions; a restart is entirely
optional. Some companies might treat a change like this as planned
downtime and restart as a matter of best practice, but it remains
optional. Trust me, Kafka can handle it =)
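For example, with the stock topic tool the change can be applied online (a sketch; the bootstrap server is a placeholder, and note that partitions can only ever be increased, and that adding partitions changes which partition a given key hashes to):

```
# Increase the partition count of an existing topic, no restart needed.
kafka-topics.sh --bootstrap-server broker1:9092 \
  --alter --topic mytopic --partitions 9

# Verify the new partition count.
kafka-topics.sh --bootstrap-server broker1:9092 \
  --describe --topic mytopic
```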
Thanks,
-- Ricardo
On 6/19/20 12:24 PM, sunil chaudhari wrote:
Re: Duplicate records on consumer side.
Posted by sunil chaudhari <su...@gmail.com>.
Hi,
Thanks for the clarification.
This means that, for a given consumer group, running one consumer instance with
3 threads on one server is equivalent to running 3 different instances with one
thread each on 3 different servers.
So now, since I already started 3 instances on 3 servers with 3 threads each,
to better utilise them I have to increase partitions. Right?
What is the impact on existing topics if I increase the number of partitions
for all topics and restart the cluster?
Or can I do that from the CLI or Confluent Control Center without restarting
the cluster?
About the duplicate records, it seems to be a problem with max.poll.records and
the polling interval. I am working on that.
The offset commit is failing before the next poll for a consumer group. That's
the problem.
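That failure mode can be sketched in a few lines of plain Python (a toy model, not the Kafka client): when the commit for a polled batch does not land before the consumer is rebalanced away, the next poll resumes from the last committed offset and the batch is delivered again.

```python
# Toy model of at-least-once delivery. If a consumer is kicked out of
# the group (e.g. it exceeded the poll interval) before its offset
# commit succeeds, the next poll restarts from the last committed
# offset, so the same records are delivered twice downstream.
log = [f"record-{i}" for i in range(6)]   # one partition's log
committed_offset = 0                      # last successfully committed offset

def poll(offset, max_poll_records):
    """Return the next batch, starting at the given offset."""
    return log[offset:offset + max_poll_records]

first_batch = poll(committed_offset, 3)   # records 0..2
# ... processing takes too long, the commit fails, the group rebalances ...
commit_succeeded = False
if commit_succeeded:
    committed_offset += len(first_batch)

second_batch = poll(committed_offset, 3)  # records 0..2 again -> duplicates
print(first_batch == second_batch)        # True
```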
Now I don't know what the default values are in the cluster for the above 2
parameters, and what values should I set in the Logstash kafka input?
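For what it's worth, the Kafka client defaults are max.poll.records=500 and max.poll.interval.ms=300000 (5 minutes), and the Logstash kafka input exposes both (a sketch with placeholder values -- check the option names against your Logstash plugin version):

```
input {
  kafka {
    bootstrap_servers => "broker1:9092"   # placeholder
    topics            => ["mytopic"]
    group_id          => "consumer_mytopic"
    consumer_threads  => 3
    # Smaller batches and/or a longer poll interval give each batch
    # time to be processed and committed before the deadline.
    max_poll_records     => "100"
    max_poll_interval_ms => "300000"
  }
}
```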
Sorry to mix up so many things in one mail 😃
Regards,
Sunil.
On Fri, 19 Jun 2020 at 7:59 PM, Ricardo Ferreira <ri...@riferrei.com>
wrote:
Re: Duplicate records on consumer side.
Posted by Ricardo Ferreira <ri...@riferrei.com>.
Sunil,
Kafka ensures that each partition is read by only one thread from a
given consumer group. Since your topic has three partitions, at most
three threads from the consumer group will be served at any time.
However, though your calculation is correct (3 instances with 3
threads each total 9 threads), the design and usage are incorrect. As
stated above, only three threads will be served and the remaining six
threads will be kept waiting -- likely to starve if all of them
belong to the same consumer group as the other three threads.
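The arithmetic can be illustrated with a small, self-contained sketch (plain Python, not the real Kafka assignor, whose strategies are more involved) of one-partition-per-thread assignment over 9 group members and 3 partitions:

```python
# Toy one-partition-per-member assignment, to show why 9 consumer
# threads on a 3-partition topic leaves 6 threads idle. (This is a
# simplification; the real Kafka assignors are more sophisticated.)
def assign(partitions, members):
    """Give each partition to exactly one member; leftover members get nothing."""
    assignment = {m: [] for m in members}
    for i, partition in enumerate(partitions):
        assignment[members[i % len(members)]].append(partition)
    return assignment

# 3 Logstash instances x 3 consumer threads = 9 group members.
members = [f"logstash-{i}-thread-{t}" for i in range(3) for t in range(3)]
assignment = assign([0, 1, 2], members)

busy = [m for m, parts in assignment.items() if parts]
idle = [m for m, parts in assignment.items() if not parts]
print(len(busy), len(idle))  # 3 busy threads, 6 idle
```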
Please note that the `client-id` property has nothing to do with this
thread-group management. That property is used internally by Kafka to
correlate events sent from the same machine in order to better apply
quota management. So the only property that matters here for partition
assignment is the `group-id`.
Regarding the duplicated data, this is a separate problem that would require
a closer investigation of your topology, how Logstash connects to Kafka,
and how the code is implemented.
Thanks,
-- Ricardo
On 6/19/20 7:13 AM, sunil chaudhari wrote: