Posted to user@flink.apache.org by miki haiat <mi...@gmail.com> on 2020/03/01 15:28:17 UTC

Re: Single stream, two sinks

So you have a RabbitMQ source and an HTTP sink?
If so, you can use a side output to write your data to the DB.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On Sat, Feb 29, 2020, 23:01 Gadi Katsovich <ga...@gmail.com> wrote:

> Hi,
> I'm new to Flink and am evaluating it to replace our existing streaming
> application.
> The use case I'm working on is reading messages from a RabbitMQ queue,
> applying some transformation and filtering logic, and sending the result
> via HTTP to a 3rd party.
> A must-have requirement of this flow is to write the data that was sent
> to an SQL DB, for audit and troubleshooting purposes.
> I'm currently basing my HTTP solution on a PR with needed adjustments:
> https://github.com/apache/flink/pull/5866/files
> How can I add an insertion to a DB after a successful HTTP request?
> Thank you.
>

Re: Single stream, two sinks

Posted by Austin Cawley-Edwards <au...@gmail.com>.
We have the same setup and it works quite well. One thing to take into
account is that your HTTP call may happen multiple times if you’re using
Flink’s checkpointing/fault-tolerance mechanism: after a failure, the
records since the last checkpoint are replayed. So it’s important that
those calls are idempotent and won’t duplicate data.
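
One way to make a replayed call harmless is to send a deterministic key with each request, so the receiver can recognise a repeat. This is only a minimal sketch, assuming each RabbitMQ message carries a stable ID and that the 3rd party de-duplicates on such a key (both are assumptions, not something stated in this thread). The class name `IdempotencyKeys` and the `audit-v1` namespace are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class IdempotencyKeys {
    private IdempotencyKeys() {}

    /** Returns the same hex key for the same (namespace, messageId) pair on every call. */
    static String keyFor(String namespace, String messageId) {
        try {
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha256.digest(
                (namespace + ":" + messageId).getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String first = keyFor("audit-v1", "msg-42");   // hypothetical message ID
        String replay = keyFor("audit-v1", "msg-42");  // same message after a restart
        System.out.println(first.equals(replay));
    }
}
```

The key depends only on the message, not on when or how often it is processed, so a request that is re-sent after recovery carries the same key as the original.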

Also we’ve found that it’s important to make the max number of parallel
requests in your async operator runtime-configurable so you can control
that bottleneck.
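
In Flink, that limit is the `capacity` argument of `AsyncDataStream.unorderedWait` / `orderedWait`. A rough sketch of reading it at startup instead of hard-coding it, in plain Java so it runs on its own. The flag name `--http.max-in-flight` and the default of 100 are assumptions for illustration only.

```java
final class AsyncCapacityConfig {
    // Hypothetical fallback used when the flag is not given.
    static final int DEFAULT_CAPACITY = 100;

    private AsyncCapacityConfig() {}

    /** Looks for "--http.max-in-flight <n>" in the program arguments. */
    static int capacityFrom(String[] args) {
        for (int i = 0; i + 1 < args.length; i++) {
            if ("--http.max-in-flight".equals(args[i])) {
                int value = Integer.parseInt(args[i + 1]);
                if (value <= 0) {
                    throw new IllegalArgumentException("capacity must be positive: " + value);
                }
                return value;
            }
        }
        return DEFAULT_CAPACITY;
    }

    public static void main(String[] args) {
        // The result would be passed as the capacity argument when wiring the async operator.
        System.out.println(capacityFrom(args));
    }
}
```

This way the number of in-flight requests can be changed per deployment without rebuilding the job.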

Hope that is helpful!

Austin

On Thu, Mar 5, 2020 at 6:18 PM Gadi Katsovich <ga...@gmail.com>
wrote:

> Guys, thanks for the great advice. It works!
> I used HttpAsyncClient from Apache HttpComponents.
> At first I tried to build the async HTTP client by implementing
> AsyncFunction directly. I implemented the asyncInvoke method and used
> try-with-resources to instantiate the client (because it's a
> CloseableHttpAsyncClient). That didn't work, and I got an "Async function
> call has timed out" exception.
> Then I followed the example in the link and had my async HTTP function
> extend RichAsyncFunction, which opens and closes the HTTP client instance
> in the corresponding methods, and everything started working.

Re: Single stream, two sinks

Posted by Gadi Katsovich <ga...@gmail.com>.
Guys, thanks for the great advice. It works!
I used HttpAsyncClient from Apache HttpComponents.
At first I tried to build the async HTTP client by implementing
AsyncFunction directly. I implemented the asyncInvoke method and used
try-with-resources to instantiate the client (because it's a
CloseableHttpAsyncClient). That didn't work, and I got an "Async function
call has timed out" exception.
Then I followed the example in the link and had my async HTTP function
extend RichAsyncFunction, which opens and closes the HTTP client instance
in the corresponding methods, and everything started working.
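
A likely explanation for the timeout (an assumption, the thread does not confirm it): try-with-resources closes the client as soon as asyncInvoke returns, which is before the response callback can fire, so the result is never completed and the timeout kicks in. The sketch below shows the difference with a made-up stand-in client. `FakeAsyncClient` and `LifecycleDemo` are hypothetical and are not the real HttpAsyncClient or Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class LifecycleDemo {
    /** Mirrors the failing attempt: the client is closed before the response can arrive. */
    static CompletableFuture<Integer> perCallClient() {
        FakeAsyncClient client = new FakeAsyncClient();
        CompletableFuture<Integer> response;
        try (FakeAsyncClient c = client) {
            response = c.execute("payload");
        }
        client.deliverResponses(); // the "network" answers, but the client is already closed
        return response;
    }

    /** Mirrors the open/asyncInvoke/close lifecycle: the client outlives the request. */
    static CompletableFuture<Integer> longLivedClient() {
        FakeAsyncClient client = new FakeAsyncClient();                   // like open()
        CompletableFuture<Integer> response = client.execute("payload"); // like asyncInvoke()
        client.deliverResponses();                                        // response arrives in time
        client.close();                                                   // like close()
        return response;
    }

    public static void main(String[] args) {
        System.out.println("per-call client completed:   " + perCallClient().isDone());
        System.out.println("long-lived client completed: " + longLivedClient().isDone());
    }
}

/** Hypothetical stand-in for an async HTTP client; responses arrive only via deliverResponses(). */
final class FakeAsyncClient implements AutoCloseable {
    private final List<CompletableFuture<Integer>> pending = new ArrayList<>();
    private boolean closed;

    CompletableFuture<Integer> execute(String payload) {
        CompletableFuture<Integer> response = new CompletableFuture<>();
        pending.add(response);
        return response;
    }

    /** Completes pending requests with status 200, unless the client has been closed. */
    void deliverResponses() {
        if (closed) {
            return; // in this model a closed client never fires its callbacks
        }
        for (CompletableFuture<Integer> response : pending) {
            response.complete(200);
        }
        pending.clear();
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

In the per-call variant the future stays incomplete forever, which is what a timeout looks like from the caller's side.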

On Tue, Mar 3, 2020 at 7:13 PM John Smith <ja...@gmail.com> wrote:

> If I understand correctly he wants the HTTP result in the DB. So I do not
> think side output works here. The DB would have to be the sink. Also sinks
> in Flink are the final destination.
>
> So it would have to be RabbitMQ -----> Some Cool Business Logic Operators
> Here ----> Async I/O HTTP Operator -----> JDBC Sink.
>
> Take a look here also:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
> The example shows a database client, but you can easily replace that with
> an HTTP client.
>
> But basically...
> 1- Get input from the RabbitMQ source.
> 2- Do whatever stream computations/business logic you need.
> 3- Use the Async I/O operator to send the HTTP request.
>     - If the response is HTTP 200 OK, create a Flink record tagged as
>       SUCCESS, plus whatever other info you want, maybe the response body.
>     - If it is NOT HTTP 200 OK, create a Flink record tagged as FAILED,
>       plus other info.
> 4- Sink the output record from #3 to JDBC.

Re: Single stream, two sinks

Posted by John Smith <ja...@gmail.com>.
If I understand correctly he wants the HTTP result in the DB. So I do not
think side output works here. The DB would have to be the sink. Also sinks
in Flink are the final destination.

So it would have to be RabbitMQ -----> Some Cool Business Logic Operators
Here ----> Async I/O HTTP Operator -----> JDBC Sink.

Take a look here also:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
The example shows a database client, but you can easily replace that with
an HTTP client.

But basically...
1- Get input from the RabbitMQ source.
2- Do whatever stream computations/business logic you need.
3- Use the Async I/O operator to send the HTTP request.
    - If the response is HTTP 200 OK, create a Flink record tagged as
      SUCCESS, plus whatever other info you want, maybe the response body.
    - If it is NOT HTTP 200 OK, create a Flink record tagged as FAILED,
      plus other info.
4- Sink the output record from #3 to JDBC.
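
The record produced in step 3 could look roughly like this. It is just a sketch in plain Java. The class name `AuditRecord`, its fields, and the `http_audit` table and column names are all made up for illustration, so adapt them to your schema.

```java
final class AuditRecord {
    // Hypothetical table and column names for the JDBC insert in step 4.
    static final String INSERT_SQL =
        "INSERT INTO http_audit (payload, status, http_code, response_body) VALUES (?, ?, ?, ?)";

    final String payload;
    final String status;
    final int httpCode;
    final String responseBody;

    private AuditRecord(String payload, String status, int httpCode, String responseBody) {
        this.payload = payload;
        this.status = status;
        this.httpCode = httpCode;
        this.responseBody = responseBody;
    }

    /** Tags the record SUCCESS only for HTTP 200; every other status code is FAILED. */
    static AuditRecord fromResponse(String payload, int httpCode, String responseBody) {
        String status = (httpCode == 200) ? "SUCCESS" : "FAILED";
        return new AuditRecord(payload, status, httpCode, responseBody);
    }

    public static void main(String[] args) {
        AuditRecord ok = fromResponse("{\"id\":1}", 200, "accepted");
        AuditRecord bad = fromResponse("{\"id\":2}", 503, "unavailable");
        System.out.println(ok.status + " " + bad.status);
    }
}
```

Because failed calls also produce a record, the DB ends up with a full audit trail rather than only the successes.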
