Posted to user@flink.apache.org by Gwenhael Pasquiers <gw...@ericsson.com> on 2017/04/13 14:55:04 UTC

Kafka offset commits

Hello,

We're going to migrate some applications that consume data from Kafka 0.8 from Flink 1.0 to Flink 1.2.

We are wondering whether the offset commit mechanism changed between those two versions: is there a risk that the Flink 1.2-based application will start with no offset (and thus fall back to either the smallest or the highest one)?
Or can we assume that the Flink 1.2 app will resume its work at the same offset where the Flink 1.0 app stopped (if they use the same consumer group id)?

B.R.

RE: Kafka offset commits

Posted by Gwenhael Pasquiers <gw...@ericsson.com>.
We need to run more tests, but we think we have found the cause of the lost Kafka consumer offsets in Kafka 0.10. It might be the server-side parameter “offsets.retention.minutes”, which defaults to 1440 minutes (1 day), and our Flink consumer was stopped for more than a day before being restarted.
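As a rough illustration of this reasoning, the sketch below compares a job's downtime with the broker-side offset retention window. It assumes a simplified model in which the group's last offset commit happened when the job stopped and the broker discards the group's committed offsets once the window has passed; the class and method names are hypothetical, and 1440 minutes is simply the default quoted above.

```java
import java.time.Duration;

// Hypothetical helper, not part of Kafka or Flink. Simplified model (assumption):
// the group's last offset commit happened when the job stopped, and the broker
// discards the group's committed offsets once the retention window has passed.
class OffsetRetentionCheck {

    // Default retention quoted in the thread: 1440 minutes (1 day).
    static final long DEFAULT_RETENTION_MINUTES = 1440;

    // True if the job was down longer than the retention window, i.e. the
    // committed offsets may already be gone when the job restarts.
    static boolean mayHaveExpired(Duration downtime, long retentionMinutes) {
        return downtime.compareTo(Duration.ofMinutes(retentionMinutes)) > 0;
    }

    public static void main(String[] args) {
        Duration downtime = Duration.ofDays(2); // e.g. a job left stopped for two days
        System.out.println(mayHaveExpired(downtime, DEFAULT_RETENTION_MINUTES)); // prints true
    }
}
```

Under this model, any outage longer than the retention window leaves the consumer with no committed offset, so it falls back to its reset policy; that would be consistent with the behaviour described above, but it is only a sketch of the arithmetic, not of the broker's exact expiry logic.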

From: Tzu-Li (Gordon) Tai [mailto:tzulitai@apache.org]
Sent: Wednesday, 19 April 2017 11:39
To: user@flink.apache.org
Subject: Re: Kafka offset commits

Thanks for the clarification, Aljoscha!
Yes, you cannot restore from a 1.0 savepoint in Flink 1.2 (sorry, I missed the “1.0” part in my first reply).

@Gwenhael, I’ll try to clarify the questions you asked once more:

Does that mean that Flink no longer relies on the offsets written to ZooKeeper, but on the snapshot data instead, implying that it’s crucial to keep the same snapshot folder before and after the migration to Flink 1.2?

For the case of 1.0 -> 1.2, you’ll have to rely on the offsets committed in Kafka / ZK for the migration. State migration from 1.0 to 1.2 is not possible.

As Aljoscha pointed out, if you are using the same “group.id”, there shouldn’t be a problem w.r.t. retaining the offset position. You just have to keep [1] in mind: you would need to manually increase all committed offsets in Kafka / ZK by 1 for that consumer group.

Note that no state migration happens here; the job simply relies on the offsets committed in Kafka / ZK to define its starting position when you start it in 1.2.
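To make the "+1" adjustment concrete, here is a minimal sketch that computes the new per-partition values for a consumer group. It assumes the ZooKeeper layout used by Kafka 0.8's ZK-based consumers (/consumers/<group>/offsets/<topic>/<partition>) and that the Flink 1.0 values are "last processed offset" while Flink 1.2 reads them as "next offset to read", as described in [1]. It only computes the values; actually writing them (with whatever ZooKeeper client you use, while no job of that group is running) is left out, and the class, group, and topic names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; it does not talk to ZooKeeper.
class Flink10OffsetMigration {

    // ZooKeeper path layout assumed for Kafka 0.8 ZK-based consumers.
    static String zkOffsetPath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }

    // Flink 1.0 committed the offset of the last processed record; Kafka's
    // convention (and Flink 1.2 after the fix) is the next offset to read.
    // Adding 1 converts the former into the latter, so the 1.2 job does not
    // re-read one record per partition.
    static Map<String, Long> adjustedOffsets(String groupId, String topic,
                                             Map<Integer, Long> flink10Committed) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : flink10Committed.entrySet()) {
            result.put(zkOffsetPath(groupId, topic, e.getKey()), e.getValue() + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = new LinkedHashMap<>();
        committed.put(0, 41L);
        committed.put(1, 99L);
        // Hypothetical group and topic names.
        adjustedOffsets("my-group", "events", committed)
            .forEach((path, offset) -> System.out.println(path + " -> " + offset));
    }
}
```

Skipping the adjustment would most likely cost one duplicated record per partition rather than data loss, since the 1.2 job would start one offset too early.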

We were also wondering if the Flink consumer was able to restore its offsets from ZooKeeper.

For FlinkKafkaConsumer08, the starting offset is actually always read from ZK.
Again, this isn’t a “restore”, but just defining start position using committed offsets.
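For reference, a minimal sketch of the consumer properties that matter here: the starting position comes from the offsets that the group identified by group.id has committed in ZooKeeper, so the 1.2 job would need to reuse the 1.0 job's group.id. The property keys are standard Kafka 0.8 consumer settings; the host names, group name, topic, and helper class are hypothetical placeholders.

```java
import java.util.Properties;

// Hypothetical helper that only builds the Properties object.
class Kafka08ConsumerConfig {

    static Properties consumerProperties(String zookeeperConnect, String bootstrapServers,
                                         String groupId) {
        Properties props = new Properties();
        // The 0.8 consumer reads the group's committed offsets from ZooKeeper.
        props.setProperty("zookeeper.connect", zookeeperConnect);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Must match the Flink 1.0 job's group so its committed offsets are found.
        props.setProperty("group.id", groupId);
        // Only used when no committed offset exists or it is out of range
        // ("smallest" or "largest" in Kafka 0.8 terms).
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("zk1:2181", "broker1:9092", "my-group");
        // In the Flink job these would be handed to the connector, e.g.:
        //   new FlinkKafkaConsumer08<>("events", new SimpleStringSchema(), props)
        System.out.println(props.getProperty("group.id"));
    }
}
```

Choosing "smallest" here favours duplicates over data loss when the committed offset is missing; "largest" would do the opposite.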

Another question: do the snapshots expire? We’ve been having issues with an app that we forgot to restart. We restarted it after a couple of days, but it looks like it did not restore the offset correctly and started consuming from the oldest offset, creating duplicate data (the Kafka topic retains over a week of data).

The offsets stored in the snapshots do not expire. The only issue would be if Kafka had already deleted the data at that offset due to its retention settings.
If you’re sure that the data had not yet expired at the time of the restore, something else might be going on.
AFAIK, the only issue previously known to possibly cause this was [2].
Could you check whether that issue applies to your case?
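The distinction drawn above (snapshot offsets never expire, but the data they point to can) can be sketched as the start-position decision for a single partition. This is a hypothetical model for illustration, not Flink's actual implementation; the method name, the "smallest"/"largest" policy strings (borrowed from Kafka 0.8's auto.offset.reset values), and the convention that the stored value is the next offset to read are all assumptions.

```java
// Hypothetical model of choosing a start offset for one partition.
class StartOffsetResolver {

    // committed: next offset to read (from a snapshot or from ZK), or null if none.
    // earliest:  oldest offset still retained by the broker.
    // latest:    log-end offset (the next offset to be written).
    static long resolve(Long committed, long earliest, long latest, String resetPolicy) {
        boolean inRange = committed != null && committed >= earliest && committed <= latest;
        if (inRange) {
            return committed; // resume exactly where the previous run stopped
        }
        // No committed offset, or the data behind it was deleted by retention:
        // fall back to the reset policy.
        switch (resetPolicy) {
            case "smallest": return earliest;
            case "largest":  return latest;
            default: throw new IllegalArgumentException("unknown policy: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // Offset 50 was already deleted (earliest is 100), so "smallest" restarts at 100.
        System.out.println(resolve(50L, 100L, 1000L, "smallest"));
    }
}
```

In this model, a "null" committed offset corresponds to expired group offsets, while an out-of-range one corresponds to expired data; both end in the reset policy, which with "smallest" produces the kind of re-consumption from the oldest offset described in the question.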

[1] https://issues.apache.org/jira/browse/FLINK-4723
[2] https://issues.apache.org/jira/browse/FLINK-6006

On 19 April 2017 at 5:14:35 PM, Aljoscha Krettek (aljoscha@apache.org) wrote:
Hi,
AFAIK, restoring a Flink 1.0 savepoint should not be possible on Flink 1.2. Only restoring from Flink 1.1 savepoints is supported.

@Gordon, if the consumer group stays the same, the new Flink job should pick up where the old one stopped, right?

Best,
Aljoscha

On 18. Apr 2017, at 16:19, Gwenhael Pasquiers <gw...@ericsson.com> wrote:

Thanks for your answer.
Does that mean that Flink no longer relies on the offsets written to ZooKeeper, but on the snapshot data instead, implying that it’s crucial to keep the same snapshot folder before and after the migration to Flink 1.2?
We were also wondering if the Flink consumer was able to restore its offsets from ZooKeeper.
Another question: do the snapshots expire? We’ve been having issues with an app that we forgot to restart. We restarted it after a couple of days, but it looks like it did not restore the offset correctly and started consuming from the oldest offset, creating duplicate data (the Kafka topic retains over a week of data).
B.R.

From: Tzu-Li (Gordon) Tai [mailto:tzulitai@apache.org]
Sent: Monday, 17 April 2017 07:40
To: user@flink.apache.org
Subject: Re: Kafka offset commits

Hi,

The FlinkKafkaConsumer in 1.2 can restore from state snapshots taken with older versions and bridge the migration, so there should be no problem reading the offsets from the older state. The smallest or highest offset will only be used if the stored offset no longer exists due to Kafka's data retention settings.

Besides this, there was one fix related to Kafka 0.8 offsets in Flink 1.2.0 [1]. In short, before the fix, the offsets committed to ZK were off by 1 (w.r.t. how Kafka itself defines committed offsets).
However, this should not affect the behavior of restoring from offsets in savepoints, so it should be fine.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-4723



