You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Benchao Li <li...@apache.org> on 2020/08/27 02:18:36 UTC

Re: flink interval join后按窗口聚组问题

Hi Danny,

You are right, we have already considered the watermark lateness in this
case.
However, our Interval Join operator has a bug that can still produce
records later than the watermark.
I've created an issue [1]; we can discuss it in the Jira issue.

[1] https://issues.apache.org/jira/browse/FLINK-18996

Danny Chan <yu...@gmail.com> 于2020年8月26日周三 下午8:09写道:

> For SQL, we always hold back the watermark when we emit the elements, for
> time interval:
>
> return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;
>
> For your case, the watermark would be held back by 1 hour, so the left join
> records would not be late when they are used by subsequent operators.
>
> See KeyedCoProcessOperatorWithWatermarkDelay and
> RowTimeIntervalJoin.getMaxOutputDelay for details.
>
> Best,
> Danny Chan
> 在 2020年7月3日 +0800 PM3:29,元始(Bob Hu) <65...@qq.com>,写道:
>
> 您好,我想请教一个问题:
> flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between a.rowtime
> and a.rowtime + INTERVAL '1' HOUR
> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime +
> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
> allowedLateness +
> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
> rightRelativeSize) +
> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
> by的时候这种右表数据为空的数据就丢掉了啊。
> flink版本 1.10.0。
>
> 下面是我的一段测试代码:
>
> import org.apache.commons.net.ntp.TimeStamp;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.IOUtils;
>
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.io.Serializable;
> import java.net.InetSocketAddress;
> import java.net.Socket;
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
>
> /**
>  * Test job for an event-time LEFT interval join: orders are joined with payments whose
>  * rowtime lies within one hour after the order's rowtime. Every joined row is printed
>  * to stderr together with the watermark currently seen by the downstream operator, so
>  * rows that are emitted behind the watermark can be spotted in the output.
>  */
> public class TimeBoundedJoin {
>
>     /**
>      * Builds a periodic watermark assigner for rows whose event time is stored in field 1,
>      * either as a {@code Long} (epoch milliseconds) or as a {@link Timestamp}.
>      *
>      * @param maxIdleTime idle timeout in seconds; when no larger timestamp has been observed
>      *                    for longer than this, the watermark is derived from the wall clock
>      *                    instead of the data. {@code null} disables idle detection.
>      * @param finalMaxOutOfOrderness out-of-orderness in seconds that is subtracted from the
>      *                    emitted watermark.
>      * @return the watermark assigner
>      */
>     public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
>         // Convert to milliseconds once, in long arithmetic, so a large idle timeout cannot overflow int.
>         final boolean idleDetectionEnabled = maxIdleTime != null;
>         final long maxIdleMillis = idleDetectionEnabled ? maxIdleTime * 1000L : 0L;
>         final long outOfOrdernessMillis = finalMaxOutOfOrderness * 1000;
>
>         return new AssignerWithPeriodicWatermarks<Row>() {
>             private long currentMaxTimestamp = 0;
>             private long lastMaxTimestamp = 0;
>             private long lastUpdateTime = 0;
>             private boolean firstWatermark = true;
>
>             @Override
>             public Watermark getCurrentWatermark() {
>                 long now = System.currentTimeMillis();
>                 if (firstWatermark) {
>                     // Start the idle clock at the first watermark request.
>                     lastUpdateTime = now;
>                     firstWatermark = false;
>                 }
>                 if (currentMaxTimestamp != lastMaxTimestamp) {
>                     // A larger event timestamp arrived since the last call: reset the idle clock.
>                     lastMaxTimestamp = currentMaxTimestamp;
>                     lastUpdateTime = now;
>                 }
>                 if (idleDetectionEnabled && now - lastUpdateTime > maxIdleMillis) {
>                     // Idle for too long: advance the watermark with the wall clock.
>                     return new Watermark(now - outOfOrdernessMillis);
>                 }
>                 return new Watermark(currentMaxTimestamp - outOfOrdernessMillis);
>             }
>
>             @Override
>             public long extractTimestamp(Row row, long previousElementTimestamp) {
>                 Object value = row.getField(1);
>                 // Check the runtime type instead of using a failed cast as control flow.
>                 // Any value that is not a Long is still cast to Timestamp, so null or
>                 // unsupported types keep failing with the same exceptions as before.
>                 long timestamp;
>                 if (value instanceof Long) {
>                     timestamp = (Long) value;
>                 } else {
>                     timestamp = ((Timestamp) value).getTime();
>                 }
>                 if (timestamp > currentMaxTimestamp) {
>                     currentMaxTimestamp = timestamp;
>                 }
>                 return timestamp;
>             }
>         };
>     }
>
>     /**
>      * Creates a source that emits the given rows in order, sleeping one second after each row.
>      * The source stops early once {@code cancel()} has been called.
>      *
>      * @param rows rows to emit
>      * @return the source function
>      */
>     private static SourceFunction<Row> delayedSource(final List<Row> rows) {
>         return new SourceFunction<Row>() {
>             // Written by cancel() and read by run(), possibly on different threads.
>             private volatile boolean running = true;
>
>             @Override
>             public void run(SourceContext<Row> ctx) throws Exception {
>                 for (Row row : rows) {
>                     if (!running) {
>                         return;
>                     }
>                     ctx.collect(row);
>                     Thread.sleep(1000);
>                 }
>             }
>
>             @Override
>             public void cancel() {
>                 running = false;
>             }
>         };
>     }
>
>     /**
>      * Builds the two in-memory streams, registers them as tables {@code order_info} and
>      * {@code pay}, runs the left interval join and prints each result row with the
>      * current watermark.
>      *
>      * @param args unused
>      * @throws Exception if a timestamp literal cannot be parsed or the job fails
>      */
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>         bsEnv.setParallelism(1);
>         bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>
>         // Orders: (order_id, order_time, fee). Rows with id "000" are filtered out by the query.
>         List<Row> list = new ArrayList<>();
>         list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 00:00:00").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:20:00").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:40:00").getTime()), 100));
>         list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 01:00:01").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:30:00").getTime()), 100));
>         list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 02:00:02").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime()), 100));
>         list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime()), 100));
>         list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 04:00:04").getTime()), 100));
>         DataStream<Row> ds1 = bsEnv.addSource(delayedSource(list));
>         ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
>         ds1.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
>         bsTableEnv.createTemporaryView("order_info", ds1, "order_id, order_time, fee, rowtime.rowtime");
>
>         // Payments: (order_id, pay_time).
>         List<Row> list2 = new ArrayList<>();
>         list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 01:00:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:20:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:30:00").getTime())));
>         list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 02:00:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime())));
>         // The payment for order 003 is left disabled, so order 003 has no payment row in this data set:
>         // list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime())));
>         list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 04:00:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:20:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:40:00").getTime())));
>         list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 05:00:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:20:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:40:00").getTime())));
>         DataStream<Row> ds2 = bsEnv.addSource(delayedSource(list2));
>         ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
>         ds2.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
>         bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time, rowtime.rowtime");
>
>         Table joinTable =  bsTableEnv.sqlQuery("SELECT a.*,b.order_id from order_info a left join pay b on a.order_id=b.order_id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id <>'000' ");
>
>         bsTableEnv.toAppendStream(joinTable, Row.class).process(new ProcessFunction<Row, Object>() {
>             // Created once per function instance instead of once per element.
>             private final SimpleDateFormat watermarkFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>
>             @Override
>             public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
>                 long watermark = ctx.timerService().currentWatermark();
>                 // Field 3 is the fourth selected column (a.* contributes order_id, order_time, fee, rowtime).
>                 System.err.println("row:" + value + ",rowtime:" + value.getField(3) + ",watermark:" + watermarkFormat.format(new Date(watermark)));
>             }
>         });
>
>         bsTableEnv.execute("job");
>     }
> }
>
>

-- 

Best,
Benchao Li