Posted to user@flink.apache.org by de...@thomsonreuters.com on 2016/12/05 11:26:53 UTC

Equivalent of Rx combineLatest() on a join?

Hi all,

[This is my first email here; I'm new to Flink, Java and Scala, so sorry if I missed something obvious.]

I'm exploring Flink in the context of streaming calculators. Basically, the data flow boils down to multiple data streams with variable update rates (milliseconds, seconds, ..., months) which are joined before being fed to calculators. The kind of operation I need is very similar to the Rx combineLatest<http://reactivex.io/documentation/operators/combinelatest.html> operator, which emits an object whenever any of the streams is updated.
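For reference, the combineLatest behaviour described above can be sketched in plain Java, independent of both Flink and Rx: remember the latest value from each input and, once both inputs have produced something, emit a combined value on every update from either side. The names CombineLatest, onFirst and onSecond are hypothetical and chosen only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper modelling Rx combineLatest for two inputs (not a Flink or Rx API).
class CombineLatest<A, B, R> {
    private final BiFunction<A, B, R> combiner;
    private final List<R> emitted = new ArrayList<>();
    private A latestA; // most recent value from the first input, null until one arrives
    private B latestB; // most recent value from the second input, null until one arrives

    CombineLatest(BiFunction<A, B, R> combiner) {
        this.combiner = combiner;
    }

    // Called whenever the first input produces a value.
    void onFirst(A a) {
        latestA = a;
        emitIfReady();
    }

    // Called whenever the second input produces a value.
    void onSecond(B b) {
        latestB = b;
        emitIfReady();
    }

    // Like Rx, emit only once both inputs have produced at least one value.
    private void emitIfReady() {
        if (latestA != null && latestB != null) {
            emitted.add(combiner.apply(latestA, latestB));
        }
    }

    List<R> emitted() {
        return emitted;
    }
}
```

Feeding a1, 1, a2, 2 in that order with the combiner (p, q) -> p + ":" + q yields a1:1, a2:1, a2:2: nothing is emitted until the second input has a value, and after that every update produces exactly one output.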

As there is no such predefined operator, I think I have to use a GlobalWindow and provide a custom WindowAssigner. The end result would look like this (pseudo Java 8 code, I hope it's understandable):

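DataStream<Price1> s1 = env.addSource(..);
DataStream<Price2> s2 = env.addSource(..);

// lambda parameters must not reuse the names s1/s2, which are already local variables
s3 = s1.join(s2)
        .where(p1 -> p1.id)        // join key of the first stream
        .equalTo(p2 -> p2.id)      // join key of the second stream
        .window(new MyCustomCombineLatestAssigner())
        .apply( ... return new object combining data from s1 and from s2);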

Is this approach correct, or is there a simpler way to achieve the same join + apply mechanism?

Thank you,

Denis



________________________________

This e-mail is for the sole use of the intended recipient and contains information that may be privileged and/or confidential. If you are not an intended recipient, please notify the sender by return e-mail and delete this e-mail and any attachments. Certain required legal entity disclosures can be accessed on our website.<http://site.thomsonreuters.com/site/disclosures/>

RE: Equivalent of Rx combineLatest() on a join?

Posted by de...@thomsonreuters.com.
Thanks Gábor, indeed it appears to work as expected.

I found another way, based on the new evictors in Flink 1.2 (see FLINK-4174), which can remove elements anywhere in a window, for example based on their content.
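The evictor idea can be sketched with a simplified, single-key model in plain Java (hypothetical names, not the Flink Evictor API): the window fires on every element, but before the join function runs, every element except the newest one from each input is evicted. The window then holds at most one element per side, so each firing emits exactly the latest combination. In Flink, an evictor on a window join sees the elements of both inputs together in one window and would have to tell the two sides apart; this sketch simply keeps them in separate lists.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified single-key model: one global window, a firing on every element,
// and eviction of everything but the newest element of each input before firing.
class EvictToLatestJoinModel {
    private final List<String> left = new ArrayList<>();   // buffered elements from s1
    private final List<String> right = new ArrayList<>();  // buffered elements from s2
    final List<String> emitted = new ArrayList<>();

    void addLeft(String a) { left.add(a); fire(); }
    void addRight(String b) { right.add(b); fire(); }

    // Keep only the last (most recent) element of a buffer.
    private static void keepLatest(List<String> buffer) {
        if (buffer.size() > 1) {
            buffer.subList(0, buffer.size() - 1).clear();
        }
    }

    private void fire() {
        keepLatest(left);   // "evict before": drop stale s1 elements
        keepLatest(right);  // "evict before": drop stale s2 elements
        // A window join emits one result per (left, right) pair: here at most one.
        for (String a : left) {
            for (String b : right) {
                emitted.add(a + "+" + b);
            }
        }
    }
}
```

With the input sequence a1, b1, a2, b2 the model emits a1+b1, a2+b1, a2+b2, which is the combineLatest output.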

However, the CoFlatMap solution you suggest is definitely simpler, so I'm going to dig further in this direction.

Regards,

Denis

-----Original Message-----
From: Gábor Gévay [mailto:ggab90@gmail.com]
Sent: Tuesday, 13 December 2016 09:45
To: user@flink.apache.org
Subject: Re: Equivalent of Rx combineLatest() on a join?


Re: Equivalent of Rx combineLatest() on a join?

Posted by Gábor Gévay <gg...@gmail.com>.
Dear Denis,

I think you can do it with a simple CoFlatMapFunction (without windows).
To use a CoFlatMapFunction, you first need to connect [1] your
streams, which results in a ConnectedStreams. Then you can call
flatMap on it and pass it a CoFlatMapFunction (which has two
callbacks, one executed when an element arrives on each of the
two streams). What you could do is keep two members in your
CoFlatMapFunction that store the latest values from the two streams:
when an element arrives, update the appropriate one and also emit a
combined value from them.
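A minimal sketch of this suggestion, written in plain Java so it runs without Flink on the classpath: the two methods mirror the flatMap1/flatMap2 callbacks of Flink's CoFlatMapFunction, and a java.util.function.Consumer stands in for Flink's Collector. The Price1/Price2 classes, their id and value fields, and the name LatestPerKeyCombiner are assumptions for illustration. Since the original question joins on an id, the sketch keeps the latest value per key instead of two single fields. In an actual Flink job one would more likely key the connected streams (for example s1.connect(s2).keyBy(new KeySelector1(), new KeySelector2())) and keep the two latest values in keyed ValueState inside a RichCoFlatMapFunction, so that they are partitioned by key and included in checkpoints.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical input types; a real job would use its own POJOs.
class Price1 {
    final String id;
    final double value;
    Price1(String id, double value) { this.id = id; this.value = value; }
}

class Price2 {
    final String id;
    final double value;
    Price2(String id, double value) { this.id = id; this.value = value; }
}

// Framework-free sketch shaped like a CoFlatMapFunction<Price1, Price2, String>.
class LatestPerKeyCombiner {
    // Latest value seen per key on each input (keyed ValueState in a real Flink job).
    private final Map<String, Price1> latest1 = new HashMap<>();
    private final Map<String, Price2> latest2 = new HashMap<>();

    // Called for each element of the first stream.
    void flatMap1(Price1 p, Consumer<String> out) {
        latest1.put(p.id, p);
        emit(p.id, out);
    }

    // Called for each element of the second stream.
    void flatMap2(Price2 p, Consumer<String> out) {
        latest2.put(p.id, p);
        emit(p.id, out);
    }

    // Emit a combined record once both sides have a value for this key.
    private void emit(String id, Consumer<String> out) {
        Price1 a = latest1.get(id);
        Price2 b = latest2.get(id);
        if (a != null && b != null) {
            out.accept(id + ":" + a.value + "/" + b.value);
        }
    }
}
```

For example, after a Price1 for X (1.5), a Price2 for Y (2.0), a Price2 for X (3.0) and a Price1 for X (4.0), the output is X:1.5/3.0 followed by X:4.0/3.0; the unmatched Y update produces nothing.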

Best,
Gábor

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#connect-org.apache.flink.streaming.api.datastream.DataStream-




In reply to <de...@thomsonreuters.com>, 2016-12-05 18:28 GMT+01:00.

RE: Equivalent of Rx combineLatest() on a join?

Posted by de...@thomsonreuters.com.
Actually, that doesn't work as expected: elements that have already been emitted are not purged from the window, so every firing joins them again. I'll experiment with purging triggers and/or evictors, though I have the feeling that Flink was not designed for what we need to do here -- but I'll keep on searching.
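The effect can be illustrated with a simplified, single-key model in plain Java (not Flink code; GlobalWindowJoinModel and its methods are hypothetical). It assumes, as with the global-window join and CountTrigger.of(1) from the earlier message, that all elements stay in one window and that the join is re-evaluated on every element, producing one result per pair of buffered elements. Setting purgeOnFire models wrapping the trigger in a PurgingTrigger, which clears the window after each firing.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, framework-free model of a window join on one key using a single
// global window whose trigger fires on every element (like CountTrigger.of(1)).
class GlobalWindowJoinModel {
    private final List<String> left = new ArrayList<>();   // buffered elements from s1
    private final List<String> right = new ArrayList<>();  // buffered elements from s2
    private final boolean purgeOnFire; // true models a purging trigger
    final List<String> emitted = new ArrayList<>();

    GlobalWindowJoinModel(boolean purgeOnFire) {
        this.purgeOnFire = purgeOnFire;
    }

    void addLeft(String a) { left.add(a); fire(); }
    void addRight(String b) { right.add(b); fire(); }

    // A window join emits one result per (left, right) pair currently in the window.
    private void fire() {
        for (String a : left) {
            for (String b : right) {
                emitted.add(a + "+" + b);
            }
        }
        if (purgeOnFire) {
            left.clear();
            right.clear();
        }
    }
}
```

Feeding a1, b1, a2 without purging emits a1+b1, a1+b1, a2+b1: the old pair is emitted again on every firing. With purging nothing is emitted at all, because each firing sees a window holding a single element, so no pair can ever be formed. Neither matches combineLatest, which would emit a1+b1 and then a2+b1.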

In the meantime, any advice is appreciated. If the goal is not clear, I can provide more details.

Thank you,

Denis

From: Dollfus, Denis (TR Technology & Ops)
Sent: Monday, 5 December 2016 16:31
To: user@flink.apache.org
Subject: RE: Equivalent of Rx combineLatest() on a join?

RE: Equivalent of Rx combineLatest() on a join?

Posted by de...@thomsonreuters.com.
Writing the question helped me find the answer (yes, rubber duck debugging<https://en.wikipedia.org/wiki/Rubber_duck_debugging>): it seems that the code below does what I need:

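import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

s3 = s1.join(s2)
        .where(new KeySelector1()).equalTo(new KeySelector2())
        .window(GlobalWindows.create())   // all elements of a key share one global window
        .trigger(CountTrigger.of(1))      // fire on every incoming element
        .apply(new MyJoinFunction());     // hypothetical JoinFunction<A, B, C> implementation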

If that's a common use case (in my view it is), a syntax shortcut could help developers, e.g. something like:

s3 = s1.join(s2)
        .where(new KeySelector1()).equalTo(new KeySelector2())
        .combineLatest(new MyJoinFunction()); // proposed shortcut, not an existing Flink method

Denis


From: Dollfus, Denis (TR Technology & Ops)
Sent: Monday, 5 December 2016 12:27
To: user@flink.apache.org
Subject: Equivalent of Rx combineLatest() on a join?