Posted to user@flink.apache.org by Philipp Bussche <ph...@gmail.com> on 2016/09/12 00:36:39 UTC

Enriching a tuple mapped from a datastream with data coming from a JDBC source

Hi there,
I have a data stream (coming from Kafka) that contains information which I
want to enrich with information that sits in a database before I hand over
the enriched tuple to a sink.
How would I do that?
I was thinking of somehow combining my streaming job with a JDBC input but
wasn't very successful in getting this going.
Thanks
Philipp

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Posted by Philipp Bussche <ph...@gmail.com>.
Awesome, thanks Fabian !

I will give this a try.


Fabian Hueske-2 wrote
> Hi Philipp,
> 
> If I got your requirements right you would like to:
> 1) load an initial hashmap via JDBC
> 2) update the hashmap from a stream
> 3) use the hashmap to enrich another stream.
> 
> You can use a CoFlatMap to do this:
> 
> stream1.connect(stream2).flatMap(new YourCoFlatMapFunction());
> 
> YourCoFlatMapFunction should extend RichCoFlatMapFunction. The initial
> hashmap load can be done in the open() method.
> A CoFlatMapFunction has two inputs and two flatMap methods, one for each
> input. One method can update the hashmap; the other enriches the second
> stream with data from the hashmap.
> The two methods are never called concurrently, and the order in which they
> are called depends on which data is available.
> 
> In general, it is not possible to share local operator state among
> different operators (or even parallel instances of the same operator).
> 
> Hope this helps,
> Fabian





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Enriching-a-tuple-mapped-from-a-datastream-with-data-coming-from-a-JDBC-source-tp8993p9313.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Philipp,

If I got your requirements right you would like to:
1) load an initial hashmap via JDBC
2) update the hashmap from a stream
3) use the hashmap to enrich another stream.

You can use a CoFlatMap to do this:

stream1.connect(stream2).flatMap(new YourCoFlatMapFunction());

YourCoFlatMapFunction should extend RichCoFlatMapFunction. The initial
hashmap load can be done in the open() method.
A CoFlatMapFunction has two inputs and two flatMap methods, one for each
input. One method can update the hashmap; the other enriches the second
stream with data from the hashmap.
The two methods are never called concurrently, and the order in which they
are called depends on which data is available.
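To make the shape of this concrete: below is a minimal plain-Java sketch of the two-flatMap pattern (deliberately not actual Flink code, so it runs standalone; in a real job this logic would live in a RichCoFlatMapFunction and the initial load in open() would be a JDBC query). All class, method, and data names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of the CoFlatMap pattern: one "flatMap" per
// input, both touching the same HashMap held by the function instance.
class EnrichmentSketch {
    private final Map<String, String> lookup = new HashMap<>();

    // Stands in for open(): load the initial reference data (via JDBC
    // in the real job).
    void open(Map<String, String> initialData) {
        lookup.putAll(initialData);
    }

    // flatMap1: the update stream refreshes the lookup table.
    void flatMap1(String key, String value) {
        lookup.put(key, value);
    }

    // flatMap2: the main stream is enriched from the lookup table.
    String flatMap2(String key) {
        return key + " -> " + lookup.getOrDefault(key, "unknown");
    }

    public static void main(String[] args) {
        EnrichmentSketch fn = new EnrichmentSketch();
        fn.open(Map.of("id1", "Alice"));
        System.out.println(fn.flatMap2("id1")); // enriched from initial load
        fn.flatMap1("id2", "Bob");              // update arrives on input 1
        System.out.println(fn.flatMap2("id2")); // now enriched too
    }
}
```

Because Flink guarantees the two flatMap methods are never invoked concurrently on the same instance, a plain HashMap is safe here; no extra locking is needed.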

In general, it is not possible to share local operator state among
different operators (or even parallel instances of the same operator).

Hope this helps,
Fabian

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Posted by Philipp Bussche <ph...@gmail.com>.
Hi again,
I implemented the RichMapFunction (its open() method runs a JDBC query to
populate a HashMap with data), which I am using in the map function.
Now there is another RichMapFunction whose map() method would add to the
HashMap that was initialized in the first function.
How would I share the map between the two functions (I am using the
DataStream API)?

Thanks
Philipp



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Enriching-a-tuple-mapped-from-a-datastream-with-data-coming-from-a-JDBC-source-tp8993p9299.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Posted by Konstantin Knauf <ko...@tngtech.com>.
You can just use plain JDBC. Just keep in mind that the classes will be
serialized and sent through the cluster. So you probably want to
initialize all the non-serializable database access objects in the open
method itself (as opposed to the constructor, which runs on the client side).
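A small sketch of that serialization point (plain Java so it runs standalone; the class and field names are illustrative, and the stand-in "connection" would be a real java.sql.Connection obtained via DriverManager.getConnection(...) in actual code):

```java
import java.io.Serializable;

// The function object is serialized on the client and shipped to the
// workers, so non-serializable resources (e.g. a JDBC Connection) must
// be created in open() on the worker, not in the constructor.
class JdbcEnricher implements Serializable {
    private final String jdbcUrl;          // serializable config: fine to set client-side
    private transient Object connection;   // non-serializable: lost during shipping

    JdbcEnricher(String jdbcUrl) {
        // Constructor runs on the client: keep only serializable
        // configuration here, never open the connection.
        this.jdbcUrl = jdbcUrl;
    }

    // Called on the worker after deserialization, before any records.
    void open() {
        // Real code: connection = DriverManager.getConnection(jdbcUrl);
        connection = new Object();
    }

    boolean isReady() {
        return connection != null;
    }
}
```

Marking the connection field transient makes the rule explicit: even if you did set it client-side, it would arrive as null on the worker, so open() is the only safe place to create it.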

Cheers,

Konstantin

On 12.09.2016 13:53, Philipp Bussche wrote:
> Thank you Konstantin, the amount of data I have to load into memory will be
> very small so that should be alright.
> When opening and querying the database, would I use any sort of Flink magic
> or just do plain JDBC?
> I read about the JDBCInputFormat, which one could use with the DataSet API,
> and was wondering if I could use that somehow in my open method then?
> 
> Thanks
> Philipp
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Enriching-a-tuple-mapped-from-a-datastream-with-data-coming-from-a-JDBC-source-tp8993p9002.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
> 

-- 
Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Posted by Philipp Bussche <ph...@gmail.com>.
Thank you Konstantin, the amount of data I have to load into memory will be
very small, so that should be alright.
When opening and querying the database, would I use any sort of Flink magic
or just do plain JDBC?
I read about the JDBCInputFormat, which one could use with the DataSet API,
and was wondering if I could use that somehow in my open method then?

Thanks
Philipp



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Enriching-a-tuple-mapped-from-a-datastream-with-data-coming-from-a-JDBC-source-tp8993p9002.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Posted by Konstantin Knauf <ko...@tngtech.com>.
Hi Philipp,

the easiest way is a RichMap. In the open() method you can load the
relevant database table into memory (e.g. a HashMap). In the
map() method you then just look up the entry in the HashMap.

Of course, this only works if the dataset is small enough to fit in
memory. Is it?
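The open()/map() split looks roughly like this (a plain-Java sketch so it runs standalone; in a real job the class would extend RichMapFunction, and the hard-coded rows below stand in for a JDBC query such as SELECT id, name FROM customers — all names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the RichMap idea: open() loads the reference table into a
// HashMap once, map() then only does in-memory lookups per record.
class CustomerEnricher {
    private final Map<String, String> customersById = new HashMap<>();

    // open(): runs once per parallel instance, before any map() call.
    void open() {
        // Stand-in for the JDBC load of the reference table.
        customersById.put("c1", "Alice");
        customersById.put("c2", "Bob");
    }

    // map(): enrich each incoming id with the cached name.
    String map(String customerId) {
        return customerId + "," + customersById.getOrDefault(customerId, "unknown");
    }
}
```

Note that open() runs once per parallel instance, so each instance holds its own full copy of the table; that is why the dataset has to be small enough for memory.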

Cheers,

Konstantin


On 12.09.2016 02:36, Philipp Bussche wrote:
> Hi there,
> I have a data stream (coming from Kafka) that contains information which
> I want to enrich with information that sits in a database before I
> hand over the enriched tuple to a sink.
> How would I do that?
> I was thinking of somehow combining my streaming job with a JDBC input
> but wasn't very successful in getting this going.
> Thanks
> Philipp

-- 
Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082