Posted to user@flink.apache.org by Rex Fenley <Re...@remind101.com> on 2020/10/13 05:43:42 UTC

Flink Kafka offsets

Hello,

I've been trying to configure the offset start position for a Flink Kafka
consumer so that, when there is no committed offset, it always starts at the
beginning. It seems like the typical way to do this would be setting
auto.offset.reset=earliest; however, I don't see that configuration property
in the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

However, I do see scan.startup.mode = earliest-offset, but from the docs it
sounds like this would mean Flink would never commit an offset and would
always start consuming from the beginning of the Kafka topic, which is not
what I want.

Is this the case or am I misunderstanding? How can I get the behavior that
I wish to see, where committed offsets are respected, but no committed offset
means starting at the beginning of the Kafka log?

Thanks!
-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Flink Kafka offsets

Posted by Rex Fenley <Re...@remind101.com>.
Thanks for the explanation, this was all super helpful.



Re: Flink Kafka offsets

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Rex,

I agree the documentation might be slightly misleading. To get the full
picture of that configuration I'd suggest having a look at the
DataStream Kafka connector page[1]. The Table connector is just a
wrapper around the DataStream one.

Let me also try to clarify it a bit more. In case of Flink there are two
places where the offsets are committed:

1) Flink's checkpoint/savepoint. Those always take the highest priority.
Therefore e.g. when the job is restarted because of a failure, it will
use offsets that were stored in the last successful checkpoint.

2) Upon a checkpoint, Flink can also write the offsets back to Kafka.
This is enabled by default in the DataStream API and is enabled in the
Table API if you provide properties.group.id[2]. This works only if you have
checkpointing enabled. If you disable checkpoints, you can still auto-commit
offsets from the underlying Kafka consumer via
properties.enable.auto.commit / properties.auto.commit.interval.ms (btw,
you can pass any Kafka option with a properties.* prefix).
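
As a rough illustration of the properties.* prefix mentioned above, the
following Python sketch turns plain Kafka consumer properties into Table
connector option keys. The helper name to_table_options and the values
(group id, commit interval) are made up for the example; only the option
names come from this thread.

```python
def to_table_options(kafka_properties):
    """Prefix plain Kafka consumer properties with 'properties.' so they
    can be passed as Table connector options (hypothetical helper)."""
    return {f"properties.{key}": value for key, value in kafka_properties.items()}


# Placeholder values; the group id and interval are invented for illustration.
kafka_properties = {
    "group.id": "example-group",
    "enable.auto.commit": "true",
    "auto.commit.interval.ms": "5000",
}

table_options = to_table_options(kafka_properties)
for key, value in table_options.items():
    print(f"'{key}' = '{value}'")
```

The printed pairs are in the form used inside the WITH (...) clause of a
table definition.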

Having explained that, if you set scan.startup.mode and you do not
restore from a checkpoint/savepoint:

* group-offsets -> it will start consuming from the committed offsets in
Kafka for the configured group.id; if there are none, it should fall back to
the properties.auto.offset.reset option

* earliest-offset -> it will ignore committed offsets in Kafka and start
from the earliest offset.
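
The precedence described above can be summarized in a small toy model. This
is not Flink code, only a sketch of the decision order as explained in this
message; the function name, its parameters and the returned labels are
invented for illustration.

```python
def resolve_start_position(startup_mode, auto_offset_reset,
                           restored_offset=None, committed_offset=None):
    """Toy model of where a partition starts reading (not Flink's code).

    restored_offset:  offset from a checkpoint/savepoint, or None
    committed_offset: offset committed in Kafka for the group.id, or None
    """
    # Offsets from a checkpoint/savepoint always take the highest priority.
    if restored_offset is not None:
        return ("restored", restored_offset)
    # earliest-offset ignores whatever is committed in Kafka.
    if startup_mode == "earliest-offset":
        return ("earliest", None)
    # group-offsets uses the committed offset, else auto.offset.reset.
    if startup_mode == "group-offsets":
        if committed_offset is not None:
            return ("committed", committed_offset)
        return (auto_offset_reset, None)
    raise ValueError(f"startup mode not covered by this sketch: {startup_mode}")


# The behavior asked for in the original question: respect committed
# offsets, otherwise start from the beginning.
print(resolve_start_position("group-offsets", "earliest"))
print(resolve_start_position("group-offsets", "earliest", committed_offset=42))
```

Under this model, the combination of scan.startup.mode = group-offsets and
properties.auto.offset.reset = earliest is the one that matches the
requested behavior.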

Hope it helps.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#properties-group-id
