Posted to user@flink.apache.org by Rion Williams <ri...@gmail.com> on 2021/08/02 17:31:34 UTC

Re: Dead Letter Queue for JdbcSink

Hi again Maciek (and all),

I recently returned to investigating this approach; however, I can't seem to get the underlying invocation to work as I would expect. I'll share a bit more of what I currently have, as perhaps I'm just missing something minor that someone can spot.

To reiterate: I'm taking a stream of events, extracting specific types of entities from those events into multiple side-outputs, and passing each side-output to a sink that writes it via JDBC using logic specific to that entity. What I'm aiming for is to capture a single problematic record and send it to a dead-letter queue (Kafka) so that it doesn't act as a poison pill. I understand this would mean limiting the batch size to a single record; however, I'm assuming the connections themselves could be pooled to avoid opening a new connection per call. If that isn't the case, is there a way to handle it, or would I need to implement my own sink?

```
val users = Tags.users
parsedChangelogs
    .getSideOutput(users)
    .addSink(PostgresSink.fromEntityType(users.typeInfo, parameters))
    .uid("sync-${users.id}-to-postgres")
    .name("sync-${users.id}-to-postgres")

val addresses = Tags.addresses
parsedChangelogs
    .getSideOutput(addresses)
    .addSink(PostgresSink.fromEntityType(addresses.typeInfo, parameters))
    .uid("sync-${addresses.id}-to-postgres")
    .name("sync-${addresses.id}-to-postgres")
```

And the dynamic sink (that would associate a given entity to the necessary calls made to the database) looks a bit like this:

```
fun <T : Any> fromEntityType(typeInfo: TypeInformation<T>, parameters: ParameterTool): SinkFunction<T> {
    val metadata = getQueryMetadataFromType(typeInfo)

    return JdbcSink.sink(
        metadata.query,
        metadata.statement,
        getJdbcExecutionOptions(parameters),
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName("org.postgresql.Driver")
            .withUrl(buildConnectionString(parameters))
            .build(),
    )
}
```
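
The `getJdbcExecutionOptions` helper isn't shown here. As a rough sketch only, a version that limits each batch to a single record (so a failure should surface on the record that caused it) might look like the following. The parameter keys `jdbc.batch.size` and `jdbc.max.retries` and their defaults are made up for illustration and are not part of my actual job.

```kotlin
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.connector.jdbc.JdbcExecutionOptions

// Hypothetical helper: the parameter keys and default values are assumptions.
fun getJdbcExecutionOptions(parameters: ParameterTool): JdbcExecutionOptions =
    JdbcExecutionOptions.builder()
        // A batch size of 1 asks the sink to flush after every record.
        .withBatchSize(parameters.getInt("jdbc.batch.size", 1))
        // Number of attempts before the sink gives up and throws.
        .withMaxRetries(parameters.getInt("jdbc.max.retries", 3))
        .build()
```

The obvious trade-off is throughput, since every record becomes its own round trip to the database.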

I've tried several approaches; a naive wrapper that I attempted looked something like this:

```
class DlqWrapper<T>(private val sink: SinkFunction<T>, val parameters: ParameterTool): SinkFunction<T> {
    private val logger = LoggerFactory.getLogger(DlqSink::class.java)
    private val dlqSink: SinkFunction<String> = ...

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            sink.invoke(value, context)
        }
        catch (ex: Exception) {
            logger.error("Encountered sink exception. Sending message to dead letter queue. Value: $value. Exception: ${ex.message}")
            val payload = Gson().toJsonTree(value).asJsonObject
            payload.addProperty("exception", ex.message)

            dlqSink.invoke("$payload", context)
        }
    }
}
```

With this in place, the invoke calls don't appear to actually perform the JDBC calls that insert the records into the database. I'm not sure whether this is related to how the JdbcSink is implemented internally (via the GenericJdbcSink, etc.).

I've seen several posts that involve using an InvocationHandler/Proxy, but I'm not sure that should be necessary for this type of functionality. Any ideas, thoughts, or examples would be greatly appreciated.

Thanks,

Rion

On 2021/07/14 15:47:18, Maciej Bryński <ma...@brynski.pl> wrote: 
> This is the idea.
> Of course you need to wrap more functions like: open, close,
> notifyCheckpointComplete, snapshotState, initializeState and
> setRuntimeContext.
> 
> The problem is that if you want to catch the problematic record, you need
> to set the batch size to 1, which gives very bad performance.
> 
> Regards,
> Maciek
> 
> Wed, 14 Jul 2021 at 17:31 Rion Williams <ri...@gmail.com> wrote:
> >
> > Hi Maciej,
> >
> > Thanks for the quick response. I wasn't aware of the idea of using a SinkWrapper, but I'm not quite certain that it would suit this specific use case (as a SinkFunction / RichSinkFunction doesn't appear to support side-outputs). Essentially, what I'd hope to accomplish would be to pick up when a bad record could not be written to the sink and then offload that via a side-output somewhere else.
> >
> > Something like this, which is a very, very naive idea:
> >
> > class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>): RichSinkFunction<T>() {
> >     private val logger = LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
> >
> >     override fun invoke(value: T, context: SinkFunction.Context) {
> >         try {
> >             sink.invoke(value, context)
> >         }
> >         catch (exception: Exception){
> >             logger.error("Encountered a bad record, offloading to dead-letter-queue")
> >             // Offload bad record to DLQ
> >         }
> >     }
> > }
> >
> > But I think that's basically the gist of it. I'm just not sure how I could go about doing this aside from perhaps writing a custom process function that wraps another sink function (or just completely rewriting my own JdbcSink?)
> >
> > Thanks,
> >
> > Rion
> >
> >
> >
> >
> >
> > On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński <ma...@brynski.pl> wrote:
> >>
> >> Hi Rion,
> >> We have implemented such a solution with Sink Wrapper.
> >>
> >>
> >> Regards,
> >> Maciek
> >>
> >> Wed, 14 Jul 2021 at 16:21 Rion Williams <ri...@gmail.com> wrote:
> >> >
> >> > Hi all,
> >> >
> >> > Recently I've been encountering an issue where some external dependency or process causes writes within my JdbcSink to fail (e.g. something is being inserted with an explicit constraint that never made its way there). I'm trying to see if there's a pattern or recommendation for handling this, similar to a dead-letter queue.
> >> >
> >> > Basically, if I experience a given number of failures (> max retry attempts) when writing to my JDBC destination, I'd like to take the record that was attempted and throw it into a Kafka topic or some other destination so that it can be evaluated at a later time.
> >> >
> >> > Are there any well-defined patterns or recommended approaches around this?
> >> >
> >> > Thanks,
> >> >
> >> > Rion
> >>
> >>
> >>
> >> --
> >> Maciek Bryński
> 
> 
> 
> -- 
> Maciek Bryński
> 

Re: Dead Letter Queue for JdbcSink

Posted by Rion Williams <ri...@gmail.com>.
Thanks, I figured that was the preferred approach. I’ve noticed that at times it doesn’t seem to catch the exceptions (at the moment I just have the call to the wrapped sink inside a try-catch block).

Did you have to do anything special on that front? I’d assumed that catching the IOException after retries had been exhausted was probably the best bet; I just wasn’t sure whether I might be missing anything.

Thanks,

Rion

> On Aug 3, 2021, at 11:16 AM, Maciej Bryński <ma...@brynski.pl> wrote:
> 
> Hi Rion,
> We're using plain Kafka producer to send records to DLQ.
> 
> Regards,
> Maciek

Re: Dead Letter Queue for JdbcSink

Posted by Maciej Bryński <ma...@brynski.pl>.
Hi Rion,
We're using plain Kafka producer to send records to DLQ.

Regards,
Maciek

Tue, 3 Aug 2021 at 18:07 Rion Williams <ri...@gmail.com> wrote:
>
> Thanks Maciek,
>
> It looks like my initial issue was actually caused by a bad (or improper) interface being used; after changing that and ensuring all of the fields were implemented, it worked as expected.
>
> In my particular case I'm planning on writing to Kafka; however, my wrapped function isn't a process function, so it isn't as simple as supplying a side-output and sending those records to Kafka. I'm guessing that in this scenario it'd be sufficient to have a plain Kafka producer (created within the open() function) and just use that, as opposed to constructing a sink and explicitly invoking it.
>
> ```
> catch (exception: Exception) {
>      // I'd imagine that the context here would require a second level of mapping to ensure that
>      // we have the proper context for the sink itself
>      dlqSink.invoke(value, context)
>
>      // Or this would be the alternative
>      dlqProducer.send(..., value)
> }
> ```
>
> I don't know if you have the same scenario (or are leveraging Kafka), but if so, I'd be curious about the approach that you took.
>
> Thanks much,
>
> Rion
>



-- 
Maciek Bryński

Re: Dead Letter Queue for JdbcSink

Posted by Rion Williams <ri...@gmail.com>.
Thanks Maciek, 

It looks like my initial issue was actually caused by a bad (or improper) interface being used; after changing that and ensuring all of the fields were implemented, it worked as expected.

In my particular case I'm planning on writing to Kafka; however, my wrapped function isn't a process function, so it isn't as simple as supplying a side-output and sending those records to Kafka. I'm guessing that in this scenario it'd be sufficient to have a plain Kafka producer (created within the open() function) and just use that, as opposed to constructing a sink and explicitly invoking it.

```
catch (exception: Exception) {
     // I'd imagine that the context here would require a second level of mapping to ensure that
     // we have the proper context for the sink itself
     dlqSink.invoke(value, context)

     // Or this would be the alternative
     dlqProducer.send(..., value)
}
```
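
To make the second option a bit more concrete, here is a minimal sketch of what such a wrapper could look like, assuming the Flink 1.13-era `RichSinkFunction` / `CheckpointedFunction` interfaces and a plain `KafkaProducer`. The class name `DlqSinkWrapper` and the `dlqTopic` and `bootstrapServers` parameters are hypothetical, the payload format is arbitrary, and `notifyCheckpointComplete` is left out for brevity, so treat it as an illustration rather than a finished implementation.

```kotlin
import java.io.IOException
import java.util.Properties
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical wrapper: `dlqTopic` and `bootstrapServers` are illustrative parameters.
class DlqSinkWrapper<T>(
    private val sink: SinkFunction<T>,
    private val dlqTopic: String,
    private val bootstrapServers: String,
) : RichSinkFunction<T>(), CheckpointedFunction {

    // KafkaProducer is not serializable, so it is created in open() instead of the constructor.
    @Transient
    private var dlqProducer: KafkaProducer<String, String>? = null

    // Forward lifecycle calls so the wrapped sink can set up its own connection and state.
    override fun setRuntimeContext(context: RuntimeContext) {
        super.setRuntimeContext(context)
        FunctionUtils.setFunctionRuntimeContext(sink, context)
    }

    override fun open(parameters: Configuration) {
        FunctionUtils.openFunction(sink, parameters)
        val props = Properties().apply {
            put("bootstrap.servers", bootstrapServers)
            put("key.serializer", StringSerializer::class.java.name)
            put("value.serializer", StringSerializer::class.java.name)
        }
        dlqProducer = KafkaProducer<String, String>(props)
    }

    override fun initializeState(context: FunctionInitializationContext) {
        (sink as? CheckpointedFunction)?.initializeState(context)
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        (sink as? CheckpointedFunction)?.snapshotState(context)
    }

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            sink.invoke(value, context)
        } catch (ex: IOException) {
            // With batching enabled, a failure may surface during a later flush instead of here.
            val payload = "$value (${ex.message})"
            dlqProducer?.send(ProducerRecord<String, String>(dlqTopic, payload))
        }
    }

    override fun close() {
        try {
            FunctionUtils.closeFunction(sink)
        } finally {
            dlqProducer?.close()
        }
    }
}
```

It would then be used as `.addSink(DlqSinkWrapper(PostgresSink.fromEntityType(users.typeInfo, parameters), dlqTopic, bootstrapServers))`. Using the producer directly avoids having to construct and manage a second sink (and its context) inside the wrapper.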

I don't know if you have the same scenario (or are leveraging Kafka), but if so, I'd be curious about the approach that you took.

Thanks much,

Rion

On 2021/08/03 08:45:07, Maciej Obuchowski <ob...@gmail.com> wrote: 
> Hey.
> 
> As far as I can see, you're not overriding functions like open,
> setRuntimeContext, snapshotState, initializeState - the calls need to
> be passed to the inner sink function.
> 
> pon., 2 sie 2021 o 19:31 Rion Williams <ri...@gmail.com> napisał(a):
> >
> > Hi again Maciek (and all),
> >
> > I just recently returned to start investigating this approach, however I can't seem to get the underlying invocation to work as I would normally expect. I'll try to share a bit more as what I currently have and perhaps I'm just missing something minor that someone may be able to spot.
> >
> > To reiterate - what I'm attempting to do is take a stream of events flowing through, specific types of entities are extracted from these events into multiple side-outputs, and these side-outputs are passed to a sync that will write them via JDBC using logic specific to that entity. What I am aiming to achieve is being able to capture a single record that may be problematic and avoid a poison pill to throw onto a dead-letter queue (Kafka). I understand this would mean limiting batching sizes to a single record, however I'm assuming that the connections themselves could be pooled possibly to avoid opening up a new connection per call. If this isn't the case, is there a way to handle that (or would I need to implement my own sync).
> >
> > ```
> > val users = Tags.users
> >         parsedChangelogs
> >             .getSideOutput(users)
> >             .addSink(PostgresSink.fromEntityType(users.typeInfo, parameters))
> >             .uid("sync-${users.id}-to-postgres")
> >             .name("sync-${users.id}-to-postgres")
> >
> > val addresses = Tags.addresses
> >         parsedChangelogs
> >             .getSideOutput(addresses)
> >             .addSink(PostgresSink.fromEntityType(addresses.typeInfo, parameters))
> >             .uid("sync-${addresses.id}-to-postgres")
> >             .name("sync-${addresses.id}-to-postgres")
> > ```
> >
> > And the dynamic sink (that would associate a given entity to the necessary calls made to the database) looks a bit like this:
> >
> > ```
> > fun <T: Any> fromEntityType(typeInfo: TypeInformation<T>, parameters: ParameterTool): SinkFunction<T> {
> >         val metadata = getQueryMetadataFromType(typeInfo)
> >
> >         return JdbcSink
> >             .sink(
> >                 metadata.query,
> >                 metadata.statement,
> >                 getJdbcExecutionOptions(parameters),
> >                 JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
> >                     .withDriverName("org.postgresql.Driver")
> >                     .withUrl(buildConnectionString(parameters))
> >                     .build(),
> >             )
> >     }
> > ```
> >
> > I've tried several, a naive wrapper approach that I attempted looked something like this:
> >
> > ```
> > class DlqWrapper<T>(private val sink: SinkFunction<T>, val parameters: ParameterTool): SinkFunction<T> {
> >     private val logger = LoggerFactory.getLogger(DlqSink::class.java)
> >     private val dlqSink: SinkFunction<String> = ...
> >
> >     override fun invoke(value: T, context: SinkFunction.Context) {
> >         try {
> >             sink.invoke(value, context)
> >         }
> >         catch (ex: Exception) {
> >             logger.error("Encountered sink exception. Sending message to dead letter queue. Value: $value. Exception: ${ex.message}")
> >             val payload = Gson().toJsonTree(value).asJsonObject
> >             payload.addProperty("exception", ex.message)
> >
> >             dlqSink.invoke("$payload", context)
> >         }
> >     }
> > }
> > ```
> >
> > After doing this, it doesn't look like when the invoke calls are made that it's actually attempting to perform the JDBC calls to insert the records into those sources. I'm not entirely sure if this is related specifically for how the JdbcSink is wrapped (via the GenericJdbcSink, etc.).
> >
> > I had seen several posts around involving the use of an InvocationHandler/Proxy, etc. but I'm not sure if that should be necessary for handling this type of functionality. Any ideas/thoughts/examples would be greatly appreciated.
> >
> > Thanks,
> >
> > Rion
> >
> > On 2021/07/14 15:47:18, Maciej Bryński <ma...@brynski.pl> wrote:
> > > This is the idea.
> > > Of course you need to wrap more functions like: open, close,
> > > notifyCheckpointComplete, snapshotState, initializeState and
> > > setRuntimeContext.
> > >
> > > The problem is that if you want to catch problematic record you need
> > > to set batch size to 1, which gives very bad performance.
> > >
> > > Regards,
> > > Maciek
> > >
> > > śr., 14 lip 2021 o 17:31 Rion Williams <ri...@gmail.com> napisał(a):
> > > >
> > > > Hi Maciej,
> > > >
> > > > Thanks for the quick response. I wasn't aware of the idea of using a SinkWrapper, but I'm not quite certain that it would suit this specific use case (as a SinkFunction / RichSinkFunction doesn't appear to support side-outputs). Essentially, what I'd hope to accomplish would be to pick up when a bad record could not be written to the sink and then offload that via a side-output somewhere else.
> > > >
> > > > Something like this, which is a very, very naive idea:
> > > >
> > > > class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>): RichSinkFunction<T>() {
> > > >     private val logger = LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
> > > >
> > > >     override fun invoke(value: T, context: SinkFunction.Context) {
> > > >         try {
> > > >             sink.invoke(value, context)
> > > >         }
> > > >         catch (exception: Exception){
> > > >             logger.error("Encountered a bad record, offloading to dead-letter-queue")
> > > >             // Offload bad record to DLQ
> > > >         }
> > > >     }
> > > > }
> > > >
> > > > But I think that's basically the gist of it. I'm just not sure how I could go about doing this aside from perhaps writing a custom process function that wraps another sink function (or just completely rewriting my own JdbcSink?)
> > > >
> > > > Thanks,
> > > >
> > > > Rion
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński <ma...@brynski.pl> wrote:
> > > >>
> > > >> Hi Rion,
> > > >> We have implemented such a solution with Sink Wrapper.
> > > >>
> > > >>
> > > >> Regards,
> > > >> Maciek
> > > >>
> > > >> On Wed, Jul 14, 2021 at 16:21 Rion Williams <ri...@gmail.com> wrote:
> > > >> >
> > > >> > Hi all,
> > > >> >
> > > >> > Recently I've been encountering an issue where an external dependency or process causes writes within my JdbcSink to fail (e.g. something is being inserted with an explicit constraint that never made its way there). I'm trying to see if there's a pattern or recommendation for handling this, similar to a dead-letter queue.
> > > >> >
> > > >> > Basically - if I experience a given number of failures (> max retry attempts) when writing to my JDBC destination, I'd like to take the record that was attempted and throw it into a Kafka topic or some other destination so that it can be evaluated at a later time.
> > > >> >
> > > >> > Are there any well-defined patterns or recommended approaches for this?
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Rion
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> Maciek Bryński
> > >
> > >
> > >
> > > --
> > > Maciek Bryński
> > >
> 

Re: Dead Letter Queue for JdbcSink

Posted by Maciej Obuchowski <ob...@gmail.com>.
Hey.

As far as I can see, you're not overriding functions like open,
setRuntimeContext, snapshotState, and initializeState. Those calls need
to be passed through to the inner sink function.
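
A minimal sketch of what forwarding those calls might look like, assuming the Flink 1.13-era SinkFunction API used elsewhere in this thread (a non-generic SinkFunction.Context and open(Configuration)). The class name DelegatingSinkWrapper and the onFailure hook are hypothetical names made up for illustration, not part of Flink. notifyCheckpointComplete is left out here and would be forwarded the same way if the inner sink implements CheckpointListener.

```kotlin
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical wrapper: the name and the onFailure hook are illustrative only.
open class DelegatingSinkWrapper<T>(
    private val inner: SinkFunction<T>
) : RichSinkFunction<T>(), CheckpointedFunction {

    override fun setRuntimeContext(context: RuntimeContext) {
        super.setRuntimeContext(context)
        // FunctionUtils only acts when the inner function is a RichFunction.
        FunctionUtils.setFunctionRuntimeContext(inner, context)
    }

    override fun open(parameters: Configuration) {
        super.open(parameters)
        // Without this, a rich inner sink never runs its own open() logic.
        FunctionUtils.openFunction(inner, parameters)
    }

    override fun close() {
        FunctionUtils.closeFunction(inner)
        super.close()
    }

    override fun initializeState(context: FunctionInitializationContext) {
        (inner as? CheckpointedFunction)?.initializeState(context)
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        // Gives a checkpoint-aware inner sink the chance to act on checkpoints.
        (inner as? CheckpointedFunction)?.snapshotState(context)
    }

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            inner.invoke(value, context)
        } catch (ex: Exception) {
            onFailure(value, ex)
        }
    }

    // Override to route the record to a dead-letter destination.
    // The default rethrows, so behaviour matches the unwrapped sink.
    protected open fun onFailure(value: T, ex: Exception) {
        throw ex
    }
}
```

A subclass would override onFailure to hand the record to the DLQ producer. As noted earlier in the thread, a failure can only be attributed to a single record when the batch size is 1; with larger batches the exception may surface on a later invoke or during snapshotState rather than on the record that caused it.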
