Posted to user@flink.apache.org by Anastasios Skarlatidis <a....@gmail.com> on 2016/11/27 21:11:35 UTC

DB connection and query inside map function

Hi!

I am new to Apache Flink and would like to ask what the best way is to
query a relational DB inside a map function in order to enrich the
streaming data. Consider, for example, that I have a KeyedStream[Int,
String] and I would like to query the database based on the Int value
inside a map function `stream.map(v: Int => <<some SQL query>> )`.

Is it possible to have a connection pool per worker node, to be used
inside each map function call?

Best,

Anastasios

Re: DB connection and query inside map function

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Anastasios,

That's certainly possible. The most straightforward approach is a
synchronous call to the database.
Because each parallel function instance has only one request active at a
time, you do not need a thread pool.
You can establish the connection in the open() method of a RichMapFunction.
The drawback of this approach is that every record has to wait for its
database round trip, so the synchronous requests can significantly
increase the latency.
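
A minimal sketch of the synchronous approach, assuming the Flink 1.x Scala
API (where open() takes a Configuration) and a JDBC driver on the classpath.
The class name DbEnrichFunction, the table "lookup", its columns "id" and
"label", and the "unknown" fallback are hypothetical placeholders, not part
of Flink or of the original question:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical enrichment function: looks up a label for the Int key of
// each (Int, String) record. Table and column names are assumptions.
class DbEnrichFunction(jdbcUrl: String, user: String, password: String)
    extends RichMapFunction[(Int, String), (Int, String, String)] {

  // Created in open() on the worker, so they are never serialized
  // together with the function object.
  @transient private var connection: Connection = _
  @transient private var statement: PreparedStatement = _

  // Called once per parallel instance before any record is processed.
  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection(jdbcUrl, user, password)
    statement = connection.prepareStatement(
      "SELECT label FROM lookup WHERE id = ?")
  }

  // Synchronous lookup: the call blocks until the database answers.
  override def map(value: (Int, String)): (Int, String, String) = {
    statement.setInt(1, value._1)
    val rs = statement.executeQuery()
    try {
      val label = if (rs.next()) rs.getString(1) else "unknown"
      (value._1, value._2, label)
    } finally {
      rs.close()
    }
  }

  // Called once per parallel instance on shutdown or failure.
  override def close(): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}
```

It could then be applied with something like
`stream.map(new DbEnrichFunction(url, user, password))`, with
`org.apache.flink.streaming.api.scala._` imported for the implicit type
information. Each parallel instance holds its own connection, so the number
of open connections equals the parallelism of the map operator.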

Doing the calls asynchronously and using a thread pool is not very easy
because this would need to be integrated with Flink's checkpointing
mechanism.
In fact, there is an effort to add a special Map operator that supports
asynchronous calls (see FLIP-12 [1]).
We expect this to be included in the next minor release, Flink 1.2.

Hope this helps,
Fabian

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673
