Posted to user@flink.apache.org by suman shil <cn...@gmail.com> on 2021/08/04 22:35:05 UTC

Is FlinkKafkaConsumer setStartFromLatest() method needed when we use auto.offset.reset=latest kafka properties

In my Flink streaming application I have a Kafka data source, and I am using
the Kafka property auto.offset.reset=latest. I am wondering whether I also need
to call FlinkKafkaConsumer.setStartFromLatest(). Are they similar? Can I use
either of them? The following is the documentation from the Flink code:
/**
 * Specifies the consumer to start reading from the latest offset for all partitions. This lets
 * the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
 *
 * <p>This method does not affect where partitions are read from when the consumer is restored
 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
 * only the offsets in the restored state will be used.
 *
 * @return The consumer object, to allow function chaining.
 */
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
    this.startupMode = StartupMode.LATEST;
    this.startupOffsetsTimestamp = null;
    this.specificStartupOffsets = null;
    return this;
}

Thanks

Re: Is FlinkKafkaConsumer setStartFromLatest() method needed when we use auto.offset.reset=latest kafka properties

Posted by 纳兰清风 <ho...@163.com>.


Hi suman,


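    FlinkKafkaConsumer.setStartFromLatest() means the consumer always starts from the latest offset whenever the Flink job starts without restored state, ignoring any committed group offsets. (When the job is restored from a checkpoint or savepoint, only the offsets in the restored state are used.)
    auto.offset.reset=latest means the consumer starts from the latest offset only if no offset has been committed before: it is the fallback strategy used when the consumer finds no committed offset for its group on the brokers (or the committed offset no longer exists on the server).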
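To make the difference concrete, here is a minimal Java sketch of how the starting offset of a single partition could be chosen under the semantics described above and in the Javadoc. It is a hypothetical model, not Flink's implementation: the class StartOffsetModel, its enums and resolve() are illustrative names. GROUP_OFFSETS stands for Flink's default startup mode (setStartFromGroupOffsets()), in which auto.offset.reset is only consulted when no committed offset is found.

```java
import java.util.Optional;

// Hypothetical model (NOT Flink's real classes) of where a Kafka source starts reading one partition.
class StartOffsetModel {

    // Illustrative stand-in for two of Flink's startup modes; names only, not the real enum.
    enum StartupMode { GROUP_OFFSETS, LATEST }

    // The two auto.offset.reset values relevant to this thread.
    enum AutoOffsetReset { EARLIEST, LATEST }

    static long resolve(Optional<Long> restoredOffset,
                        StartupMode mode,
                        Optional<Long> committedGroupOffset,
                        AutoOffsetReset autoOffsetReset,
                        long earliestOffset,
                        long latestOffset) {
        // 1. Offsets restored from a checkpoint/savepoint win; the startup mode is not consulted.
        if (restoredOffset.isPresent()) {
            return restoredOffset.get();
        }
        // 2. setStartFromLatest(): committed group offsets are ignored entirely.
        if (mode == StartupMode.LATEST) {
            return latestOffset;
        }
        // 3. Group-offset mode: use the group's committed offset if it has one...
        if (committedGroupOffset.isPresent()) {
            return committedGroupOffset.get();
        }
        // ...and only otherwise fall back to auto.offset.reset.
        return autoOffsetReset == AutoOffsetReset.LATEST ? latestOffset : earliestOffset;
    }

    public static void main(String[] args) {
        long earliest = 0L;
        long latest = 500L;
        Optional<Long> committed = Optional.of(120L);
        Optional<Long> noState = Optional.empty();

        // Group has a committed offset: auto.offset.reset=latest does not skip ahead.
        System.out.println(resolve(noState, StartupMode.GROUP_OFFSETS, committed,
                AutoOffsetReset.LATEST, earliest, latest)); // 120
        // setStartFromLatest(): the committed offset is ignored.
        System.out.println(resolve(noState, StartupMode.LATEST, committed,
                AutoOffsetReset.LATEST, earliest, latest)); // 500
        // No committed offset: both configurations start from the latest offset.
        System.out.println(resolve(noState, StartupMode.GROUP_OFFSETS, Optional.empty(),
                AutoOffsetReset.LATEST, earliest, latest)); // 500
        // Restored from a checkpoint: the restored offset wins in either mode.
        System.out.println(resolve(Optional.of(300L), StartupMode.LATEST, committed,
                AutoOffsetReset.LATEST, earliest, latest)); // 300
    }
}
```

Only the first two calls differ in outcome: when the group already has a committed offset, auto.offset.reset=latest leaves it in effect, whereas setStartFromLatest() jumps to the latest offset.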

Re: Is FlinkKafkaConsumer setStartFromLatest() method needed when we use auto.offset.reset=latest kafka properties

Posted by yidan zhao <hi...@gmail.com>.
It is not the same.
Kafka's 'auto.offset.reset' is only used when the configured consumer group
id has no offset information stored in Kafka, i.e. no committed offset
exists. If you want to consume from the latest offset regardless of whether
group offset information exists in Kafka, you should use Flink's
setStartFromLatest().
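Committed offsets are stored per consumer group and partition, so the auto.offset.reset fallback can apply to some partitions and not others. The sketch below is a hypothetical model using plain Java maps (the class and method names are illustrative, not Flink or Kafka API). It assumes the group-offset startup mode with auto.offset.reset=latest and compares it against setStartFromLatest().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-partition model (NOT Flink or Kafka API): committed offsets are kept per
// (group.id, partition), so a group may have offsets for some partitions but not others.
class PerPartitionStartModel {

    // Start offsets when relying on committed group offsets with auto.offset.reset=latest.
    static Map<Integer, Long> groupOffsetsWithLatestReset(Map<Integer, Long> committed,
                                                          Map<Integer, Long> logEnd) {
        Map<Integer, Long> start = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : logEnd.entrySet()) {
            int partition = e.getKey();
            // Use the group's committed offset if one exists, else fall back to the latest offset.
            start.put(partition, committed.getOrDefault(partition, e.getValue()));
        }
        return start;
    }

    // Start offsets with setStartFromLatest(): committed offsets are ignored for every partition.
    static Map<Integer, Long> startFromLatest(Map<Integer, Long> logEnd) {
        return new TreeMap<>(logEnd);
    }

    public static void main(String[] args) {
        // Latest (log-end) offset of each partition of the topic.
        Map<Integer, Long> logEnd = Map.of(0, 1000L, 1, 800L, 2, 50L);
        // An existing group id with committed offsets only for partitions 0 and 1.
        Map<Integer, Long> committed = Map.of(0, 900L, 1, 10L);

        System.out.println(groupOffsetsWithLatestReset(committed, logEnd)); // {0=900, 1=10, 2=50}
        System.out.println(startFromLatest(logEnd));                        // {0=1000, 1=800, 2=50}
        // A brand-new group id has no committed offsets, so both approaches agree.
        System.out.println(groupOffsetsWithLatestReset(new HashMap<>(), logEnd)); // {0=1000, 1=800, 2=50}
    }
}
```

With an existing group id, partition 1 would restart far behind the latest offset unless setStartFromLatest() is used; with a new group id the two settings coincide.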
