Posted to user@flink.apache.org by Tom Fennelly <tf...@cloudbees.com> on 2020/07/22 11:07:05 UTC

Recommended pattern for implementing a DLQ with Flink+Kafka

Hi.

I've been searching blogs etc trying to see if there are
established patterns/mechanisms for reprocessing of failed messages via
something like a DLQ. I've read about using checkpointing and restarting
tasks (not what we want because we want to keep processing forward) and
then also how some use side outputs to filter "bad" data to a DLQ style
topic. Kafka has dead letter topic configs too but it seems that can't
really be used from inside Flink (from what I can see).

We're running Flink+Kafka. I'm new to Flink+Kafka so not sure if there just
isn't a defined pattern for it, or if I'm just not asking the right
questions in my searches. I searched the archives here and don't see
anything either, which obviously makes me think that I'm not thinking about
this in the "Flink way" :-|

Regards,

Tom.

Re: Recommended pattern for implementing a DLQ with Flink+Kafka

Posted by Arvid Heise <ar...@ververica.com>.
Hi Tom,

Using side outputs is actually the established Flink pattern in that
regard. The advantage of side outputs is that you do not depend on the DLQ
concept of the source system, which is incredibly useful if you read from
multiple systems.

Most commonly, the side output is then written to another Kafka topic
together with the respective exception, to be inspected manually (for
broken data / programming errors).

For external systems, you'd usually use a retry if it's part of the
enrichment, because often the follow-up steps depend on it. If the external
system is just used as a sink and only rarely experiences outages, I'd
recommend using a sink or at least reverting to async IO.
If the external system is just used optionally or is very flaky, then
having a DLQ with a separate retry topology/job is very valid.
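
A minimal sketch of the "retry, then give up" idea for an enrichment call, in plain Java with no Flink dependency. The class name `BoundedRetry`, the method `enrich`, and the parameters `maxAttempts` and `backoffMillis` are made up for illustration and are not part of any Flink API. The assumption is that an empty result tells the caller to route the record to a DLQ output rather than fail the job.

```java
import java.util.Optional;
import java.util.function.Function;

/** Sketch: retry an enrichment call a bounded number of times, then give up. */
final class BoundedRetry {

    /**
     * Calls lookup up to maxAttempts times. Returns the enriched value, or
     * Optional.empty() when every attempt failed (or the lookup returned
     * null), so the caller can send the record to a dead-letter output.
     */
    static <I, O> Optional<O> enrich(I record, Function<I, O> lookup,
                                     int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                O result = lookup.apply(record);
                return Optional.ofNullable(result);
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    break; // retries exhausted
                }
                try {
                    Thread.sleep(backoffMillis * attempt); // linear backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    break;
                }
            }
        }
        return Optional.empty();
    }
}
```

Note that `Thread.sleep` blocks the calling thread, so this shape only suits short, rare retries; for longer outages the async IO or separate retry job mentioned above is probably the better fit.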

Since you can attach a complete Flink program to the side output, you can
also add recovery logic to it. For example, you could go to a fallback
external system, such as a different geolocation service, possibly
maintained by an external provider (so your internal service is more or
less used as a cache to save money).
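
The fallback idea could look roughly like the following plain-Java sketch. `FallbackLookup` and `firstSuccessful` are hypothetical names, and each service is modelled as a simple `Function`; a real job would wrap actual client calls. Services are tried in order, for example the internal geolocation service first and the external provider second.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Sketch: try lookup services in order and return the first usable answer. */
final class FallbackLookup {

    static <K, V> Optional<V> firstSuccessful(K key, List<Function<K, V>> services) {
        for (Function<K, V> service : services) {
            try {
                V value = service.apply(key);
                if (value != null) {
                    return Optional.of(value);
                }
            } catch (RuntimeException e) {
                // This service failed; fall through to the next one.
            }
        }
        return Optional.empty(); // every service failed or had no answer
    }
}
```

An empty result would then be the point at which the record is finally given up on, for instance by writing it to a second-level DLQ topic.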

On Fri, Jul 24, 2020 at 3:40 PM Tom Fennelly <tf...@cloudbees.com>
wrote:

> Thanks Robert.

-- 

Arvid Heise | Senior Java Developer


Re: Recommended pattern for implementing a DLQ with Flink+Kafka

Posted by Tom Fennelly <tf...@cloudbees.com>.
Thanks Robert.

On Fri, Jul 24, 2020 at 2:32 PM Robert Metzger <rm...@apache.org> wrote:

> Hey Tom,
>
> I'm not aware of any patterns for this problem, sorry. Intuitively, I
> would send dead letters to a separate Kafka topic.
>
> Best,
> Robert

Re: Recommended pattern for implementing a DLQ with Flink+Kafka

Posted by Robert Metzger <rm...@apache.org>.
Hey Tom,

I'm not aware of any patterns for this problem, sorry. Intuitively, I would
send dead letters to a separate Kafka topic.

Best,
Robert


On Wed, Jul 22, 2020 at 7:22 PM Tom Fennelly <tf...@cloudbees.com>
wrote:

> Thanks Chen.
>
> I'm thinking about errors that occur while processing a record/message
> that shouldn't be retried until after some "action" has been taken, as
> opposed to flooding the system with pointless retries, e.g.:
>
>    - A side output step might involve an API call to an external system
>    and that system is down at the moment, so there's no point retrying
>    until further notice. For this we want to be able to send something to
>    a DLQ.
>    - We have some bad code that is resulting in an uncaught exception in
>    very specific cases. We want these to go to a DLQ and only be retried after
>    the appropriate fix has been made.
>
> The possible scenarios for this are numerous, so I think my main question
> would be: are there established general Flink patterns or best practices
> that can be applied for this, or is it something we'd need to hand-roll on
> a case-by-case basis with a side-output-type solution such as in your
> example? We can do that, but I just wanted to make sure I wasn't missing
> anything before heading down that road.
>
> Regards,
>
> Tom.

Re: Recommended pattern for implementing a DLQ with Flink+Kafka

Posted by Tom Fennelly <tf...@cloudbees.com>.
Thanks Chen.

I'm thinking about errors that occur while processing a record/message that
shouldn't be retried until after some "action" has been taken, as opposed to
flooding the system with pointless retries, e.g.:

   - A side output step might involve an API call to an external system and
   that system is down at the moment, so there's no point retrying until
   further notice. For this we want to be able to send something to a DLQ.
   - We have some bad code that is resulting in an uncaught exception in
   very specific cases. We want these to go to a DLQ and only be retried after
   the appropriate fix has been made.
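
One way to keep the two cases in the list above apart is to tag each dead letter with a coarse reason, so that a replay job can pick up only the records whose blocking condition has been resolved. The sketch below is plain Java; `FailureReason`, its `Kind` values, and the rule that an `IOException` in the cause chain means "external system down" are all assumptions made for illustration, not an established Flink convention.

```java
import java.io.IOException;

/** Sketch: tag each dead letter with a coarse reason so replays can be selective. */
final class FailureReason {

    /** Hypothetical categories matching the two scenarios in the list above. */
    enum Kind { EXTERNAL_SYSTEM_DOWN, PROCESSING_BUG }

    /** Assumption: an I/O error anywhere in the cause chain means "external system down". */
    static Kind classify(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof IOException) {
                return Kind.EXTERNAL_SYSTEM_DOWN;
            }
        }
        return Kind.PROCESSING_BUG;
    }

    /** A replay job would only pick up records whose blocking condition is resolved. */
    static boolean readyForReplay(Kind kind, boolean externalSystemUp, boolean fixDeployed) {
        return kind == Kind.EXTERNAL_SYSTEM_DOWN ? externalSystemUp : fixDeployed;
    }
}
```

The reason could travel with the record as a field or a message header, which would let the retry topology filter on it without parsing stack traces.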

The possible scenarios for this are numerous, so I think my main question
would be: are there established general Flink patterns or best practices
that can be applied for this, or is it something we'd need to hand-roll on
a case-by-case basis with a side-output-type solution such as in your
example? We can do that, but I just wanted to make sure I wasn't missing
anything before heading down that road.

Regards,

Tom.


On Wed, Jul 22, 2020 at 5:46 PM Chen Qin <qi...@gmail.com> wrote:

> Could you be more specific on what “failed message” means here?
>
> In general, a side output can do something like this:
>
>     def process(ele) {
>       try {
>         biz(ele)
>       } catch {
>         sideOutput(ele + exception context)
>       }
>     }
>
>     process(func).getSideOutput(tag).addSink(kafkaSink)
>
>
>
> Thanks,
>
> Chen

RE: Recommended pattern for implementing a DLQ with Flink+Kafka

Posted by Chen Qin <qi...@gmail.com>.
Could you be more specific on what “failed message” means here?

In general, a side output can do something like this:

    def process(ele) {
      try {
        biz(ele)
      } catch {
        sideOutput(ele + exception context)
      }
    }

    process(func).getSideOutput(tag).addSink(kafkaSink)
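
The pseudocode above can be modelled in plain Java as follows. This is only a sketch without any Flink dependency: `SideOutputModel`, `DeadLetter`, and `Routed` are hypothetical names, and the "side output" is just a second list. The point it illustrates is that the failing element is captured together with its exception context instead of the exception being rethrown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the try/catch-to-side-output routing (no Flink dependency). */
final class SideOutputModel {

    /** Hypothetical DLQ envelope: the original element plus exception context. */
    static final class DeadLetter<T> {
        final T element;
        final String errorType;
        final String errorMessage;

        DeadLetter(T element, Exception cause) {
            this.element = element;
            this.errorType = cause.getClass().getName();
            this.errorMessage = String.valueOf(cause.getMessage());
        }
    }

    /** Result of one pass: the main output and the "side output" of failures. */
    static final class Routed<I, O> {
        final List<O> main = new ArrayList<>();
        final List<DeadLetter<I>> deadLetters = new ArrayList<>();
    }

    /** Applies biz to every element; failures are captured instead of rethrown. */
    static <I, O> Routed<I, O> process(List<I> input, Function<I, O> biz) {
        Routed<I, O> out = new Routed<>();
        for (I ele : input) {
            try {
                out.main.add(biz.apply(ele));
            } catch (Exception e) {
                out.deadLetters.add(new DeadLetter<>(ele, e));
            }
        }
        return out;
    }
}
```

In an actual Flink job the same shape is typically expressed with a `ProcessFunction` that calls `ctx.output(tag, value)` on an `OutputTag` in the catch branch, and with `getSideOutput(tag)` on the resulting stream feeding the Kafka sink.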



Thanks,

Chen







From: Eleanore Jin <el...@gmail.com>
Sent: Wednesday, July 22, 2020 9:25 AM
To: Tom Fennelly <tf...@cloudbees.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Recommended pattern for implementing a DLQ with Flink+Kafka



+1 we have a similar use case for message schema validation.



Eleanore







Re: Recommended pattern for implementing a DLQ with Flink+Kafka

Posted by Eleanore Jin <el...@gmail.com>.
+1 we have a similar use case for message schema validation.

Eleanore

On Wed, Jul 22, 2020 at 4:12 AM Tom Fennelly <tf...@cloudbees.com>
wrote:

> Hi.
>
> I've been searching blogs etc trying to see if there are
> established patterns/mechanisms for reprocessing of failed messages via
> something like a DLQ. I've read about using checkpointing and restarting
> tasks (not what we want because we want to keep processing forward) and
> then also how some use side outputs to filter "bad" data to a DLQ style
> topic. Kafka has dead letter topic configs too but it seems that can't
> really be used from inside Flink (from what I can see).
>
> We're running Flink+Kafka. I'm new to Flink+Kafka so not sure if there
> just isn't a defined pattern for it, or if I'm just not asking the right
> questions in my searches. I searched the archives here and don't see
> anything either, which obviously makes me think that I'm not thinking about
> this in the "Flink way" :-|
>
> Regards,
>
> Tom.
>