Posted to user@flink.apache.org by "r. r." <ro...@abv.bg> on 2017/11/21 17:36:45 UTC

FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka

Hello,
According to https://issues.apache.org/jira/browse/FLINK-4618, "FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka".
Is the same behavior expected of FlinkKafkaConsumer010?
A document in Kafka is failing my job. When the job restarts (via the restart strategy, or after I stop and run it again), I want processing to continue from the next document in the partition.
Checkpoints are enabled:

            env.enableCheckpointing(1000);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
            env.getCheckpointConfig().setCheckpointTimeout(60000);
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoints to filesystem "file:/tmp/checkpoints")
taskmanager_4  | 2017-11-21 17:31:42,873 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting restore state in the FlinkKafkaConsumer.
taskmanager_4  | 2017-11-21 17:31:42,875 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 4 will commit offsets back to Kafka on completed checkpoints

Also, if a TM (other than the one that fails) has already finished reading and processing a record from Kafka, then after the job is cancelled and restarted, that completed record is retrieved and processed again together with the failing one.

flink-1.3.2
kafka_2.12-0.11.0.1

Thanks!
- Robert


Re: FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka

Posted by "r. r." <ro...@abv.bg>.
Gordon, thanks for clarifying this!
For my experimental project I decided to disable checkpointing and use
kafkaConsumer.setStartFromGroupOffsets() (explicitly, although the docs state it is the default).

I verified with kafka-consumer-offset-checker.sh that, after the job fails and is restarted,
it skips the offsets it was at when it failed and continues with the newer Kafka records.
This suits me, as another process will go after the left-behind records and try to recover what it can.

Best regards
-Robert







 >-------- Original message --------
 >From: "Tzu-Li (Gordon) Tai" tzulitai@apache.org
 >Subject: Re: FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka
 >To: user@flink.apache.org
 >Sent: 23.11.2017 09:01



 
> Hi Robert,
>
> Uncaught exceptions that cause the job to fall into a fail-and-restart loop
> are similar to the corrupt record case I mentioned.
>
> With exactly-once guarantees, the job will roll back to the last complete
> checkpoint, which "resets" the Flink consumer to some earlier Kafka
> partition offset. Eventually, that failing record will be processed again.
> Currently there is no way to manipulate the "reset" offset on restore from
> failure. The consumer is always reset to the offset stored in the last complete
> checkpoint; otherwise exactly-once would be violated.
>
> Rob wrote
> > Or maybe the recipe is to manually retrieve the record at
> > partitionX/offsetY for the group and then restart?
>
> This would not work, as exactly-once is achieved with the offsets that Flink
> stores in its checkpoints, not the offsets that are committed back to Kafka.
>
> Cheers,
> Gordon
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Robert,

Uncaught exceptions that cause the job to fall into a fail-and-restart loop
are similar to the corrupt record case I mentioned.

With exactly-once guarantees, the job will roll back to the last complete
checkpoint, which "resets" the Flink consumer to some earlier Kafka
partition offset. Eventually, that failing record will be processed again.
Currently there is no way to manipulate the "reset" offset on restore from
failure. The consumer is always reset to the offset stored in the last complete
checkpoint; otherwise exactly-once would be violated.
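
The rollback described above can be illustrated with a toy model. This is not Flink code: the class and method names are hypothetical, it assumes a single partition, and it takes a "checkpoint" every few records instead of on a timer. It only shows that each restart resumes from the checkpointed offset, so records read after the last checkpoint are replayed and the failing record is hit again.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "restore from the last complete checkpoint" (hypothetical names, not Flink API).
final class CheckpointReplayDemo {

    // Returns every offset that was attempted, across the first run and all restarts.
    static List<Integer> run(int numRecords, int checkpointEvery, int poisonOffset, int maxRestarts) {
        List<Integer> attempted = new ArrayList<>();
        int checkpointedOffset = 0; // next offset to read, as stored in the last complete checkpoint

        for (int attempt = 0; attempt <= maxRestarts; attempt++) {
            int offset = checkpointedOffset; // restore: always resume from the checkpoint
            boolean failed = false;
            while (offset < numRecords) {
                attempted.add(offset);
                if (offset == poisonOffset) { // processing this record throws
                    failed = true;
                    break;
                }
                offset++;
                if (offset % checkpointEvery == 0) {
                    checkpointedOffset = offset; // a checkpoint completes here
                }
            }
            if (!failed) {
                break; // reached the end of the partition
            }
        }
        return attempted;
    }

    public static void main(String[] args) {
        // 6 records, checkpoint every 3 records, record 4 always fails, 2 restarts allowed.
        System.out.println(run(6, 3, 4, 2)); // prints [0, 1, 2, 3, 4, 3, 4, 3, 4]
    }
}
```

In this run, offset 3 was processed successfully but lies after the last checkpoint, so it is replayed on every restart together with the failing offset 4, and offset 5 is never reached.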


Rob wrote
> Or maybe the recipe is to manually retrieve the record at
> partitionX/offsetY for the group and then restart?

This would not work, as exactly-once is achieved with the offsets that Flink
stores in its checkpoints, not the offsets that are committed back to Kafka.

Cheers,
Gordon




Re: FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka

Posted by "r. r." <ro...@abv.bg>.
Thanks, Gordon.
But what if there is an uncaught exception while processing the record (during normal job execution, after deserialization)?
Once the restart strategy's failure rate is exceeded, the job will fail, and on re-run it will start at the same offset, right?
Is there a way to avoid this and 'automatically' start at offset+1?
Or maybe the recipe is to manually retrieve the record at partitionX/offsetY for the group and then restart?

Best regards
-Robert







 >-------- Original message --------
 >From: "Tzu-Li (Gordon) Tai" tzulitai@apache.org
 >Subject: Re: FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka
 >To: user@flink.apache.org
 >Sent: 22.11.2017 14:57



 
> Hi Robert,
>
> As expected with exactly-once guarantees, a record that caused a Flink job
> to fail will be reprocessed when the job restarts.
>
> For some specific "corrupt" record that causes the job to fall into a
> fail-and-restart loop, there is a way to let the Kafka consumer skip that
> specific "corrupt" record. To do that, return null when attempting to
> deserialize the corrupted record (specifically, that would be the
> `deserialize` method on the provided `DeserializationSchema`).
>
> Cheers,
> Gordon

Re: FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Robert,

As expected with exactly-once guarantees, a record that caused a Flink job
to fail will be reprocessed when the job restarts.

For some specific "corrupt" record that causes the job to fall into a
fail-and-restart loop, there is a way to let the Kafka consumer skip that
specific "corrupt" record. To do that, return null when attempting to
deserialize the corrupted record (specifically, that would be the
`deserialize` method on the provided `DeserializationSchema`).
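
A minimal sketch of the return-null approach, written in plain Java so it runs without Flink on the classpath. `RecordDeserializer` is a hypothetical stand-in for the `deserialize` method of `DeserializationSchema`, and `consume` only mimics a consumer that drops null results; the other names are illustrative as well. In a real schema the wrapped call may also throw checked exceptions such as IOException, which would need the same handling.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the `deserialize` method of a DeserializationSchema.
interface RecordDeserializer<T> {
    T deserialize(byte[] message);
}

// Wraps another deserializer and turns any runtime failure into null ("skip this record").
final class SkipCorruptDeserializer<T> implements RecordDeserializer<T> {
    private final RecordDeserializer<T> inner;

    SkipCorruptDeserializer(RecordDeserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] message) {
        try {
            return inner.deserialize(message);
        } catch (RuntimeException e) {
            // A real job would probably log or count the bad record here.
            return null;
        }
    }
}

final class SkipCorruptDemo {
    // Mimics the consumer side: null results are dropped and reading continues.
    static <T> List<T> consume(List<byte[]> records, RecordDeserializer<T> schema) {
        List<T> out = new ArrayList<>();
        for (byte[] raw : records) {
            T value = schema.deserialize(raw);
            if (value != null) {
                out.add(value);
            }
        }
        return out;
    }

    // Example parser that throws NumberFormatException on non-numeric input.
    static Integer parseInt(byte[] raw) {
        return Integer.valueOf(new String(raw, StandardCharsets.UTF_8).trim());
    }

    public static void main(String[] args) {
        List<byte[]> records = Arrays.asList(
                "1".getBytes(StandardCharsets.UTF_8),
                "not-a-number".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8));
        RecordDeserializer<Integer> schema = new SkipCorruptDeserializer<>(SkipCorruptDemo::parseInt);
        System.out.println(consume(records, schema)); // prints [1, 3]
    }
}
```

Note that this only covers failures inside deserialization. An exception thrown later, in a downstream operator, is not caught by this wrapper.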

Cheers,
Gordon


