Posted to issues@flink.apache.org by "Hang Ruan (Jira)" <ji...@apache.org> on 2021/10/29 08:15:00 UTC

[jira] [Updated] (FLINK-24697) Kafka table source cannot change the auto.offset.reset setting

     [ https://issues.apache.org/jira/browse/FLINK-24697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hang Ruan updated FLINK-24697:
------------------------------
    Description: 
Because Flink 1.13 SQL does not use the new Source API from FLIP-27, starting from group offsets in Flink 1.13 falls back to the Kafka default for 'auto.offset.reset' (latest) when 'auto.offset.reset' is not set in the table options. In Flink 1.13 this behavior can still be changed by setting 'auto.offset.reset' to another value. See the method {{setStartFromGroupOffsets}} in the class {{FlinkKafkaConsumerBase}}.

Flink 1.14 uses the new Source API, but there is no way to change the default 'auto.offset.reset' value when using the 'group-offsets' startup mode. In the DataStream API, it can be changed with `kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy))`.

So we need a way to change the auto offset reset configuration.

The design is that when 'auto.offset.reset' is set, the 'group-offsets' startup mode uses the provided auto offset reset strategy; otherwise it uses the 'none' reset strategy, in order to be consistent with the DataStream API.
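
A minimal sketch of the proposed rule, for illustration only. It is not the connector's actual code: the class name, the local enum, and the helper method are hypothetical, and the option key 'properties.auto.offset.reset' is an assumption based on the 'properties.' prefix the Kafka table connector uses for client properties.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch, not Flink code: models how the 'group-offsets'
// startup mode could pick its reset strategy from the table options.
final class GroupOffsetsResetSketch {

    // Local stand-in for the values Kafka accepts for 'auto.offset.reset'.
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    // Assumed table option key (Kafka client property behind a 'properties.' prefix).
    static final String AUTO_OFFSET_RESET = "properties.auto.offset.reset";

    // Returns the configured strategy, or NONE when the option is absent.
    static ResetStrategy resolve(Map<String, String> tableOptions) {
        String configured = tableOptions.get(AUTO_OFFSET_RESET);
        if (configured == null) {
            // Option not set: use 'none', matching the DataStream API.
            return ResetStrategy.NONE;
        }
        try {
            return ResetStrategy.valueOf(configured.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unsupported value for " + AUTO_OFFSET_RESET + ": " + configured, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));                              // prints NONE
        System.out.println(resolve(Map.of(AUTO_OFFSET_RESET, "earliest"))); // prints EARLIEST
    }
}
```

With 'none' as the fallback, a consumer group that has no committed offsets fails at startup instead of silently reading from the latest offset, which is the trade-off of matching the DataStream API.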


> Kafka table source cannot change the auto.offset.reset setting
> --------------------------------------------------------------
>
>                 Key: FLINK-24697
>                 URL: https://issues.apache.org/jira/browse/FLINK-24697
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Hang Ruan
>            Priority: Minor
>
> Because Flink 1.13 SQL does not use the new Source API from FLIP-27, starting from group offsets in Flink 1.13 falls back to the Kafka default for 'auto.offset.reset' (latest) when 'auto.offset.reset' is not set in the table options. In Flink 1.13 this behavior can still be changed by setting 'auto.offset.reset' to another value. See the method {{setStartFromGroupOffsets}} in the class {{FlinkKafkaConsumerBase}}.
> Flink 1.14 uses the new Source API, but there is no way to change the default 'auto.offset.reset' value when using the 'group-offsets' startup mode. In the DataStream API, it can be changed with `kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy))`.
> So we need a way to change the auto offset reset configuration.
> The design is that when 'auto.offset.reset' is set, the 'group-offsets' startup mode uses the provided auto offset reset strategy; otherwise it uses the 'none' reset strategy, in order to be consistent with the DataStream API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)