Posted to issues@flink.apache.org by "XiaShengSheng (Jira)" <ji...@apache.org> on 2021/10/31 17:24:00 UTC
[jira] [Updated] (FLINK-24709) Fix the issue of interval join java
case content in the official document case
[ https://issues.apache.org/jira/browse/FLINK-24709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
XiaShengSheng updated FLINK-24709:
----------------------------------
Fix Version/s: 1.14.0
> Fix the issue of interval join java case content in the official document case
> ------------------------------------------------------------------------------
>
> Key: FLINK-24709
> URL: https://issues.apache.org/jira/browse/FLINK-24709
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.8.0, 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0
> Reporter: XiaShengSheng
> Priority: Minor
> Fix For: 1.14.0
>
> Attachments: case.png
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> Fix the Java interval join example in the official documentation.
> Taking the Flink 1.12.0 documentation as an example:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html#interval-join
> 1. The current example calls out.collect(first + "," + second), but processElement declares its parameters as left and right, so first and second are undefined:
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> ...
> DataStream<Integer> orangeStream = ...
> DataStream<Integer> greenStream = ...
> orangeStream
>     .keyBy(<KeySelector>)
>     .intervalJoin(greenStream.keyBy(<KeySelector>))
>     .between(Time.milliseconds(-2), Time.milliseconds(1))
>     .process (new ProcessJoinFunction<Integer, Integer, String(){
>         @Override
>         public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
>             out.collect(first + "," + second);
>         }
>     });
> 2. After the fix:
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.util.Collector;
> ...
> DataStream<Integer> orangeStream = ...
> DataStream<Integer> greenStream = ...
> orangeStream
>     .keyBy(<KeySelector>)
>     .intervalJoin(greenStream.keyBy(<KeySelector>))
>     .between(Time.milliseconds(-2), Time.milliseconds(1))
>     .process(new ProcessJoinFunction<Integer, Integer, String>() {
>         @Override
>         public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
>             // Emit the joined pair using the declared parameter names.
>             out.collect(left + "," + right);
>         }
>     });
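For readers who want to see which pairs reach processElement, the following is a minimal plain-Java sketch of the interval join condition (left.ts + lowerBound <= right.ts <= left.ts + upperBound, both bounds inclusive). It does not use Flink. The class IntervalJoinSketch, the Event type, and the intervalJoin helper are hypothetical names introduced for illustration only, and the sketch assumes that all elements share the same key and that each element's timestamp is given explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IntervalJoinSketch {

    /** Hypothetical helper type: a value with an event timestamp in milliseconds. */
    static final class Event {
        final int value;
        final long timestamp;

        Event(int value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * Pairs every left element with every right element whose timestamp lies in
     * [left.timestamp + lowerBound, left.timestamp + upperBound] (inclusive),
     * and formats each pair the same way as the corrected processElement.
     * Assumes all elements belong to the same key.
     */
    static List<String> intervalJoin(List<Event> left, List<Event> right,
                                     long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean inInterval = r.timestamp >= l.timestamp + lowerBound
                        && r.timestamp <= l.timestamp + upperBound;
                if (inInterval) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Illustrative data: each value is also used as its timestamp.
        List<Event> orange = Arrays.asList(new Event(2, 2), new Event(5, 5));
        List<Event> green = Arrays.asList(
                new Event(0, 0), new Event(1, 1), new Event(6, 6), new Event(7, 7));

        // Same bounds as between(Time.milliseconds(-2), Time.milliseconds(1)).
        for (String pair : intervalJoin(orange, green, -2, 1)) {
            System.out.println(pair);
        }
    }
}
```

With these inputs the sketch emits the pairs 2,0 and 2,1 and 5,6, which matches the "left,right" output format of the corrected example.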
--
This message was sent by Atlassian Jira
(v8.3.4#803005)