You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Jins George <ji...@aeris.net> on 2017/03/21 00:50:45 UTC

Kafka Offset handling for Restart/failure scenarios.

Hello,

I am writing a Beam pipeline(streaming) with Flink runner to consume 
data from Kafka and apply some transformations and persist to Hbase.

If I restart the application ( due to failure/manual restart), consumer 
does not resume from the offset where it was prior to restart. It always 
resume from the latest offset.

If I enable Flink checkpionting with hdfs state back-end, system appears 
to be resuming from the earliest offset

Is there a recommended way to resume from the offset where it was stopped ?

Thanks,
Jins George

Re: Kafka Offset handling for Restart/failure scenarios.

Posted by Raghu Angadi <ra...@google.com>.
On Tue, Mar 21, 2017 at 8:56 AM, Mingmin Xu <mi...@gmail.com> wrote:

> From KafkaIO itself, looks like it either start_from_beginning or
> start_from_latest. It's designed to leverage `UnboundedSource.CheckpointMark`
> during initialization, but so far I don't see it's provided by runners. At
> the moment Flink savepoints is a good option, created a JIRA(BEAM-1775
> <https://issues.apache.org/jira/browse/BEAM-1775>)  to handle it in
> KafkaIO.
>

CheckpointMark is part of UnboundedSource contract. Google Dataflow
certainly supports it. It is needed for job update. It is also an essential
part of fault tolerance and support of autoscaling (where key ranges
assigned for workers changes).

Raghu.


>
> Mingmin
>
> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>> Are you using Flink savepoints [1] when restoring your application? If
>> you use this the Kafka offset should be stored in state and it should
>> restart from the correct position.
>>
>> Best,
>> Aljoscha
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> setup/savepoints.html
>> > On 21 Mar 2017, at 01:50, Jins George <ji...@aeris.net> wrote:
>> >
>> > Hello,
>> >
>> > I am writing a Beam pipeline(streaming) with Flink runner to consume
>> data from Kafka and apply some transformations and persist to Hbase.
>> >
>> > If I restart the application ( due to failure/manual restart), consumer
>> does not resume from the offset where it was prior to restart. It always
>> resume from the latest offset.
>> >
>> > If I enable Flink checkpionting with hdfs state back-end, system
>> appears to be resuming from the earliest offset
>> >
>> > Is there a recommended way to resume from the offset where it was
>> stopped ?
>> >
>> > Thanks,
>> > Jins George
>>
>>
>
>
> --
> ----
> Mingmin
>

Re: Kafka Offset handling for Restart/failure scenarios.

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Would not it be Flink runner specific ?

Maybe the State API could do the same in a runner agnostic way (just thinking 
loud) ?

Regards
JB

On 03/21/2017 04:56 PM, Mingmin Xu wrote:
> From KafkaIO itself, looks like it either start_from_beginning or
> start_from_latest. It's designed to leverage `UnboundedSource.CheckpointMark`
> during initialization, but so far I don't see it's provided by runners. At the
> moment Flink savepoints is a good option, created a JIRA(BEAM-1775
> <https://issues.apache.org/jira/browse/BEAM-1775>)  to handle it in KafkaIO.
>
> Mingmin
>
> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <aljoscha@apache.org
> <ma...@apache.org>> wrote:
>
>     Hi,
>     Are you using Flink savepoints [1] when restoring your application? If you
>     use this the Kafka offset should be stored in state and it should restart
>     from the correct position.
>
>     Best,
>     Aljoscha
>
>     [1]
>     https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html>
>     > On 21 Mar 2017, at 01:50, Jins George <jins.george@aeris.net
>     <ma...@aeris.net>> wrote:
>     >
>     > Hello,
>     >
>     > I am writing a Beam pipeline(streaming) with Flink runner to consume data
>     from Kafka and apply some transformations and persist to Hbase.
>     >
>     > If I restart the application ( due to failure/manual restart), consumer
>     does not resume from the offset where it was prior to restart. It always
>     resume from the latest offset.
>     >
>     > If I enable Flink checkpionting with hdfs state back-end, system appears
>     to be resuming from the earliest offset
>     >
>     > Is there a recommended way to resume from the offset where it was stopped ?
>     >
>     > Thanks,
>     > Jins George
>
>
>
>
> --
> ----
> Mingmin

-- 
Jean-Baptiste Onofr�
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Kafka Offset handling for Restart/failure scenarios.

Posted by Mingmin Xu <mi...@gmail.com>.
From KafkaIO itself, looks like it either start_from_beginning or
start_from_latest. It's designed to leverage
`UnboundedSource.CheckpointMark` during initialization, but so far I don't
see it's provided by runners. At the moment Flink savepoints is a good
option, created a JIRA(BEAM-1775
<https://issues.apache.org/jira/browse/BEAM-1775>)  to handle it in KafkaIO.

Mingmin

On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> Are you using Flink savepoints [1] when restoring your application? If you
> use this the Kafka offset should be stored in state and it should restart
> from the correct position.
>
> Best,
> Aljoscha
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> setup/savepoints.html
> > On 21 Mar 2017, at 01:50, Jins George <ji...@aeris.net> wrote:
> >
> > Hello,
> >
> > I am writing a Beam pipeline(streaming) with Flink runner to consume
> data from Kafka and apply some transformations and persist to Hbase.
> >
> > If I restart the application ( due to failure/manual restart), consumer
> does not resume from the offset where it was prior to restart. It always
> resume from the latest offset.
> >
> > If I enable Flink checkpionting with hdfs state back-end, system appears
> to be resuming from the earliest offset
> >
> > Is there a recommended way to resume from the offset where it was
> stopped ?
> >
> > Thanks,
> > Jins George
>
>


-- 
----
Mingmin

Re: Kafka Offset handling for Restart/failure scenarios.

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
Are you using Flink savepoints [1] when restoring your application? If you use this the Kafka offset should be stored in state and it should restart from the correct position.

Best,
Aljoscha

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
> On 21 Mar 2017, at 01:50, Jins George <ji...@aeris.net> wrote:
> 
> Hello,
> 
> I am writing a Beam pipeline(streaming) with Flink runner to consume data from Kafka and apply some transformations and persist to Hbase.
> 
> If I restart the application ( due to failure/manual restart), consumer does not resume from the offset where it was prior to restart. It always resume from the latest offset.
> 
> If I enable Flink checkpionting with hdfs state back-end, system appears to be resuming from the earliest offset
> 
> Is there a recommended way to resume from the offset where it was stopped ?
> 
> Thanks,
> Jins George