Posted to user@beam.apache.org by Evan Galpin <ev...@gmail.com> on 2021/08/10 15:54:23 UTC

[Dataflow][Java][2.30.0] Best practice for clearing stuck data in streaming pipeline

Hi all,

I recently had an experience where a streaming pipeline became "clogged"
due to invalid data reaching the final step in my pipeline such that the
data was causing a non-transient error when writing to my Sink.  Since the
job is a streaming job, the element (bundle) was continuously retrying.

What options are there for getting out of this state when it occurs? I
attempted to add validation and update the streaming job to remove the bad
entity; though the update was successful, I believe the bad entity was
already checkpointed (?) further downstream in the pipeline. What then?

And for something like a database schema and evolving it over time, what is
the typical solution?

- Should pipelines mirror a DB schema and do validation of all data types
in the pipeline?
- Should all sinks implement a way to remove non-transient failures from
retrying and output them via PCollectionTuple (such as with BigQuery failed
inserts)?

Re: [Dataflow][Java][2.30.0] Best practice for clearing stuck data in streaming pipeline

Posted by Evan Galpin <ev...@gmail.com>.
>
> It is likely that the incorrect transform was edited...
>

It appears you're right; when I tried to reproduce, I was able to clear
the issue this time by making "the same" code change and updating the
pipeline. I believe my earlier change was simply in the wrong place in the
code.

Good to know this works! Thanks Luke 🙏

On Tue, Aug 10, 2021 at 1:19 PM Luke Cwik <lc...@google.com> wrote:

>
>
> On Tue, Aug 10, 2021 at 10:11 AM Evan Galpin <ev...@gmail.com>
> wrote:
>
>> Thanks for your responses Luke. One point I have confusion over:
>>
>>> * Modify the sink implementation to do what you want with the bad data
>>> and update the pipeline.
>>>
>>
>>  I modified the sink implementation to ignore the specific error that was
>> the problem and updated the pipeline. The update succeeded.  However, the
>> problem persisted.  How might that happen?  Is there caching involved?
>> Checkpointing? I changed the very last method called in the pipeline in
>> order to ensure the validation would apply, but the problem persisted.
>>
>
> It is likely that the incorrect transform was edited. You should take a
> look at the worker logs, find the exception that is being thrown, and note
> the step name it is associated with (e.g.
> BigQueryIO/Write/StreamingInserts); then browse the source for a
> "StreamingInserts" transform that is applied from the "Write" transform,
> which is applied from the BigQueryIO transform.
>
>
>>
>> And in the case where one is using a Sink which is a built-in IO module
>> of Beam, modification of the Sink may not be immediately feasible. Is the
>> only recourse in that case to drain a job and start a new one?
>>
> The Beam IOs are open source, allowing you to edit the code and build a
> new version locally which you would consume in your project. Dataflow does
> have an optimization that replaces the Pub/Sub source/sink, but all others
> to my knowledge should be based upon the Apache Beam source.
>
>
>> On Tue, Aug 10, 2021 at 12:54 PM Luke Cwik <lc...@google.com> wrote:
>>
>>>
>>>
>>> On Tue, Aug 10, 2021 at 8:54 AM Evan Galpin <ev...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I recently had an experience where a streaming pipeline became
>>>> "clogged" due to invalid data reaching the final step in my pipeline such
>>>> that the data was causing a non-transient error when writing to my Sink.
>>>> Since the job is a streaming job, the element (bundle) was continuously
>>>> retrying.
>>>>
>>>> What options are there for getting out of this state when it occurs?
>>>>
>>>
>>> * Modify the sink implementation to do what you want with the bad data
>>> and update the pipeline.
>>> * Cancel the pipeline and update the implementation to handle the bad
>>> records and rerun from last known good position reprocessing whatever was
>>> necessary.
>>>
>>>> I attempted to add validation and update the streaming job to remove the
>>>> bad entity; though the update was successful, I believe the bad entity was
>>>> already checkpointed (?) further downstream in the pipeline. What then?
>>>>
>>>> And for something like a database schema and evolving it over time, what
>>>> is the typical solution?
>>>>
>>>
>>> * Pipeline update containing newly updated schema before data with new
>>> schema starts rolling in.
>>> * Use a format and encoding that is agnostic to changes with a
>>> source/sink that can consume this agnostic format. See this thread[1] and
>>> others like it in the user and dev mailing lists.
>>>
>>>
>>>> - Should pipelines mirror a DB schema and do validation of all data
>>>> types in the pipeline?
>>>>
>>>
>>> Perform validation at critical points in the pipeline like data ingress
>>> and egress. Catching failed DB inserts via a dead letter queue works for
>>> the cases that are loud and throw exceptions, but data that is inserted
>>> successfully yet is still invalid from a business logic standpoint won't
>>> be caught without validation.
>>>
>>>
>>>> - Should all sinks implement a way to remove non-transient failures
>>>> from retrying and output them via PCollectionTuple (such as with BigQuery
>>>> failed inserts)?
>>>>
>>>
>>> Yes, dead letter queues (DLQs) are quite a good solution for this since
>>> they provide a lot of flexibility and allow for a process (typically a
>>> manual one) to fix up the bad records.
>>>
>>> 1:
>>> https://lists.apache.org/thread.html/r4b31c8b76fa81dcb130397077b981ab6429f2999b9d864c815214c5a%40%3Cuser.beam.apache.org%3E
>>>
>>

Re: [Dataflow][Java][2.30.0] Best practice for clearing stuck data in streaming pipeline

Posted by Luke Cwik <lc...@google.com>.
On Tue, Aug 10, 2021 at 10:11 AM Evan Galpin <ev...@gmail.com> wrote:

> Thanks for your responses Luke. One point I have confusion over:
>
>> * Modify the sink implementation to do what you want with the bad data and
>> update the pipeline.
>>
>
>  I modified the sink implementation to ignore the specific error that was
> the problem and updated the pipeline. The update succeeded.  However, the
> problem persisted.  How might that happen?  Is there caching involved?
> Checkpointing? I changed the very last method called in the pipeline in
> order to ensure the validation would apply, but the problem persisted.
>

It is likely that the incorrect transform was edited. You should take a
look at the worker logs, find the exception that is being thrown, and note
the step name it is associated with (e.g.
BigQueryIO/Write/StreamingInserts); then browse the source for a
"StreamingInserts" transform that is applied from the "Write" transform,
which is applied from the BigQueryIO transform.


>
> And in the case where one is using a Sink which is a built-in IO module of
> Beam, modification of the Sink may not be immediately feasible. Is the only
> recourse in that case to drain a job and start a new one?
>
The Beam IOs are open source, allowing you to edit the code and build a new
version locally which you would consume in your project. Dataflow does have
an optimization that replaces the Pub/Sub source/sink, but all others to my
knowledge should be based upon the Apache Beam source.


> On Tue, Aug 10, 2021 at 12:54 PM Luke Cwik <lc...@google.com> wrote:
>
>>
>>
>> On Tue, Aug 10, 2021 at 8:54 AM Evan Galpin <ev...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I recently had an experience where a streaming pipeline became "clogged"
>>> due to invalid data reaching the final step in my pipeline such that the
>>> data was causing a non-transient error when writing to my Sink.  Since the
>>> job is a streaming job, the element (bundle) was continuously retrying.
>>>
>>> What options are there for getting out of this state when it occurs?
>>>
>>
>> * Modify the sink implementation to do what you want with the bad data
>> and update the pipeline.
>> * Cancel the pipeline and update the implementation to handle the bad
>> records and rerun from last known good position reprocessing whatever was
>> necessary.
>>
>>> I attempted to add validation and update the streaming job to remove the
>>> bad entity; though the update was successful, I believe the bad entity was
>>> already checkpointed (?) further downstream in the pipeline. What then?
>>>
>>> And for something like a database schema and evolving it over time, what
>>> is the typical solution?
>>>
>>
>> * Pipeline update containing newly updated schema before data with new
>> schema starts rolling in.
>> * Use a format and encoding that is agnostic to changes with a
>> source/sink that can consume this agnostic format. See this thread[1] and
>> others like it in the user and dev mailing lists.
>>
>>
>>> - Should pipelines mirror a DB schema and do validation of all data
>>> types in the pipeline?
>>>
>>
>> Perform validation at critical points in the pipeline like data ingress
>> and egress. Catching failed DB inserts via a dead letter queue works for
>> the cases that are loud and throw exceptions, but data that is inserted
>> successfully yet is still invalid from a business logic standpoint won't
>> be caught without validation.
>>
>>
>>> - Should all sinks implement a way to remove non-transient failures from
>>> retrying and output them via PCollectionTuple (such as with BigQuery failed
>>> inserts)?
>>>
>>
>> Yes, dead letter queues (DLQs) are quite a good solution for this since
>> they provide a lot of flexibility and allow for a process (typically a
>> manual one) to fix up the bad records.
>>
>> 1:
>> https://lists.apache.org/thread.html/r4b31c8b76fa81dcb130397077b981ab6429f2999b9d864c815214c5a%40%3Cuser.beam.apache.org%3E
>>
>

Re: [Dataflow][Java][2.30.0] Best practice for clearing stuck data in streaming pipeline

Posted by Evan Galpin <ev...@gmail.com>.
Thanks for your responses Luke. One point I have confusion over:

> * Modify the sink implementation to do what you want with the bad data and
> update the pipeline.
>

 I modified the sink implementation to ignore the specific error that was
the problem and updated the pipeline. The update succeeded.  However, the
problem persisted.  How might that happen?  Is there caching involved?
Checkpointing? I changed the very last method called in the pipeline in
order to ensure the validation would apply, but the problem persisted.

And in the case where one is using a Sink which is a built-in IO module of
Beam, modification of the Sink may not be immediately feasible. Is the only
recourse in that case to drain a job and start a new one?

On Tue, Aug 10, 2021 at 12:54 PM Luke Cwik <lc...@google.com> wrote:

>
>
> On Tue, Aug 10, 2021 at 8:54 AM Evan Galpin <ev...@gmail.com> wrote:
>
>> Hi all,
>>
>> I recently had an experience where a streaming pipeline became "clogged"
>> due to invalid data reaching the final step in my pipeline such that the
>> data was causing a non-transient error when writing to my Sink.  Since the
>> job is a streaming job, the element (bundle) was continuously retrying.
>>
>> What options are there for getting out of this state when it occurs?
>>
>
> * Modify the sink implementation to do what you want with the bad data and
> update the pipeline.
> * Cancel the pipeline and update the implementation to handle the bad
> records and rerun from last known good position reprocessing whatever was
> necessary.
>
>> I attempted to add validation and update the streaming job to remove the
>> bad entity; though the update was successful, I believe the bad entity was
>> already checkpointed (?) further downstream in the pipeline. What then?
>>
>> And for something like a database schema and evolving it over time, what
>> is the typical solution?
>>
>
> * Pipeline update containing newly updated schema before data with new
> schema starts rolling in.
> * Use a format and encoding that is agnostic to changes with a source/sink
> that can consume this agnostic format. See this thread[1] and others like
> it in the user and dev mailing lists.
>
>
>> - Should pipelines mirror a DB schema and do validation of all data types
>> in the pipeline?
>>
>
> Perform validation at critical points in the pipeline like data ingress
> and egress. Catching failed DB inserts via a dead letter queue works for
> the cases that are loud and throw exceptions, but data that is inserted
> successfully yet is still invalid from a business logic standpoint won't
> be caught without validation.
>
>
>> - Should all sinks implement a way to remove non-transient failures from
>> retrying and output them via PCollectionTuple (such as with BigQuery failed
>> inserts)?
>>
>
> Yes, dead letter queues (DLQs) are quite a good solution for this since
> they provide a lot of flexibility and allow for a process (typically a
> manual one) to fix up the bad records.
>
> 1:
> https://lists.apache.org/thread.html/r4b31c8b76fa81dcb130397077b981ab6429f2999b9d864c815214c5a%40%3Cuser.beam.apache.org%3E
>

Re: [Dataflow][Java][2.30.0] Best practice for clearing stuck data in streaming pipeline

Posted by Luke Cwik <lc...@google.com>.
On Tue, Aug 10, 2021 at 8:54 AM Evan Galpin <ev...@gmail.com> wrote:

> Hi all,
>
> I recently had an experience where a streaming pipeline became "clogged"
> due to invalid data reaching the final step in my pipeline such that the
> data was causing a non-transient error when writing to my Sink.  Since the
> job is a streaming job, the element (bundle) was continuously retrying.
>
> What options are there for getting out of this state when it occurs?
>

* Modify the sink implementation to do what you want with the bad data and
update the pipeline.
* Cancel the pipeline and update the implementation to handle the bad
records and rerun from last known good position reprocessing whatever was
necessary.
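
To make the first option above concrete, here is a minimal sketch of
resubmitting the fixed pipeline as an in-place update of the running
Dataflow streaming job. This assumes the Dataflow runner; the job name and
the omitted transform graph are placeholders, not the actual pipeline from
this thread.

// Hedged sketch: redeploy a fixed pipeline as an in-place update of a
// running Dataflow streaming job. Assumes DataflowPipelineOptions from
// beam-runners-google-cloud-dataflow-java; "my-streaming-job" and the
// pipeline-building code are placeholders.
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UpdateJobSketch {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);

    // Ask Dataflow to replace the running job (matched by job name) with
    // this submission instead of starting a new job.
    options.setUpdate(true);
    options.setJobName("my-streaming-job"); // placeholder

    Pipeline pipeline = Pipeline.create(options);
    // ... re-apply the same transforms, now including the fix in the sink ...
    pipeline.run();
  }
}

For the update to be accepted, transform names need to stay stable between
the old and new graphs (or be mapped explicitly via Dataflow's
transform name mapping option).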

> I attempted to add validation and update the streaming job to remove the
> bad entity; though the update was successful, I believe the bad entity was
> already checkpointed (?) further downstream in the pipeline. What then?
>
> And for something like a database schema and evolving it over time, what is
> the typical solution?
>

* Pipeline update containing newly updated schema before data with new
schema starts rolling in.
* Use a format and encoding that is agnostic to changes with a source/sink
that can consume this agnostic format. See this thread[1] and others like
it in the user and dev mailing lists.
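
As an illustration of the second option, here is a rough sketch (not from
the thread above) of a change-tolerant encoding using Avro GenericRecord:
the schema is parsed at runtime, so adding an optional field does not
require regenerating or recompiling model classes. The schema and field
names are made up for the example.

// Hedged illustration of a change-tolerant encoding: build Avro
// GenericRecords against a schema that is loaded at runtime. The schema
// string and field names are placeholders.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GenericRecordSketch {
  private static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"payload\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

  public static GenericRecord toRecord(String id, String payload) {
    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", id);
    // Optional fields added to the schema later simply default to null here.
    record.put("payload", payload);
    return record;
  }
}

How the records are then wired into a particular source/sink (coder choice,
schema registry, and so on) depends on the IOs in use.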


> - Should pipelines mirror a DB schema and do validation of all data types
> in the pipeline?
>

Perform validation at critical points in the pipeline like data ingress and
egress. Catching failed DB inserts via a dead letter queue works for the
cases that are loud and throw exceptions, but data that is inserted
successfully yet is still invalid from a business logic standpoint won't be
caught without validation.
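
For the ingress/egress validation point, below is a minimal sketch of the
usual multi-output pattern; the element type, the validity check, and the
tag names are placeholders. Invalid elements go to a side output instead of
ever reaching the sink.

// Hedged sketch of ingress validation with a dead-letter side output.
// The String element type and the emptiness check stand in for real
// schema/business validation; the multi-output DoFn pattern is standard Beam.
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class ValidateSketch {
  public static final TupleTag<String> VALID = new TupleTag<String>() {};
  public static final TupleTag<String> INVALID = new TupleTag<String>() {};

  public static PCollectionTuple validate(PCollection<String> input) {
    return input.apply(
        "Validate",
        ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(
                      @Element String element, MultiOutputReceiver out) {
                    // Placeholder check; replace with real validation logic.
                    if (element != null && !element.isEmpty()) {
                      out.get(VALID).output(element);
                    } else {
                      out.get(INVALID).output(element == null ? "" : element);
                    }
                  }
                })
            .withOutputTags(VALID, TupleTagList.of(INVALID)));
  }
}

The INVALID output would then typically be written somewhere inspectable (a
Pub/Sub topic, files on GCS, an error table) so the records can be repaired
and replayed.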


> - Should all sinks implement a way to remove non-transient failures from
> retrying and output them via PCollectionTuple (such as with BigQuery failed
> inserts)?
>

Yes, dead letter queues (DLQs) are quite a good solution for this since
they provide a lot of flexibility and allow for a process (typically a
manual one) to fix up the bad records.
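
And for the BigQuery case mentioned above, a hedged sketch of that pattern
might look like the following; the table name and dispositions are
placeholders. Non-transient insert failures come back as a PCollection
instead of retrying forever, and can then be routed to a dead letter
destination.

// Hedged sketch: surface non-transient BigQuery streaming-insert failures
// as a PCollection for dead-lettering. Table name and dispositions are
// placeholders.
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

public class FailedInsertsSketch {
  public static PCollection<TableRow> writeWithDeadLetter(PCollection<TableRow> rows) {
    WriteResult result =
        rows.apply(
            "WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                // Only retry errors that might succeed later; emit the rest.
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    // Rows that failed for non-transient reasons; write these to a DLQ
    // (e.g. Pub/Sub or GCS) for manual inspection and replay.
    return result.getFailedInserts();
  }
}

If the error details are needed alongside each failed row, BigQueryIO also
has an extended error info mode (withExtendedErrorInfo() together with
getFailedInsertsWithErr()).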

1:
https://lists.apache.org/thread.html/r4b31c8b76fa81dcb130397077b981ab6429f2999b9d864c815214c5a%40%3Cuser.beam.apache.org%3E
