Posted to user@flink.apache.org by Edward <eg...@hotmail.com> on 2018/03/05 16:11:55 UTC

Kafka offset auto-commit stops after timeout

We have noticed that the Kafka offset auto-commit functionality seems to stop
working after it encounters a timeout. It appears in the logs like this:

2018-03-04 07:02:54,779 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator kafka06:9092 (id: 2147483641 rack: null) dead for group consumergroup01
2018-03-04 07:02:54,780 WARN  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Auto-commit of offsets {topic01-24=OffsetAndMetadata{offset=153237895, metadata=''}} failed for group consumergroup01: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: The request timed out.

After this message is logged, no more offsets are committed by the job until
it is restarted (and if the flink process ends abnormally, the offsets never
get committed).

This is using Flink 1.4.0 which uses kafka-clients 0.11.0.2. We are using
the default kafka client settings for enable.auto.commit (true) and
auto.commit.interval.ms (5000). We are not using Flink checkpointing, so the
kafka client offset commit mode is OffsetCommitMode.KAFKA_PERIODIC (not
OffsetCommitMode.ON_CHECKPOINTS).
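
For readers following along, here is a minimal stand-alone sketch of how the
commit mode described above follows from the configuration. It is a simplified
model, not Flink's actual code: the class OffsetCommitModeSketch, its Mode enum
and the select method are hypothetical names made up for illustration. Only
the two Kafka property keys and their defaults are taken from the message
above.

```java
import java.util.Properties;

// Simplified, stand-alone model of how the offset commit mode is chosen.
// The class, enum and method names are illustrative, not Flink's own classes.
final class OffsetCommitModeSketch {

    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Auto-commit counts as enabled only if the flag is true and the interval is positive.
    static boolean isAutoCommitEnabled(Properties props) {
        boolean enabled = Boolean.parseBoolean(props.getProperty("enable.auto.commit", "true"));
        long intervalMs = Long.parseLong(props.getProperty("auto.commit.interval.ms", "5000"));
        return enabled && intervalMs > 0;
    }

    static Mode select(Properties props, boolean checkpointingEnabled, boolean commitOnCheckpoints) {
        if (checkpointingEnabled) {
            // With checkpointing on, the Kafka client's auto-commit settings are not consulted.
            return commitOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // Without checkpointing, committing is left to the Kafka client's periodic auto-commit.
        return isAutoCommitEnabled(props) ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        Properties props = new Properties(); // defaults: auto-commit on, 5000 ms interval
        System.out.println(select(props, false, true)); // prints KAFKA_PERIODIC
        System.out.println(select(props, true, true));  // prints ON_CHECKPOINTS
    }
}
```

With the default properties and no checkpointing, the sketch lands in
KAFKA_PERIODIC, which is the setup described in this message.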

I'm wondering if others have encountered this?

And if so, does enabling checkpointing resolve the issue, because
Kafka09Fetcher.doCommitInternalOffsetsToKafka is called from the Flink code?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Kafka offset auto-commit stops after timeout

Posted by Anil <an...@gmail.com>.
I had the same issue, and enabling checkpointing seems to solve the problem.
Can you please explain how enabling checkpointing fixes the issue? Thanks!




Re: Kafka offset auto-commit stops after timeout

Posted by Edward <eg...@hotmail.com>.
Thanks for the reply, Nico.

I've been testing with OffsetCommitMode.ON_CHECKPOINTS, and I can confirm
that this fixes the issue -- even if a single commit times out when
communicating with Kafka, subsequent offset commits still succeed.




Re: Kafka offset auto-commit stops after timeout

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Edward,
looking through the Kafka code, I do see a path where they deliberately
do not want recursive retries, i.e. if the coordinator is unknown. It
seems like you are getting into this scenario.

I'm no expert on Kafka and therefore I'm not sure about the implications or
ways to circumvent/fix this. Maybe the Kafka folks can help you with
this on their mailing list, or Gordon (cc'd) knows, although this seems
unrelated to Flink.

Regarding the use of OffsetCommitMode.ON_CHECKPOINTS: I looked at our
code and with this (@Gordon, please correct me if I'm wrong), we will
commit the offsets ourselves and will try to commit every time a
checkpoint completes. In case of a failure in the last commit, we will
simply commit the new one instead with the next checkpoint.


Nico
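
The commit-on-checkpoint behaviour described in this reply can be illustrated
with a small stand-alone simulation. This is a sketch under stated assumptions,
not Flink's implementation: CheckpointCommitSketch and onCheckpointComplete are
hypothetical names, and the Kafka commit is replaced by a LongPredicate that
returns false to simulate a timed-out commit. The point it shows is that a
failed commit is not retried and is simply superseded by the commit issued at
the next completed checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical simulation: one commit attempt per completed checkpoint.
final class CheckpointCommitSketch {

    // Stand-in for the commit call to Kafka; returns false when the commit fails.
    private final LongPredicate commitToKafka;
    private long lastCommitted = -1L;
    private final List<Long> failedAttempts = new ArrayList<>();

    CheckpointCommitSketch(LongPredicate commitToKafka) {
        this.commitToKafka = commitToKafka;
    }

    // Called once per completed checkpoint with the offset captured in that checkpoint.
    void onCheckpointComplete(long offsetAtCheckpoint) {
        if (commitToKafka.test(offsetAtCheckpoint)) {
            lastCommitted = offsetAtCheckpoint;
        } else {
            // No retry: the next checkpoint commits a newer offset anyway.
            failedAttempts.add(offsetAtCheckpoint);
        }
    }

    long lastCommitted() { return lastCommitted; }

    List<Long> failedAttempts() { return failedAttempts; }

    public static void main(String[] args) {
        // Simulate a timeout for the commit of offset 200 only.
        CheckpointCommitSketch sketch = new CheckpointCommitSketch(offset -> offset != 200L);
        sketch.onCheckpointComplete(100L);
        sketch.onCheckpointComplete(200L);
        sketch.onCheckpointComplete(300L);
        System.out.println(sketch.lastCommitted());  // prints 300
        System.out.println(sketch.failedAttempts()); // prints [200]
    }
}
```

In this model a single failed commit only delays the committed offset until
the next checkpoint, which matches what Edward reports seeing with
OffsetCommitMode.ON_CHECKPOINTS.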
