Posted to dev@beam.apache.org by Lukasz Cwik <lc...@google.com> on 2017/12/15 17:51:59 UTC

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

+dev@beam.apache.org

On Thu, Dec 14, 2017 at 11:27 PM, Sushil Ks <su...@gmail.com> wrote:

> Hi Lukasz,
>            I am not sure whether I can reproduce this in the DirectRunner, as
> I am taking the retry and checkpoint mechanism of Flink into consideration.
> In other words, the issue I am facing is that when any exception occurs in
> an operation after GroupByKey and the pipeline restarts, those particular
> elements are not processed in the next run.
>
> On Wed, Dec 13, 2017 at 4:01 AM, Lukasz Cwik <lc...@google.com> wrote:
>
>> That seems incorrect. Please file a JIRA and provide an example + data
>> that shows the error using the DirectRunner.
>>
>> On Tue, Dec 12, 2017 at 2:51 AM, Sushil Ks <su...@gmail.com> wrote:
>>
>>> Hi,
>>>          I am running a fixed window with GroupByKey on FlinkRunner and
>>> have noticed that after any exception and restart before the GroupByKey
>>> operation, the Kafka consumer replays the data from the particular offset.
>>> However, if an exception occurs after that point and the pipeline restarts,
>>> Kafka consumes from the latest offset. Is this expected?
>>>
>>> Regards,
>>> Sushil Ks
>>>
>>
>>
>

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Sushil Ks <su...@gmail.com>.
Yes, I have tried ENABLE_AUTO_COMMIT_CONFIG. It helps before GroupByKey
but not after it. In other words, once the window is materialized, if
any exception occurs while processing the elements in it, the remaining
elements of that window are dropped when the pipeline restarts.

On Jan 8, 2018 12:45 PM, "Reuven Lax" <re...@google.com> wrote:

> Do you set ENABLE_AUTO_COMMIT_CONFIG?
>
> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com> wrote:
>
>> HI Aljoscha,
>>                    The issue is let's say I consumed 100 elements in 5
>> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for all
>> those elements. If there is an issue while processing element 70 in
>> *ParDo *and the pipeline restarts with *UserCodeException *it's skipping
>> the rest 30 elements. Wanted to know if this is expected? In case if you
>> still having doubt let me know will share a code snippet.
>>
>> Regards,
>> Sushil Ks
>>
>
>

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Reuven Lax <re...@google.com>.
Do you set ENABLE_AUTO_COMMIT_CONFIG?

On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com> wrote:

> HI Aljoscha,
>                    The issue is let's say I consumed 100 elements in 5
> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for all
> those elements. If there is an issue while processing element 70 in
> *ParDo *and the pipeline restarts with *UserCodeException *it's skipping
> the rest 30 elements. Wanted to know if this is expected? In case if you
> still having doubt let me know will share a code snippet.
>
> Regards,
> Sushil Ks
>

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Sushil Ks <su...@gmail.com>.
Thanks, Raghu.

On Tue, Feb 6, 2018 at 6:41 AM, Raghu Angadi <ra...@google.com> wrote:

> Hi Sushil,
>
> That is expected behavior. If you don't have any saved checkpoint, the
> pipeline would start from scratch. It does not have any connection to
> previous run.
>
> On Thu, Feb 1, 2018 at 1:29 AM, Sushil Ks <su...@gmail.com> wrote:
>
>> Hi,
>>        Apologies for delay in my reply,
>>
>> @Raghu Angadi
>>             This checkpoints 20 mins, as you mentioned before any
>> checkpoint is created and if the pipeline restarts, it's reading from the
>> latest offset.
>>
>> @Mingmin
>>         Thanks a lot for sharing your learnings, However in case of any
>> *UserCodeException* while processing the element as part of ParDo after
>> materializing the window, the pipeline drops the unprocessed elements and
>> restarts. Is this expected from Beam?
>>
>>
>> On Wed, Jan 17, 2018 at 2:13 AM, Kenneth Knowles <kl...@google.com> wrote:
>>
>>> Is there a JIRA filed for this? I think this discussion should live in a
>>> ticket.
>>>
>>> Kenn
>>>
>>> On Wed, Jan 10, 2018 at 11:00 AM, Mingmin Xu <mi...@gmail.com> wrote:
>>>
>>>> @Sushil, I have several jobs running on KafkaIO+FlinkRunner, hope my
>>>> experience can help you a bit.
>>>>
>>>> For short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement,
>>>> you need to leverage exactly-once checkpoint/savepoint in Flink. The reason
>>>> is,  with `ENABLE_AUTO_COMMIT_CONFIG` KafkaIO commits offset after data is
>>>> read, and once job is restarted KafkaIO reads from last_committed_offset.
>>>>
>>>> In my jobs, I enable external(external should be optional I think?)
>>>> checkpoint on exactly-once mode in Flink cluster. When the job auto-restart
>>>> on failures it doesn't lost data. In case of manually redeploy the job, I
>>>> use savepoint to cancel and launch the job.
>>>>
>>>> Mingmin
>>>>
>>>> On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi <ra...@google.com>
>>>> wrote:
>>>>
>>>>> How often does your pipeline checkpoint/snapshot? If the failure
>>>>> happens before the first checkpoint, the pipeline could restart without any
>>>>> state, in which case KafkaIO would read from latest offset. There is
>>>>> probably some way to verify if pipeline is restarting from a checkpoint.
>>>>>
>>>>> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> HI Aljoscha,
>>>>>>                    The issue is let's say I consumed 100 elements in
>>>>>> 5 mins Fixed Window with *GroupByKey* and later I applied *ParDO* for
>>>>>> all those elements. If there is an issue while processing element 70 in
>>>>>> *ParDo *and the pipeline restarts with *UserCodeException *it's
>>>>>> skipping the rest 30 elements. Wanted to know if this is expected? In case
>>>>>> if you still having doubt let me know will share a code snippet.
>>>>>>
>>>>>> Regards,
>>>>>> Sushil Ks
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ----
>>>> Mingmin
>>>>
>>>
>>>
>>
>

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Raghu Angadi <ra...@google.com>.
Hi Sushil,

That is expected behavior. If you don't have any saved checkpoint, the
pipeline starts from scratch. It does not have any connection to the
previous run.

On Thu, Feb 1, 2018 at 1:29 AM, Sushil Ks <su...@gmail.com> wrote:

> Hi,
>        Apologies for delay in my reply,
>
> @Raghu Angadi
>             This checkpoints 20 mins, as you mentioned before any
> checkpoint is created and if the pipeline restarts, it's reading from the
> latest offset.
>
> @Mingmin
>         Thanks a lot for sharing your learnings, However in case of any
> *UserCodeException* while processing the element as part of ParDo after
> materializing the window, the pipeline drops the unprocessed elements and
> restarts. Is this expected from Beam?
>
>
> On Wed, Jan 17, 2018 at 2:13 AM, Kenneth Knowles <kl...@google.com> wrote:
>
>> Is there a JIRA filed for this? I think this discussion should live in a
>> ticket.
>>
>> Kenn
>>
>> On Wed, Jan 10, 2018 at 11:00 AM, Mingmin Xu <mi...@gmail.com> wrote:
>>
>>> @Sushil, I have several jobs running on KafkaIO+FlinkRunner, hope my
>>> experience can help you a bit.
>>>
>>> For short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement,
>>> you need to leverage exactly-once checkpoint/savepoint in Flink. The reason
>>> is,  with `ENABLE_AUTO_COMMIT_CONFIG` KafkaIO commits offset after data is
>>> read, and once job is restarted KafkaIO reads from last_committed_offset.
>>>
>>> In my jobs, I enable external(external should be optional I think?)
>>> checkpoint on exactly-once mode in Flink cluster. When the job auto-restart
>>> on failures it doesn't lost data. In case of manually redeploy the job, I
>>> use savepoint to cancel and launch the job.
>>>
>>> Mingmin
>>>
>>> On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi <ra...@google.com>
>>> wrote:
>>>
>>>> How often does your pipeline checkpoint/snapshot? If the failure
>>>> happens before the first checkpoint, the pipeline could restart without any
>>>> state, in which case KafkaIO would read from latest offset. There is
>>>> probably some way to verify if pipeline is restarting from a checkpoint.
>>>>
>>>> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com> wrote:
>>>>
>>>>> HI Aljoscha,
>>>>>                    The issue is let's say I consumed 100 elements in 5
>>>>> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for
>>>>> all those elements. If there is an issue while processing element 70 in
>>>>> *ParDo *and the pipeline restarts with *UserCodeException *it's
>>>>> skipping the rest 30 elements. Wanted to know if this is expected? In case
>>>>> if you still having doubt let me know will share a code snippet.
>>>>>
>>>>> Regards,
>>>>> Sushil Ks
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> ----
>>> Mingmin
>>>
>>
>>
>

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Sushil Ks <su...@gmail.com>.
Hi,
       Apologies for the delay in my reply.

@Raghu Angadi
            The pipeline checkpoints every 20 mins. As you mentioned, if the
pipeline restarts before any checkpoint is created, it reads from the
latest offset.

@Mingmin
        Thanks a lot for sharing your learnings. However, in case of any
*UserCodeException* while processing an element as part of a ParDo after
materializing the window, the pipeline drops the unprocessed elements and
restarts. Is this expected from Beam?


On Wed, Jan 17, 2018 at 2:13 AM, Kenneth Knowles <kl...@google.com> wrote:

> Is there a JIRA filed for this? I think this discussion should live in a
> ticket.
>
> Kenn
>
> On Wed, Jan 10, 2018 at 11:00 AM, Mingmin Xu <mi...@gmail.com> wrote:
>
>> @Sushil, I have several jobs running on KafkaIO+FlinkRunner, hope my
>> experience can help you a bit.
>>
>> For short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement, you
>> need to leverage exactly-once checkpoint/savepoint in Flink. The reason
>> is,  with `ENABLE_AUTO_COMMIT_CONFIG` KafkaIO commits offset after data is
>> read, and once job is restarted KafkaIO reads from last_committed_offset.
>>
>> In my jobs, I enable external(external should be optional I think?)
>> checkpoint on exactly-once mode in Flink cluster. When the job auto-restart
>> on failures it doesn't lost data. In case of manually redeploy the job, I
>> use savepoint to cancel and launch the job.
>>
>> Mingmin
>>
>> On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi <ra...@google.com>
>> wrote:
>>
>>> How often does your pipeline checkpoint/snapshot? If the failure happens
>>> before the first checkpoint, the pipeline could restart without any state,
>>> in which case KafkaIO would read from latest offset. There is probably some
>>> way to verify if pipeline is restarting from a checkpoint.
>>>
>>> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com> wrote:
>>>
>>>> HI Aljoscha,
>>>>                    The issue is let's say I consumed 100 elements in 5
>>>> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for
>>>> all those elements. If there is an issue while processing element 70 in
>>>> *ParDo *and the pipeline restarts with *UserCodeException *it's
>>>> skipping the rest 30 elements. Wanted to know if this is expected? In case
>>>> if you still having doubt let me know will share a code snippet.
>>>>
>>>> Regards,
>>>> Sushil Ks
>>>>
>>>
>>>
>>
>>
>> --
>> ----
>> Mingmin
>>
>
>

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Kenneth Knowles <kl...@google.com>.
Is there a JIRA filed for this? I think this discussion should live in a
ticket.

Kenn

On Wed, Jan 10, 2018 at 11:00 AM, Mingmin Xu <mi...@gmail.com> wrote:

> @Sushil, I have several jobs running on KafkaIO+FlinkRunner, hope my
> experience can help you a bit.
>
> For short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement, you
> need to leverage exactly-once checkpoint/savepoint in Flink. The reason
> is,  with `ENABLE_AUTO_COMMIT_CONFIG` KafkaIO commits offset after data is
> read, and once job is restarted KafkaIO reads from last_committed_offset.
>
> In my jobs, I enable external(external should be optional I think?)
> checkpoint on exactly-once mode in Flink cluster. When the job auto-restart
> on failures it doesn't lost data. In case of manually redeploy the job, I
> use savepoint to cancel and launch the job.
>
> Mingmin
>
> On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi <ra...@google.com> wrote:
>
>> How often does your pipeline checkpoint/snapshot? If the failure happens
>> before the first checkpoint, the pipeline could restart without any state,
>> in which case KafkaIO would read from latest offset. There is probably some
>> way to verify if pipeline is restarting from a checkpoint.
>>
>> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com> wrote:
>>
>>> HI Aljoscha,
>>>                    The issue is let's say I consumed 100 elements in 5
>>> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for all
>>> those elements. If there is an issue while processing element 70 in
>>> *ParDo *and the pipeline restarts with *UserCodeException *it's
>>> skipping the rest 30 elements. Wanted to know if this is expected? In case
>>> if you still having doubt let me know will share a code snippet.
>>>
>>> Regards,
>>> Sushil Ks
>>>
>>
>>
>
>
> --
> ----
> Mingmin
>

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Mingmin Xu <mi...@gmail.com>.
@Sushil, I have several jobs running on KafkaIO+FlinkRunner; I hope my
experience can help you a bit.

In short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement; you
need to leverage exactly-once checkpoints/savepoints in Flink. The reason
is that with `ENABLE_AUTO_COMMIT_CONFIG`, KafkaIO commits the offset after
data is read, and once the job is restarted KafkaIO reads from
last_committed_offset.

In my jobs, I enable external (external should be optional, I think?)
checkpoints in exactly-once mode on the Flink cluster. When the job
auto-restarts on failures, it doesn't lose data. When manually redeploying
the job, I use a savepoint to cancel and launch the job.

Mingmin

On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi <ra...@google.com> wrote:

> How often does your pipeline checkpoint/snapshot? If the failure happens
> before the first checkpoint, the pipeline could restart without any state,
> in which case KafkaIO would read from latest offset. There is probably some
> way to verify if pipeline is restarting from a checkpoint.
>
> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com> wrote:
>
>> HI Aljoscha,
>>                    The issue is let's say I consumed 100 elements in 5
>> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for all
>> those elements. If there is an issue while processing element 70 in
>> *ParDo *and the pipeline restarts with *UserCodeException *it's skipping
>> the rest 30 elements. Wanted to know if this is expected? In case if you
>> still having doubt let me know will share a code snippet.
>>
>> Regards,
>> Sushil Ks
>>
>
>


-- 
----
Mingmin
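
To make the difference between the two recovery paths in Mingmin's reply
concrete, here is a minimal plain-Java sketch. It is a toy model, not Beam,
Flink, or Kafka client code: `ResumeOffsetSketch`, `unprocessedAfterRestart`,
and the offsets are hypothetical names and values chosen for illustration. It
assumes the 100-element window described earlier in the thread, a failure at
element 70, an auto-committed offset of 100 (committed as soon as the data was
read), and a checkpoint taken before any of those elements were read.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of where a source resumes after a failure downstream of a window. */
final class ResumeOffsetSketch {

    /**
     * Elements are numbered 1..total. Elements 1..failedAt-1 were processed
     * before the failure. After the restart the source re-reads only elements
     * with a number greater than resumeAfterOffset.
     * Returns the elements that were neither processed nor re-read.
     */
    static List<Integer> unprocessedAfterRestart(int total, int failedAt, int resumeAfterOffset) {
        List<Integer> missing = new ArrayList<>();
        for (int e = failedAt; e <= total; e++) {
            if (e <= resumeAfterOffset) {
                missing.add(e); // already behind the resume position, so never seen again
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        int total = 100;   // elements read into the window
        int failedAt = 70; // element whose processing threw

        // Assumption: auto-commit already committed everything that was read.
        int autoCommittedOffset = total;
        // Assumption: the restored checkpoint predates the window's elements.
        int checkpointedOffset = 0;

        System.out.println("auto-commit only, unprocessed elements: "
                + unprocessedAfterRestart(total, failedAt, autoCommittedOffset).size());
        System.out.println("checkpoint restore, unprocessed elements: "
                + unprocessedAfterRestart(total, failedAt, checkpointedOffset).size());
    }
}
```

In this model the auto-commit case loses the failing element plus the 30 that
follow it, while the checkpoint case loses none because the source position is
rolled back together with the rest of the job state.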

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Raghu Angadi <ra...@google.com>.
How often does your pipeline checkpoint/snapshot? If the failure happens
before the first checkpoint, the pipeline could restart without any state,
in which case KafkaIO would read from the latest offset. There is probably
some way to verify whether the pipeline is restarting from a checkpoint.

On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <su...@gmail.com> wrote:

> HI Aljoscha,
>                    The issue is let's say I consumed 100 elements in 5
> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for all
> those elements. If there is an issue while processing element 70 in
> *ParDo *and the pipeline restarts with *UserCodeException *it's skipping
> the rest 30 elements. Wanted to know if this is expected? In case if you
> still having doubt let me know will share a code snippet.
>
> Regards,
> Sushil Ks
>
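
The restart cases Raghu describes can be sketched as a small decision
function. This is a plain-Java illustration under stated assumptions, not the
KafkaIO implementation: `StartPositionSketch`, `choose`, and
`checkpointTriggeredBefore` are hypothetical names, and the fallback to
"latest" assumes the Kafka consumer default of `auto.offset.reset=latest` with
no overriding configuration.

```java
import java.util.OptionalLong;

/** Toy decision model for where a restarted source begins reading. */
final class StartPositionSketch {

    enum Start { CHECKPOINT, COMMITTED_OFFSET, LATEST }

    /** A restored checkpoint wins; otherwise a committed offset; otherwise latest. */
    static Start choose(OptionalLong checkpointedOffset, OptionalLong committedOffset) {
        if (checkpointedOffset.isPresent()) {
            return Start.CHECKPOINT;
        }
        if (committedOffset.isPresent()) {
            return Start.COMMITTED_OFFSET;
        }
        return Start.LATEST; // assumed consumer default when nothing else is known
    }

    /**
     * True if a periodic checkpoint would at least have been triggered before
     * the failure. Triggered is not the same as completed; this ignores the
     * time a checkpoint takes to finish.
     */
    static boolean checkpointTriggeredBefore(long failureAtMs, long intervalMs) {
        return intervalMs > 0 && failureAtMs >= intervalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 20 * 60_000L; // 20-minute interval, as in the thread
        long failureAtMs = 5 * 60_000L; // hypothetical failure 5 minutes in

        boolean haveCheckpoint = checkpointTriggeredBefore(failureAtMs, intervalMs);
        OptionalLong checkpointed = haveCheckpoint ? OptionalLong.of(0L) : OptionalLong.empty();
        System.out.println(choose(checkpointed, OptionalLong.empty()));
    }
}
```

With a 20-minute interval and a failure after 5 minutes, no checkpoint exists
yet, so the model falls through to the latest offset, which matches the
behavior reported in the thread.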

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Sushil Ks <su...@gmail.com>.
Hi Aljoscha,
                   The issue is this: let's say I consumed 100 elements in a
5-minute fixed window with *GroupByKey* and later applied a *ParDo* to all
those elements. If there is an issue while processing element 70 in the
*ParDo* and the pipeline restarts with a *UserCodeException*, it skips the
remaining 30 elements. I wanted to know if this is expected. If you still
have doubts, let me know and I will share a code snippet.

Regards,
Sushil Ks
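
For comparison, the scenario above can be replayed in a toy model of what a
restart from restored state would look like. This is a plain-Java sketch, not
Beam or Flink code: `WindowReplaySketch` and `runWithOneRestart` are
hypothetical names, and the model assumes the whole window is handed to the
processing step again after the restart and that the failure happens only once.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy replay of a window whose processing fails once and is then retried. */
final class WindowReplaySketch {

    /**
     * Processes elements 1..total, throwing at failAt on the first attempt only.
     * Returns how many times each element was successfully processed.
     */
    static Map<Integer, Integer> runWithOneRestart(int total, int failAt) {
        Map<Integer, Integer> timesProcessed = new HashMap<>();
        boolean failedOnce = false;
        while (true) {
            try {
                for (int e = 1; e <= total; e++) {
                    if (e == failAt && !failedOnce) {
                        failedOnce = true;
                        throw new IllegalStateException("simulated user-code failure at element " + e);
                    }
                    timesProcessed.merge(e, 1, Integer::sum);
                }
                return timesProcessed;
            } catch (IllegalStateException restart) {
                // Simulated restart: the loop starts over with the full window.
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = runWithOneRestart(100, 70);
        System.out.println("elements processed at least once: " + counts.size());
        System.out.println("element 69 processed " + counts.get(69) + " time(s)");
        System.out.println("element 70 processed " + counts.get(70) + " time(s)");
    }
}
```

In this model no element is skipped, but the elements before the failing one
are processed twice, so any side effects in the processing step would need to
tolerate repeats.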

Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Aljoscha Krettek <al...@apache.org>.
What is the exact behaviour you're seeing? It's not 100% clear to me from the initial message.

Best,
Aljoscha

> On 4. Jan 2018, at 12:56, Sushil Ks <su...@gmail.com> wrote:
> 
> *bump*
> 
> 
> On Dec 15, 2017 11:22 PM, "Lukasz Cwik" <lcwik@google.com> wrote:
> +dev@beam.apache.org
> 
> On Thu, Dec 14, 2017 at 11:27 PM, Sushil Ks <sushil416@gmail.com> wrote:
> Hi Lukasz,
>            I am not sure whether I can reproduce this in the DirectRunner, as I am taking the retry and checkpoint mechanism of Flink into consideration. In other words, the issue I am facing is that when any exception occurs in an operation after GroupByKey and the pipeline restarts, those particular elements are not processed in the next run.
> 
> On Wed, Dec 13, 2017 at 4:01 AM, Lukasz Cwik <lcwik@google.com> wrote:
> That seems incorrect. Please file a JIRA and provide an example + data that shows the error using the DirectRunner.
> 
> On Tue, Dec 12, 2017 at 2:51 AM, Sushil Ks <sushil416@gmail.com> wrote:
> Hi,
>          I am running a fixed window with GroupByKey on FlinkRunner and have noticed that after any exception and restart before the GroupByKey operation, the Kafka consumer replays the data from the particular offset. However, if an exception occurs after that point and the pipeline restarts, Kafka consumes from the latest offset. Is this expected?
> 
> Regards,
> Sushil Ks
> 
> 
> 


Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

Posted by Sushil Ks <su...@gmail.com>.
*bump*

On Dec 15, 2017 11:22 PM, "Lukasz Cwik" <lc...@google.com> wrote:

> +dev@beam.apache.org
>
> On Thu, Dec 14, 2017 at 11:27 PM, Sushil Ks <su...@gmail.com> wrote:
>
>> Hi Lukasz,
>> I am not sure whether I can reproduce this in the DirectRunner, as I am
>> taking Flink's retry and checkpoint mechanism into consideration. In other
>> words, the issue I am facing is that when an exception occurs in an
>> operation after the GroupByKey and the pipeline restarts, those particular
>> elements are not processed in the next run.
>>
>> On Wed, Dec 13, 2017 at 4:01 AM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> That seems incorrect. Please file a JIRA and provide an example + data
>>> that shows the error using the DirectRunner.
>>>
>>> On Tue, Dec 12, 2017 at 2:51 AM, Sushil Ks <su...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I am running a fixed window with GroupByKey on FlinkRunner and have
>>>> noticed that when an exception and restart happen before the GroupByKey
>>>> operation, the Kafka consumer replays the data from the particular
>>>> offset. However, when an exception occurs after the GroupByKey and the
>>>> pipeline restarts, Kafka consumes from the latest offset. Is this expected?
>>>>
>>>> Regards,
>>>> Sushil Ks
>>>>
>>>
>>>
>>
>