You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "XiaShengSheng (Jira)" <ji...@apache.org> on 2021/10/31 17:17:00 UTC
[jira] [Created] (FLINK-24709) Fix the issue of interval join java
case content in the official document case
XiaShengSheng created FLINK-24709:
-------------------------------------
Summary: Fix the issue of interval join java case content in the official document case
Key: FLINK-24709
URL: https://issues.apache.org/jira/browse/FLINK-24709
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Affects Versions: 1.14.0, 1.13.0, 1.12.0, 1.11.0, 1.10.0, 1.9.0, 1.8.0
Reporter: XiaShengSheng
Attachments: case.png
Fix the interval join java case in the official document case:
Take the flink1.12.0 version document link as an example:
[https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html#interval-join|http://example.com]
1、Your case is:
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});
2、After repair:
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(left + "," + right);
}
});
--
This message was sent by Atlassian Jira
(v8.3.4#803005)