Posted to user@flink.apache.org by Tony Wei <to...@gmail.com> on 2017/11/15 09:08:29 UTC

kafka consumer client seems not auto commit offset

Hi Gordon,

When I used FlinkKafkaConsumer010 to consume logs from Kafka, I found that
if I used `setStartFromLatest()`, the Kafka consumer didn't auto-commit
offsets back to the consumer group, but if I used `setStartFromGroupOffsets()`,
it worked fine.

I am sure that the Kafka consumer configuration has
`auto.commit.interval.ms = 5000` and `enable.auto.commit = true`, and I didn't
enable checkpointing.
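For reference, the consumer configuration described above might look like the following `java.util.Properties` sketch, of the kind passed to the `FlinkKafkaConsumer010` constructor. It is only an illustration: the `TonyConsumerConfig` class name and the `bootstrap.servers` / `group.id` values are hypothetical placeholders, not taken from the original setup.

```java
import java.util.Properties;

// Sketch of the Kafka consumer properties described in the report.
// bootstrap.servers and group.id are caller-supplied placeholders (assumptions).
final class TonyConsumerConfig {
    private TonyConsumerConfig() {}

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Offsets are committed under this consumer group.
        props.setProperty("group.id", groupId);
        // Kafka client auto-commit settings quoted in the report.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        return props;
    }
}
```

According to the Flink Kafka connector documentation, when checkpointing is disabled the consumer relies on the Kafka client's own periodic auto-commit, so these two keys are what should drive offset commits in this setup.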

The only difference is the change from `setStartFromGroupOffsets()` to
`setStartFromLatest()`, yet with that change alone the auto-commit mechanism stopped working.
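To make the difference between the two calls concrete, here is a toy model of the start-position rules described in the Flink 1.3 Kafka connector documentation. It is a hypothetical illustration, not connector code: `StartupModeModel` and `startOffset` are invented names, and only a single partition is modeled.

```java
// Toy model (not Flink code) of how the documented Flink 1.3 Kafka startup
// modes pick the first offset to read for a single partition.
final class StartupModeModel {
    enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST }

    private StartupModeModel() {}

    /**
     * @param committed       offset committed for the consumer group, or null if none
     * @param earliest        first available offset in the partition
     * @param latest          log-end offset (just past the last record)
     * @param autoOffsetReset Kafka "auto.offset.reset" value; null means Kafka's default
     */
    static long startOffset(StartupMode mode, Long committed,
                            long earliest, long latest, String autoOffsetReset) {
        switch (mode) {
            case EARLIEST:
                return earliest;      // committed group offsets are ignored
            case LATEST:
                return latest;        // committed group offsets are ignored
            case GROUP_OFFSETS:
            default:
                if (committed != null) {
                    return committed; // resume where the group left off
                }
                // No committed offset: fall back to the Kafka reset policy.
                String policy = autoOffsetReset == null ? "latest" : autoOffsetReset;
                if ("earliest".equals(policy)) {
                    return earliest;
                }
                if ("latest".equals(policy)) {
                    return latest;
                }
                // e.g. "none": Kafka reports an error instead of picking an offset.
                throw new IllegalStateException(
                        "no committed offset and auto.offset.reset=" + policy);
        }
    }
}
```

As the model suggests, the documented rules only decide where reading starts; they say nothing about whether offsets are committed afterwards, so switching the startup mode alone would not be expected to stop auto-commit.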

My Flink cluster version is 1.3.2.
My Kafka cluster version is 0.10.2.1.
My ZooKeeper version for Kafka is 3.4.6-1569965, built on 02/20/2014 09:09 GMT.
My Kafka connector library is "org.apache.flink" % "flink-connector-kafka-0.10_2.10" % "1.3.2"

Thanks for your help in advance.

Best Regards,
Tony Wei

Re: kafka consumer client seems not auto commit offset

Posted by Ashish Pokharel <as...@yahoo.com>.
Gordon, Tony,

Thought I would chime in quickly, since I have tested this a few different ways over the last month (not sure if this will help, but I thought I'd throw it out there). I haven't noticed any issue with auto-committing under any of those configurations when using the Kafka property auto.offset.reset instead of those methods. However, I have come across one interesting scenario: even when checkpointing is disabled, if the app is started from a savepoint, auto-commit doesn't seem to work. I am not sure whether Tony has the same scenario. I assumed that, when started from a savepoint, the consumer expects checkpointing to be enabled in order to commit offsets, similar to how it behaves when checkpointing is enabled.

At this point, I am generating a random UID for my Kafka consumer, because I really don't want to enable checkpointing (it isn't really needed in my use case, and I want to save some resources), but I do have some really slow-moving state which I'd like to save on app shutdown, etc.
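The random-UID workaround described above can be sketched as follows. This is a minimal illustration under assumptions: `RandomSourceUid` and the `kafka-source-` prefix are hypothetical, and the Flink call it would feed is shown only in a comment. Because a fresh ID never matches the one stored in an earlier savepoint, the source starts without restored offsets. Flink normally refuses to restore a savepoint containing state it cannot map to an operator unless non-restored state is explicitly allowed (the `--allowNonRestoredState` / `-n` option of `flink run`). The Flink Kafka connector documentation also notes that the `setStartFrom*()` methods do not affect the start position when a job is restored from a failure or a savepoint.

```java
import java.util.UUID;

// Hypothetical helper: builds a fresh operator UID for the Kafka source on
// every deployment, so its offsets are never matched to savepoint state.
final class RandomSourceUid {
    private RandomSourceUid() {}

    static String next(String prefix) {
        // UUID.randomUUID() yields a new random value on each call.
        return prefix + UUID.randomUUID();
    }

    // Intended use in the job (needs Flink on the classpath, so not compiled here):
    //   env.addSource(kafkaConsumer).uid(RandomSourceUid.next("kafka-source-"));
    // Stateful operators whose state should survive keep fixed .uid(...) values.
}
```

Only the source gets a changing ID; the slow-moving state in other operators can still be restored as long as those operators keep stable UIDs.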

Thanks, Ashish

> On Nov 15, 2017, at 4:22 AM, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
> 
> Hi Tony,
> 
> Thanks for the report. At first glance, what you described doesn’t seem to match the expected behavior.
> I’ll spend some time later today to check this out.
> 
> Cheers,
> Gordon


Re: kafka consumer client seems not auto commit offset

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Tony,

Thanks for the report. At first glance, what you described doesn’t seem to match the expected behavior.
I’ll spend some time later today to check this out.

Cheers,
Gordon

