Posted to user@flink.apache.org by David Corley <da...@gmail.com> on 2019/02/16 23:54:02 UTC

Is group.id required in Kafka connector for offsets to be stored in checkpoint?

We've got a relatively simple job that reads from Kafka and writes to
S3. We've had a couple of job failures where consumer lag had built up,
but after the restart the lag was wiped out because our offset positions
were lost and we consumed from the latest offset.

The job has checkpointing enabled:
```
val checkpointInterval = getProperty(checkpointIntervalPropertyKey).toInt
env.enableCheckpointing(checkpointInterval)
```
but we also have:
```
kafkaSource.setStartFromLatest()
```
set.

According to the documentation, the offsets stored in the checkpoint should
override "StartFromLatest". When the job restarts, we can see it
retrieving the checkpoint state, but we also see a message about master
state, as follows:
```
[16-Feb-2019 18:58:54.524 UTC] INFO <CheckpointCoordinator> Restoring from
latest valid checkpoint: Checkpoint 2155 @ 1550342786207 for
6bc7420e001f76ffec7d2501d5f504c0.
[16-Feb-2019 18:58:54.525 UTC] INFO <CheckpointCoordinator> No master state
to restore
```
Then after the job gets into a RUNNING state, it continues to checkpoint:
```
[16-Feb-2019 18:59:00.528 UTC] INFO <CheckpointCoordinator> Triggering
checkpoint 2157 @ 1550343540525
```
Given the "No master state to restore" message and the fact that we seem to
be starting from latest, I'm wondering whether we also need to explicitly set a
group id, or meet some other requirement, for offsets to be stored properly in
checkpoints?


Regards,
Dave

Re: Is group.id required in Kafka connector for offsets to be stored in checkpoint?

Posted by sohimankotia <so...@gmail.com>.
Yes, Konstantin Knauf-2, you are right.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Is group.id required in Kafka connector for offsets to be stored in checkpoint?

Posted by Konstantin Knauf <ko...@ververica.com>.
Hi David, Hi Sohi,

This should not be the case. If a savepoint/checkpoint is provided, Flink
should always take the offsets from the state, regardless of the `group.id`
provided. Which Flink version and which FlinkKafkaConsumer version are you
using?
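
To illustrate the precedence described above, here is a minimal Scala sketch. It is a toy model, not Flink's actual implementation: every type and name in it (`StartupMode`, `StartPosition`, `resolve`) is hypothetical and exists only for this example. The assumption it encodes is the one stated in this thread: an offset restored from checkpoint/savepoint state wins, and the configured start mode (including group offsets, which depend on `group.id`) only applies when no state is restored.

```scala
// Toy model only: these types are hypothetical and are not part of Flink's API.
sealed trait StartupMode
case object Latest extends StartupMode        // e.g. setStartFromLatest()
case object Earliest extends StartupMode
case object GroupOffsets extends StartupMode  // offsets committed for the group.id

object StartPosition {
  sealed trait Position
  final case class Offset(value: Long) extends Position
  case object LatestOnBroker extends Position
  case object EarliestOnBroker extends Position
  case object CommittedForGroup extends Position

  /** Choose where one partition starts reading. */
  def resolve(restored: Option[Long], mode: StartupMode): Position =
    restored match {
      // An offset restored from state takes precedence over any start mode.
      case Some(offset) => Offset(offset)
      // Without restored state, fall back to the configured start mode.
      case None =>
        mode match {
          case Latest       => LatestOnBroker
          case Earliest     => EarliestOnBroker
          case GroupOffsets => CommittedForGroup
        }
    }
}
```

Under this model, `StartPosition.resolve(Some(1200L), Latest)` yields `Offset(1200)`, while `StartPosition.resolve(None, Latest)` yields `LatestOnBroker`. The second case matches the symptom reported in this thread, which would suggest the offsets were not restored from state at all.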

Best,

Konstantin

On Mon, Feb 18, 2019 at 5:50 AM sohimankotia <so...@gmail.com> wrote:

> Hi David,
>
> We are also running streaming jobs with a Kafka source.
>
> Yes: the consumer group id needs to be set explicitly for the Kafka source.
>
> We are also using checkpointing and savepoints for persisting state. Any
> time we change the group id, it starts from the latest offset (default Kafka
> connector behavior).
>
>
>
>
> Thanks
> Sohi
>
>
>



Re: Is group.id required in Kafka connector for offsets to be stored in checkpoint?

Posted by sohimankotia <so...@gmail.com>.
Hi David,

We are also running streaming jobs with a Kafka source.

Yes: the consumer group id needs to be set explicitly for the Kafka source.

We are also using checkpointing and savepoints for persisting state. Any
time we change the group id, it starts from the latest offset (default Kafka
connector behavior).




Thanks 
Sohi


