Posted to user@flink.apache.org by Philipp Bussche <ph...@gmail.com> on 2016/10/04 01:41:21 UTC

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Hi again,
I implemented the RichMapFunction (its open method runs a JDBC query to
populate a HashMap with data), and I use that HashMap in the map method.
Now there is a second RichMapFunction whose map method needs to add entries
to the HashMap that was initialized in the first function.
How would I share the Map between the two functions? (I am using the
DataStream API.)

Thanks
Philipp



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Enriching-a-tuple-mapped-from-a-datastream-with-data-coming-from-a-JDBC-source-tp8993p9299.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Posted by Philipp Bussche <ph...@gmail.com>.
Awesome, thanks Fabian !

I will give this a try.


Fabian Hueske-2 wrote
> Hi Philipp,
> 
> If I got your requirements right you would like to:
> 1) load an initial hashmap via JDBC
> 2) update the hashmap from a stream
> 3) use the hashmap to enrich another stream.
> 
> You can use a CoFlatMap to do this:
> 
> stream1.connect(stream2).flatMap(new YourCoFlatMapFunction).
> 
> YourCoFlatMapFunction should extend RichCoFlatMapFunction. The initial
> hashmap load can be done in the open method.
> A CoFlatMapFunction has two inputs and two flatMap methods (flatMap1 and
> flatMap2), one for each input. One method can update the hashmap; the other
> can enrich the second stream with data from the hashmap.
> The two methods are never called concurrently, and the order in which they
> are called depends on which input has data available.
> 
> In general, it is not possible to share local operator state among
> different operators (or even among parallel instances of the same operator).
> 
> Hope this helps,
> Fabian




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Enriching-a-tuple-mapped-from-a-datastream-with-data-coming-from-a-JDBC-source-tp8993p9313.html

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Philipp,

If I got your requirements right you would like to:
1) load an initial hashmap via JDBC
2) update the hashmap from a stream
3) use the hashmap to enrich another stream.

You can use a CoFlatMap to do this:

stream1.connect(stream2).flatMap(new YourCoFlatMapFunction).

YourCoFlatMapFunction should extend RichCoFlatMapFunction. The initial
hashmap load can be done in the open method.
A CoFlatMapFunction has two inputs and two flatMap methods (flatMap1 and
flatMap2), one for each input. One method can update the hashmap; the other
can enrich the second stream with data from the hashmap.
The two methods are never called concurrently, and the order in which they
are called depends on which input has data available.
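
The pattern described above can be sketched roughly as follows. This is a
plain-Java model, not Flink code: Collector here is a small stand-in
interface so the example runs without Flink on the classpath, and
EnrichingCoFlatMap, the Supplier-based loader (standing in for the JDBC
query) and the "key=value" output format are all hypothetical names and
choices made for illustration. In an actual job the class would extend
RichCoFlatMapFunction and Flink would call open, flatMap1 and flatMap2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Stand-in for Flink's Collector, so the sketch has no Flink dependency.
interface Collector<T> {
    void collect(T record);
}

// Models a two-input function: input 1 updates the map, input 2 is enriched.
class EnrichingCoFlatMap {
    // Hypothetical loader; in the thread this would be the JDBC query.
    private final Supplier<Map<String, String>> initialLoader;
    // Local to this one instance; other operators or parallel instances
    // never see this map.
    private Map<String, String> lookup;

    EnrichingCoFlatMap(Supplier<Map<String, String>> initialLoader) {
        this.initialLoader = initialLoader;
    }

    // Runs once before any record arrives: load the initial map.
    void open() {
        lookup = new HashMap<>(initialLoader.get());
    }

    // First input: update records. Nothing is emitted downstream.
    void flatMap1(Map.Entry<String, String> update, Collector<String> out) {
        lookup.put(update.getKey(), update.getValue());
    }

    // Second input: records to enrich. Records without a match are dropped
    // here, which is an assumption; emitting them unchanged is equally valid.
    void flatMap2(String key, Collector<String> out) {
        String value = lookup.get(key);
        if (value != null) {
            out.collect(key + "=" + value);
        }
    }
}

class EnrichmentDemo {
    public static void main(String[] args) {
        EnrichingCoFlatMap fn = new EnrichingCoFlatMap(() -> Map.of("a", "1"));
        fn.open();
        List<String> out = new ArrayList<>();
        fn.flatMap2("a", out::add);                  // found in the initial map
        fn.flatMap2("b", out::add);                  // no entry yet: dropped
        fn.flatMap1(Map.entry("b", "2"), out::add);  // update arrives
        fn.flatMap2("b", out::add);                  // now enriched
        System.out.println(out);                     // prints [a=1, b=2]
    }
}
```

Because both methods run on the same instance and are never called
concurrently, a plain HashMap needs no synchronization here. The demo also
shows the ordering caveat: a record that arrives before its update is not
enriched.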

In general, it is not possible to share local operator state among
different operators (or even among parallel instances of the same operator).

Hope this helps,
Fabian