Posted to user@beam.apache.org by Sushil Ks <su...@gmail.com> on 2018/02/01 09:29:59 UTC

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Hi,
       Apologies for the delay in my reply.

@Raghu Angadi
            The pipeline checkpoints every 20 mins. As you mentioned, when
it fails before any checkpoint is created and then restarts, it reads from
the latest offset.
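
(For reference, a minimal sketch of how that interval is set on the Flink
runner, assuming FlinkPipelineOptions; the class name is hypothetical and
the interval is in milliseconds:)

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CheckpointedPipeline {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // Checkpoint every 20 minutes. Until the first checkpoint completes
        // there is no state to restore, so on restart KafkaIO falls back to
        // its configured start position (the latest offset by default).
        options.setCheckpointingInterval(20 * 60 * 1000L);

        Pipeline pipeline = Pipeline.create(options);
        // ... build the rest of the pipeline here ...
        pipeline.run();
      }
    }

A shorter interval narrows the window in which a failure loses the read
position, but does not remove it entirely.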

@Mingmin
        Thanks a lot for sharing your learnings. However, when a
*UserCodeException* is thrown while processing an element in a ParDo after
the window is materialized, the pipeline drops the unprocessed elements and
restarts. Is this expected from Beam?
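
To make the scenario concrete, here is a hypothetical sketch of the shape
of such a pipeline (the 5-min window matches the case quoted below; the
class name and transform bodies are illustrative, not the actual job):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class WindowedParDoSketch {
      public static PCollection<String> build(
          PCollection<KV<String, String>> events) {
        return events
            // Materialize elements into 5-min fixed windows.
            .apply(Window.<KV<String, String>>into(
                FixedWindows.of(Duration.standardMinutes(5))))
            .apply(GroupByKey.<String, String>create())
            .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // A throw here (e.g. on element 70 of 100) surfaces as a
                // UserCodeException and fails the bundle; the remaining
                // elements are reprocessed after restart only if the runner
                // restores them from a checkpoint.
                c.output(c.element().getKey());
              }
            }));
      }
    }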


On Wed, Jan 17, 2018 at 2:13 AM, Kenneth Knowles <kl...@google.com> wrote:

> Is there a JIRA filed for this? I think this discussion should live in a
> ticket.
>
> Kenn
>
> On Wed, Jan 10, 2018 at 11:00 AM, Mingmin Xu <mi...@gmail.com> wrote:
>
>> @Sushil, I have several jobs running on KafkaIO+FlinkRunner; hope my
>> experience can help you a bit.
>>
>> In short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement; you
>> need to leverage exactly-once checkpoints/savepoints in Flink. The reason
>> is, with `ENABLE_AUTO_COMMIT_CONFIG` KafkaIO commits offsets after data is
>> read, and once the job is restarted KafkaIO reads from last_committed_offset.
>>
>> In my jobs, I enable external (external should be optional, I think?)
>> checkpoints in exactly-once mode on the Flink cluster. When the job
>> auto-restarts on failures it doesn't lose data. When I manually redeploy
>> the job, I use a savepoint to cancel and relaunch it.
>>
>> Mingmin
>>
>> On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi <ra...@google.com>
>> wrote:
>>
>>> How often does your pipeline checkpoint/snapshot? If the failure happens
>>> before the first checkpoint, the pipeline could restart without any state,
>>> in which case KafkaIO would read from the latest offset. There is probably
>>> some way to verify whether the pipeline is restarting from a checkpoint.
>>>
>>> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com> wrote:
>>>
>>>> Hi Aljoscha,
>>>>                    The issue is: say I consume 100 elements in a 5-min
>>>> fixed window with *GroupByKey* and later apply a *ParDo* to all those
>>>> elements. If there is an issue while processing element 70 in the
>>>> *ParDo* and the pipeline restarts with a *UserCodeException*, it skips
>>>> the remaining 30 elements. I wanted to know if this is expected. If
>>>> anything is still unclear, let me know and I will share a code snippet.
>>>>
>>>> Regards,
>>>> Sushil Ks
>>>>
>>>
>>>
>>
>>
>> --
>> ----
>> Mingmin
>>
>
>

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Sushil Ks <su...@gmail.com>.
Thanks, Raghu.

On Tue, Feb 6, 2018 at 6:41 AM, Raghu Angadi <ra...@google.com> wrote:

> Hi Sushil,
>
> That is expected behavior. If you don't have any saved checkpoint, the
> pipeline would start from scratch. It does not have any connection to the
> previous run.
>
> On Thu, Feb 1, 2018 at 1:29 AM, Sushil Ks <su...@gmail.com> wrote:
>
>> Hi,
>>        Apologies for the delay in my reply.
>>
>> @Raghu Angadi
>>             The pipeline checkpoints every 20 mins. As you mentioned, when
>> it fails before any checkpoint is created and then restarts, it reads from
>> the latest offset.
>>
>> @Mingmin
>>         Thanks a lot for sharing your learnings. However, when a
>> *UserCodeException* is thrown while processing an element in a ParDo after
>> the window is materialized, the pipeline drops the unprocessed elements and
>> restarts. Is this expected from Beam?
>>
>>
>> On Wed, Jan 17, 2018 at 2:13 AM, Kenneth Knowles <kl...@google.com> wrote:
>>
>>> Is there a JIRA filed for this? I think this discussion should live in a
>>> ticket.
>>>
>>> Kenn
>>>
>>> On Wed, Jan 10, 2018 at 11:00 AM, Mingmin Xu <mi...@gmail.com> wrote:
>>>
>>>> @Sushil, I have several jobs running on KafkaIO+FlinkRunner; hope my
>>>> experience can help you a bit.
>>>>
>>>> In short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement;
>>>> you need to leverage exactly-once checkpoints/savepoints in Flink. The
>>>> reason is, with `ENABLE_AUTO_COMMIT_CONFIG` KafkaIO commits offsets after
>>>> data is read, and once the job is restarted KafkaIO reads from
>>>> last_committed_offset.
>>>>
>>>> In my jobs, I enable external (external should be optional, I think?)
>>>> checkpoints in exactly-once mode on the Flink cluster. When the job
>>>> auto-restarts on failures it doesn't lose data. When I manually redeploy
>>>> the job, I use a savepoint to cancel and relaunch it.
>>>>
>>>> Mingmin
>>>>
>>>> On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi <ra...@google.com>
>>>> wrote:
>>>>
>>>>> How often does your pipeline checkpoint/snapshot? If the failure
>>>>> happens before the first checkpoint, the pipeline could restart without
>>>>> any state, in which case KafkaIO would read from the latest offset.
>>>>> There is probably some way to verify whether the pipeline is restarting
>>>>> from a checkpoint.
>>>>>
>>>>> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Aljoscha,
>>>>>>                    The issue is: say I consume 100 elements in a
>>>>>> 5-min fixed window with *GroupByKey* and later apply a *ParDo* to all
>>>>>> those elements. If there is an issue while processing element 70 in
>>>>>> the *ParDo* and the pipeline restarts with a *UserCodeException*, it
>>>>>> skips the remaining 30 elements. I wanted to know if this is expected.
>>>>>> If anything is still unclear, let me know and I will share a code
>>>>>> snippet.
>>>>>>
>>>>>> Regards,
>>>>>> Sushil Ks
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ----
>>>> Mingmin
>>>>
>>>
>>>
>>
>

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Raghu Angadi <ra...@google.com>.
Hi Sushil,

That is expected behavior. If you don't have any saved checkpoint, the
pipeline would start from scratch. It does not have any connection to the
previous run.
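
For completeness: with no checkpoint to restore, KafkaIO simply follows the
consumer's configured start position, which is "latest" by default. Below is
a minimal sketch of changing that fallback so a from-scratch run at least
rereads retained records (broker, topic and class name are illustrative, and
this assumes the updateConsumerProperties hook in the KafkaIO API of the
time):

    import com.google.common.collect.ImmutableMap;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaSourceSketch {
      public static KafkaIO.Read<String, String> source() {
        return KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")   // illustrative address
            .withTopic("events")                   // illustrative topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // With no checkpoint, the consumer falls back to this policy;
            // the default "latest" is why a fresh run skips older records.
            .updateConsumerProperties(ImmutableMap.<String, Object>of(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
      }
    }

Note this only changes where a fresh run starts; resuming from the exact
offsets of the previous run requires restoring a checkpoint or savepoint,
as discussed below.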

On Thu, Feb 1, 2018 at 1:29 AM, Sushil Ks <su...@gmail.com> wrote:

> Hi,
>        Apologies for the delay in my reply.
>
> @Raghu Angadi
>             The pipeline checkpoints every 20 mins. As you mentioned, when
> it fails before any checkpoint is created and then restarts, it reads from
> the latest offset.
>
> @Mingmin
>         Thanks a lot for sharing your learnings. However, when a
> *UserCodeException* is thrown while processing an element in a ParDo after
> the window is materialized, the pipeline drops the unprocessed elements and
> restarts. Is this expected from Beam?
>
>
> On Wed, Jan 17, 2018 at 2:13 AM, Kenneth Knowles <kl...@google.com> wrote:
>
>> Is there a JIRA filed for this? I think this discussion should live in a
>> ticket.
>>
>> Kenn
>>
>> On Wed, Jan 10, 2018 at 11:00 AM, Mingmin Xu <mi...@gmail.com> wrote:
>>
>>> @Sushil, I have several jobs running on KafkaIO+FlinkRunner; hope my
>>> experience can help you a bit.
>>>
>>> In short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement;
>>> you need to leverage exactly-once checkpoints/savepoints in Flink. The
>>> reason is, with `ENABLE_AUTO_COMMIT_CONFIG` KafkaIO commits offsets after
>>> data is read, and once the job is restarted KafkaIO reads from
>>> last_committed_offset.
>>>
>>> In my jobs, I enable external (external should be optional, I think?)
>>> checkpoints in exactly-once mode on the Flink cluster. When the job
>>> auto-restarts on failures it doesn't lose data. When I manually redeploy
>>> the job, I use a savepoint to cancel and relaunch it.
>>>
>>> Mingmin
>>>
>>> On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi <ra...@google.com>
>>> wrote:
>>>
>>>> How often does your pipeline checkpoint/snapshot? If the failure
>>>> happens before the first checkpoint, the pipeline could restart without
>>>> any state, in which case KafkaIO would read from the latest offset.
>>>> There is probably some way to verify whether the pipeline is restarting
>>>> from a checkpoint.
>>>>
>>>> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com> wrote:
>>>>
>>>>> Hi Aljoscha,
>>>>>                    The issue is: say I consume 100 elements in a 5-min
>>>>> fixed window with *GroupByKey* and later apply a *ParDo* to all those
>>>>> elements. If there is an issue while processing element 70 in the
>>>>> *ParDo* and the pipeline restarts with a *UserCodeException*, it skips
>>>>> the remaining 30 elements. I wanted to know if this is expected. If
>>>>> anything is still unclear, let me know and I will share a code snippet.
>>>>>
>>>>> Regards,
>>>>> Sushil Ks
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> ----
>>> Mingmin
>>>
>>
>>
>
