Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2022/09/14 07:08:21 UTC

Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

Hi All,

I am hosting Kafka consumers inside a microservice deployed as Kubernetes pods,
with 3 consumers in a consumer group.
There is a requirement to add auto-scaling, where a single pod will be
auto-scaled out or scaled in based on the load on the microservice.
So, 1 pod can be scaled out to 2 or 3 pods, and similarly 3 pods can be
scaled down to 2 or 1 pod.

Currently, I am using enable.auto.commit set to 'true' in the consumers,
and during scale-out or scale-in I want to commit the offsets of polled and
processed records so duplicates won't occur.
I have narrowed the problem down to 2 scenarios:

1. During the scale-down operation, I am adding a shutdown hook to the Java
Runtime and calling close on the consumer. As per the Kafka docs, close
provides 30 sec to commit current offsets if auto-commit is enabled: so, I
assume that it will process the current batch of polled records within the
30 sec timeout before committing offsets and then close the consumer. Is my
understanding correct?

public void close()

Close the consumer, waiting for up to the default timeout of 30 seconds for
any needed cleanup. If auto-commit is enabled, this will commit the current
offsets if possible within the default timeout. See close(Duration)
<https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close-java.time.Duration->
for details. Note that wakeup()
<https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup-->
cannot be used to interrupt close.
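
To be concrete, the shutdown path I have in mind is roughly the sketch below
(bootstrap server, group id, topic name and the process() step are just
placeholders, not our real code): the shutdown hook calls wakeup(), the poll
loop exits on WakeupException, and close() runs in the finally block.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerPod {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "my-group");                  // placeholder
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        final Thread mainThread = Thread.currentThread();

        // Shutdown hook: wake the consumer so the poll loop can exit cleanly.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();  // wait until the poll loop has closed the consumer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);  // application-specific processing (placeholder)
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown; fall through to close()
        } finally {
            // default 30s timeout: commits current offsets if auto-commit is enabled
            consumer.close();
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for real processing
    }
}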

2. During the scale-out operation, a new pod (consumer) will be added to the
consumer group, so partitions of existing consumers will be rebalanced to the
new consumer. In this case, I want to ensure that the current batch of
records polled and being processed by a consumer is fully processed and its
offsets are committed before the partitions are rebalanced to the new consumer.
How can I ensure this with auto-commit enabled?
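
For reference on the API hook involved here: the consumer lets you pass a
ConsumerRebalanceListener to subscribe(), and onPartitionsRevoked() runs
before partitions are handed over. A minimal sketch of that hook (it assumes
offsets are tracked by the application and committed manually, which is not
what I do today with auto-commit):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitOnRevokeListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    // offsets of records the application has fully processed, updated by the poll loop
    private final Map<TopicPartition, OffsetAndMetadata> processedOffsets = new HashMap<>();

    CommitOnRevokeListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    void markProcessed(TopicPartition tp, long offset) {
        // commit the offset of the *next* record to read
        processedOffsets.put(tp, new OffsetAndMetadata(offset + 1));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // runs before partitions move to another consumer:
        // commit what has actually been processed so far
        consumer.commitSync(processedOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // nothing to do in this sketch
    }
}

// usage:
// consumer.subscribe(Collections.singletonList("my-topic"),
//         new CommitOnRevokeListener(consumer));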

Re: Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

Posted by Pushkar Deole <pd...@gmail.com>.
Hi Luke,

Thanks for the details... so from the explanation above, it seems that in
both of these scenarios I won't be able to avoid duplicate processing, which
is the main thing I was looking to achieve

scenario 1: the consumer shuts down without committing offsets of the
already polled and processed batch of records (since auto-commit is enabled,
the commit would happen on the next poll, which won't occur when closing the
consumer). This gives rise to duplicate processing of that batch when the
partition is rebalanced to another consumer pod

scenario 2: with CooperativeStickyAssignor the consumer keeps working on the
partition during rebalancing, which again means the same thing, i.e. consumer1
has polled and processed some records which are not yet committed before
rebalancing, and when the partition moves over to the next consumer, it can
process those records again
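
If it helps frame the discussion, the alternative I am now considering (not
what we run today, and only a sketch with placeholder processing) is to
disable auto-commit and commit synchronously after each batch is processed,
so at most the one in-flight batch can be reprocessed:

// assumed config change: enable.auto.commit=false
// imports as in the earlier sketch, plus java.util.concurrent.atomic.AtomicBoolean
static void pollAndCommitLoop(KafkaConsumer<String, String> consumer,
                              AtomicBoolean running) {
    while (running.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            process(record);        // placeholder application logic
        }
        if (!records.isEmpty()) {
            // commits offsets of the records returned by the last poll,
            // i.e. only records that have already been processed above
            consumer.commitSync();
        }
    }
}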


Re: Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

Posted by Luke Chen <sh...@gmail.com>.
Hi

1. I was under the impression, from the documentation, that the close method
waits for 30 seconds to complete processing of any in-flight events and then
commits the offsets of the last poll. Isn't that true? What does the 30
second timeout mean?

-> The 30 second timeout is there as a buffer for graceful closing, e.g.
committing offsets, leaving the group, etc.
It won't wait for processing of any in-flight "fetch" events during closing.

2. How does CooperativeStickyAssignor solve my problem when partitions move
out to a newly added consumer pod? i.e. consumer1 has polled 100 records from
partition1 and is midway through processing them, i.e. 50 completed, 50
remaining, and a new consumer is added so partition1 has to move to the new
consumer2. Since auto-commit is enabled, offsets of all 100 polled records
will be committed only during the next poll.

-> You're right about the process.

So how does CooperativeStickyAssignor help here to wait for processing of
the 100 records and commit their offsets before moving the partition to the
new consumer? Looks like I am missing something

-> CooperativeStickyAssignor does the same thing here, except it will
keep all the partitions "during rebalancing".
So, the issue is:
In the eager protocol (ex: RangeAssignor):
consumer prepares rebalancing -> commits offsets -> revokes all owned
partitions -> rebalancing -> receives new assignment -> starts fetching data
In the cooperative protocol (ex: CooperativeStickyAssignor):
consumer prepares rebalancing -> commits offsets (but no revoke) ->
rebalancing -> receives new assignment -> revokes partitions it no longer
owns

So you can see that in the cooperative protocol, since it doesn't revoke any
partitions before rebalancing, it might fetch more data after the offset
commit.
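
For completeness, a small sketch of how each protocol is selected via
`partition.assignment.strategy` (these are the built-in assignor classes
shipped with the Java client; the Properties object is just illustrative):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

Properties props = new Properties();
// eager protocol:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        RangeAssignor.class.getName());
// or cooperative protocol:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());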

Hope that's clear
Luke


Re: Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

Posted by Pushkar Deole <pd...@gmail.com>.
Thanks Luke..

1. I was under the impression, from the documentation, that the close method
waits for 30 seconds to complete processing of any in-flight events and then
commits the offsets of the last poll. Isn't that true? What does the 30
second timeout mean?

2. How does CooperativeStickyAssignor solve my problem when partitions move
out to a newly added consumer pod? i.e. consumer1 has polled 100 records from
partition1 and is midway through processing them, i.e. 50 completed, 50
remaining, and a new consumer is added so partition1 has to move to the new
consumer2. Since auto-commit is enabled, offsets of all 100 polled records
will be committed only during the next poll. So how does
CooperativeStickyAssignor help here to wait for processing of the 100 records
and commit their offsets before moving the partition to the new consumer?
Looks like I am missing something


Re: Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

Posted by Luke Chen <sh...@gmail.com>.
Hi Pushkar,

Here's the answer to your questions:

> 1. During the scale-down operation, I am adding a shutdown hook to the Java
Runtime and calling close on the consumer. As per the Kafka docs, close
provides 30 sec to commit current offsets if auto-commit is enabled: so, I
assume that it will process the current batch of polled records within the
30 sec timeout before committing offsets and then close the consumer. Is my
understanding correct?

No, the close() method only does some cleanup and commits offsets if needed.
It doesn't care whether the polled records have been processed or not.
So, to be clear, the 30 seconds is for the consumer to:
(1) commit offsets if auto-commit is enabled, (2) leave the consumer group,
(3) do other cleanup
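
If you want a different bound on that cleanup, you can also pass the timeout
to close explicitly (trivial sketch, assuming a consumer variable named
`consumer` and java.time.Duration imported):

// default: waits up to 30 seconds for offset commit, group leave, other cleanup
consumer.close();
// or, with an explicit bound:
consumer.close(Duration.ofSeconds(10));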

> 2. During the scale-out operation, a new pod (consumer) will be added to the
consumer group, so partitions of existing consumers will be rebalanced to the
new consumer. In this case, I want to ensure that the current batch of
records polled and being processed by a consumer is fully processed and its
offsets are committed before the partitions are rebalanced to the new consumer.
How can I ensure this with auto-commit enabled?

It depends on which version of Kafka you're running, and which
`partition.assignment.strategy` you have set.
In Kafka v3.2.1, we found a bug where there is a chance of processing
duplicate records during rebalance: KAFKA-14196
<https://issues.apache.org/jira/browse/KAFKA-14196>
So, assuming you're using the default `partition.assignment.strategy` setting
and are not on v3.2.1, we can ensure there will be no duplicated consumption.
If you set `partition.assignment.strategy` to
CooperativeStickyAssignor, there's a bug that we're still working on:
KAFKA-14224 <https://issues.apache.org/jira/browse/KAFKA-14224>

Thank you.
Luke
