You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "XiaShengSheng (Jira)" <ji...@apache.org> on 2021/10/31 17:17:00 UTC

[jira] [Created] (FLINK-24709) Fix the issue of interval join java case content in the official document case

XiaShengSheng created FLINK-24709:
-------------------------------------

             Summary: Fix the issue of interval join java case content in the official document case
                 Key: FLINK-24709
                 URL: https://issues.apache.org/jira/browse/FLINK-24709
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
    Affects Versions: 1.14.0, 1.13.0, 1.12.0, 1.11.0, 1.10.0, 1.9.0, 1.8.0
            Reporter: XiaShengSheng
         Attachments: case.png

Fix the interval join java case in the official document case:

Take the flink1.12.0 version document link as an example:
[https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html#interval-join|http://example.com]

1、Your case is:
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(first + "," + second);
        }
    });


2、After repair:
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });



--
This message was sent by Atlassian Jira
(v8.3.4#803005)