Posted to user@flink.apache.org by "元始(Bob Hu)" <65...@qq.com> on 2020/07/03 07:28:38 UTC

Window grouping after flink interval join

Hello, I'd like to ask a question:
After an interval join between two Flink streaming tables, is there a problem with doing a window group on the result? Some left join records that find no match get dropped.
For example, with the join condition select * from a,b where a.id=b.id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR: from the source code, leftRelativeSize = 1 hour and rightRelativeSize = 0, and the left stream's cleanUpTime = rowTime + leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1, so an unmatched left-table record is emitted 1.5 hours later (with the right side null). Meanwhile the watermark adjustment is Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness, i.e. 1 hour. So by the time such a record is emitted, isn't the watermark already 0.5 hours past the left table's rowtime? Any subsequent group by on the joined stream then drops these records whose right side is null.

Flink version: 1.10.0.
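
To make the arithmetic concrete, here is a minimal sketch of the two offsets (the constants come from the formulas quoted above; the class and variable names are illustrative only, not Flink's actual fields):

public class WatermarkGapSketch {
    public static void main(String[] args) {
        long leftRelativeSize = 3600_000L;   // 1 hour: b.rowtime <= a.rowtime + INTERVAL '1' HOUR
        long rightRelativeSize = 0L;         // b.rowtime >= a.rowtime
        long allowedLateness = 0L;

        // offset after which an unmatched left row is cleaned up and emitted with a null right side
        long cleanUpOffset = leftRelativeSize
                + (leftRelativeSize + rightRelativeSize) / 2
                + allowedLateness + 1;       // 1.5 hours (+1 ms) after its rowtime

        // how far the join operator holds back the watermark
        long watermarkDelay = Math.max(leftRelativeSize, rightRelativeSize)
                + allowedLateness;           // 1 hour

        // the emitted row's event time trails the watermark by the difference
        System.out.println((cleanUpOffset - watermarkDelay) / 60000 + " minutes"); // prints "30 minutes"
    }
}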


Below is a piece of my test code:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class TimeBoundedJoin {

    public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
        AssignerWithPeriodicWatermarks<Row> timestampExtractor = new AssignerWithPeriodicWatermarks<Row>() {
            private long currentMaxTimestamp = 0;
            private long lastMaxTimestamp = 0;
            private long lastUpdateTime = 0;
            boolean firstWatermark = true;
//            Integer maxIdleTime = 30;

            @Override
            public Watermark getCurrentWatermark() {
                if(firstWatermark) {
                    lastUpdateTime = System.currentTimeMillis();
                    firstWatermark = false;
                }
                if(currentMaxTimestamp != lastMaxTimestamp) {
                    lastMaxTimestamp = currentMaxTimestamp;
                    lastUpdateTime = System.currentTimeMillis();
                }
                if(maxIdleTime != null && System.currentTimeMillis() - lastUpdateTime > maxIdleTime * 1000) {
                    return new Watermark(new Date().getTime() - finalMaxOutOfOrderness * 1000);
                }
                return new Watermark(currentMaxTimestamp - finalMaxOutOfOrderness * 1000);

            }

            @Override
            public long extractTimestamp(Row row, long previousElementTimestamp) {
                Object value = row.getField(1);
                long timestamp;
                try {
                    timestamp = (long)value;
                } catch (Exception e) {
                    timestamp = ((Timestamp)value).getTime();
                }
                if(timestamp > currentMaxTimestamp) {
                    currentMaxTimestamp = timestamp;
                }
                return timestamp;
            }
        };
        return timestampExtractor;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.setParallelism(1);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


//        DataStream<Row> ds1 = bsEnv.addSource(sourceFunction(9000));
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<Row> list = new ArrayList<>();
        list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 00:00:00").getTime()), 100));
        list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:20:00").getTime()), 100));
        list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:40:00").getTime()), 100));
        list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 01:00:01").getTime()), 100));
        list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
        list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:30:00").getTime()), 100));
        list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 02:00:02").getTime()), 100));
        list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
        list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime()), 100));
        list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime()), 100));
        list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime()), 100));
        list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime()), 100));
        list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 04:00:04").getTime()), 100));
        DataStream<Row> ds1 = bsEnv.addSource(new SourceFunction<Row>() {
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                for(Row row : list) {
                    ctx.collect(row);
                    Thread.sleep(1000);
                }

            }

            @Override
            public void cancel() {

            }
        });
        ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
        ds1.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
        bsTableEnv.createTemporaryView("order_info", ds1, "order_id, order_time, fee, rowtime.rowtime");

        List<Row> list2 = new ArrayList<>();
        list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 01:00:00").getTime())));
        list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:20:00").getTime())));
        list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:30:00").getTime())));
        list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 02:00:00").getTime())));
        list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime())));
        list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime())));
//        list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime())));
        list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime())));
        list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime())));
        list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 04:00:00").getTime())));
        list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:20:00").getTime())));
        list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:40:00").getTime())));
        list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 05:00:00").getTime())));
        list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:20:00").getTime())));
        list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:40:00").getTime())));
        DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>() {
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                for(Row row : list2) {
                    ctx.collect(row);
                    Thread.sleep(1000);
                }

            }

            @Override
            public void cancel() {

            }
        });
        ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
        ds2.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
        bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time, rowtime.rowtime");

        Table joinTable = bsTableEnv.sqlQuery("SELECT a.*,b.order_id from order_info a left join pay b on a.order_id=b.order_id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id <>'000' ");

        bsTableEnv.toAppendStream(joinTable, Row.class).process(new ProcessFunction<Row, Object>() {
            @Override
            public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                System.err.println("row:" + value + ",rowtime:" + value.getField(3) + ",watermark:" + sdf.format(ctx.timerService().currentWatermark()));
            }
        });

        bsTableEnv.execute("job");
    }
}

Re: Window grouping after flink interval join

Posted by Benchao Li <li...@apache.org>.
Hi Danny,

You are right, we have already considered the watermark lateness in this
case.
However, our interval join operator has a bug that can still produce
records later than the watermark.
I've created an issue[1]; we can discuss it in the JIRA.

[1] https://issues.apache.org/jira/browse/FLINK-18996

Danny Chan <yu...@gmail.com> wrote on Wed, Aug 26, 2020 at 8:09 PM:

> For SQL, we always hold back the watermark when we emit the elements; for a
> time interval join:
>
> return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;
>
> For your case, the watermark would be held back for 1 hour, so the left join
> records would not be late when they are consumed by subsequent operators.
>
> See KeyedCoProcessOperatorWithWatermarkDelay and
> RowTimeIntervalJoin.getMaxOutputDelay for details.
>
> Best,
> Danny Chan

-- 

Best,
Benchao Li

Re: Window grouping after flink interval join

Posted by Danny Chan <yu...@gmail.com>.
For SQL, we always hold back the watermark when we emit the elements; for a time interval join:

return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;

For your case, the watermark would be held back for 1 hour, so the left join records would not be late when they are consumed by subsequent operators.

See KeyedCoProcessOperatorWithWatermarkDelay and RowTimeIntervalJoin.getMaxOutputDelay for details.
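
A rough sketch of that mechanism (illustrative only; the helper class below is made up, and the real operator applies the shift while forwarding watermarks internally):

import org.apache.flink.streaming.api.watermark.Watermark;

class WatermarkHoldBack {
    // e.g. Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness
    private final long delay;

    WatermarkHoldBack(long delay) {
        this.delay = delay;
    }

    // Every watermark forwarded downstream lags the incoming one by `delay`, so
    // join results emitted up to `delay` behind the input watermark still count
    // as on time for downstream event-time operators.
    Watermark forward(Watermark incoming) {
        return new Watermark(incoming.getTimestamp() - delay);
    }
}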

Best,
Danny Chan

Re: Window grouping after flink interval join

Posted by Benchao Li <li...@apache.org>.
Hi Bob,

This is the Flink user mailing list; please post to it in English.
If you want to use Chinese, you can send it to user-zh@flink.apache.org


-- 

Best,
Benchao Li

Re: Window grouping after flink interval join

Posted by Benchao Li <li...@apache.org>.
Hi Tianwang, 一旦,

I think this scenario could actually be optimized in Flink SQL. I've created an issue[1]; discussion is welcome.

[1] https://issues.apache.org/jira/browse/FLINK-18996



-- 

Best,
Benchao Li

Re: Window grouping after flink interval join

Posted by 赵一旦 <hi...@gmail.com>.
I took a quick look. I've run into this problem in my own work; I handled it with
the DataStream API.
In my case, though, I considered all the cases at the job design stage, so these
issues were planned for up front.
The watermark can be reassigned. I also changed the implementation of the
interval join operator; by default, 1.10 only supports inner join, not left/right
join.
And after an inner join I take the larger of the two timestamps. This part is
tricky: for a left join the business usually prefers the left side's time, and for
a right join the right side's time; an outer join can only use the time of
whichever side remains, and the inner join case depends on the business.


For your problem, reassigning the watermark is basically all you need.
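
A minimal sketch of that reassignment, reusing the getWatermark(...) helper and names from the test code at the top of this thread (illustrative and untested; in that code, field 1 of the joined row is still a.order_time, which is the field getWatermark() reads):

DataStream<Row> joined = bsTableEnv.toAppendStream(joinTable, Row.class)
        .assignTimestampsAndWatermarks(getWatermark(null, 0));
// Event-time windows or CEP running on `joined` now see watermarks derived
// from the joined rows themselves, so the null-padded left join results are
// no longer dropped as late data.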



Tianwang Li <li...@gmail.com> wrote on Sun, Aug 16, 2020 at 10:45 AM:

> Expanding the discussion to some specific scenarios.
> 1. The inner join case: is there any way to take the max of the two streams' rowtime?
> How would this be done with a SQL statement?
> For example:
> SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as
> rowtime, ...
>
> If that were supported, we could still do window aggregation, CEP, and similar computations downstream in this scenario.
>
> Tianwang Li <li...@gmail.com> wrote on Sun, Aug 16, 2020 at 10:40 AM:
>
> > Expanding the discussion to some specific scenarios.
> >
> > Benchao Li <li...@apache.org> wrote on Mon, Jul 6, 2020 at 11:08 PM:
> >
> >> We were also a bit surprised when we first noticed this behavior, but after
> >> thinking it over it seemed reasonable.
> >>
> >> Because the time range of a dual-stream join can be fairly large, e.g. stream A
> >> joins stream B within [-10min, +10min], a record arriving on stream A may join
> >> data from several minutes earlier, at which point the watermark has already
> >> passed that data's event time.
> >>
> >> Personally, I think this comes down to a balance between producing join results
> >> more promptly and emitting data whose time is behind the watermark.
> >> The current default implementation chooses to produce results more promptly.
> >> An alternative would be to guarantee the watermark never passes the data time;
> >> then the join results would be delayed, or the watermark logic would have to be
> >> changed so the watermark always stays below the earliest time of the data that
> >> can still be joined.
> >>
> >> 元始(Bob Hu) <65...@qq.com> wrote on Sun, Jul 5, 2020 at 8:48 PM:
> >>
> >> > Thanks for your explanation. This mechanism in Flink feels a bit odd, though.
> >> >
> >> >
> >> > ------------------ Original Message ------------------
> >> > *From:* "Benchao Li"<li...@apache.org>;
> >> > *Sent:* Sunday, July 5, 2020, 11:58 AM
> >> > *To:* "元始(Bob Hu)"<65...@qq.com>;
> >> > *Cc:* "user-zh"<us...@flink.apache.org>;
> >> > *Subject:* Re: Window grouping after flink interval join
> >> >
> >> > Back to your question: I think your observation is correct. The output of a
> >> > time interval join can indeed behave this way. So if you use an event-time
> >> > interval join followed by an event-time window (or another operator that uses
> >> > event time, such as CEP), there will be problems: a lot of data is dropped
> >> > outright as late data.
> >> StreamTableEnvironment.create(bsEnv, bsSettings);
> >> >>         bsEnv.setParallelism(1);
> >> >>
> >>  bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> >>
> >> >>
> >> >> //        DataStream<Row> ds1 =
> bsEnv.addSource(sourceFunction(9000));
> >> >>         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
> >> HH:mm:ss");
> >> >>         List<Row> list = new ArrayList<>();
> >> >>         list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
> >> 00:00:00").getTime()), 100));
> >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 00:20:00").getTime()), 100));
> >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 00:40:00").getTime()), 100));
> >> >>         list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
> >> 01:00:01").getTime()), 100));
> >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 02:20:00").getTime()), 100));
> >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 02:30:00").getTime()), 100));
> >> >>         list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
> >> 02:00:02").getTime()), 100));
> >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 02:20:00").getTime()), 100));
> >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 02:40:00").getTime()), 100));
> >> >>         list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
> >> 03:00:03").getTime()), 100));
> >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 03:20:00").getTime()), 100));
> >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 03:40:00").getTime()), 100));
> >> >>         list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
> >> 04:00:04").getTime()), 100));
> >> >>         DataStream<Row> ds1 = bsEnv.addSource(new
> >> SourceFunction<Row>() {
> >> >>             @Override
> >> >>             public void run(SourceContext<Row> ctx) throws Exception
> {
> >> >>                 for(Row row : list) {
> >> >>                     ctx.collect(row);
> >> >>                     Thread.sleep(1000);
> >> >>                 }
> >> >>
> >> >>             }
> >> >>
> >> >>             @Override
> >> >>             public void cancel() {
> >> >>
> >> >>             }
> >> >>         });
> >> >>         ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null,
> 0));
> >> >>         ds1.getTransformation().setOutputType((new
> >> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
> >> >>         bsTableEnv.createTemporaryView("order_info", ds1, "order_id,
> >> order_time, fee, rowtime.rowtime");
> >> >>
> >> >>         List<Row> list2 = new ArrayList<>();
> >> >>         list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
> >> 01:00:00").getTime())));
> >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 01:20:00").getTime())));
> >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 01:30:00").getTime())));
> >> >>         list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
> >> 02:00:00").getTime())));
> >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 02:20:00").getTime())));
> >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 02:40:00").getTime())));
> >> >> //        list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
> >> 03:00:03").getTime())));
> >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 03:20:00").getTime())));
> >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 03:40:00").getTime())));
> >> >>         list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
> >> 04:00:00").getTime())));
> >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 04:20:00").getTime())));
> >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 04:40:00").getTime())));
> >> >>         list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
> >> 05:00:00").getTime())));
> >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 05:20:00").getTime())));
> >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 05:40:00").getTime())));
> >> >>         DataStream<Row> ds2 = bsEnv.addSource(new
> >> SourceFunction<Row>() {
> >> >>             @Override
> >> >>             public void run(SourceContext<Row> ctx) throws Exception
> {
> >> >>                 for(Row row : list2) {
> >> >>                     ctx.collect(row);
> >> >>                     Thread.sleep(1000);
> >> >>                 }
> >> >>
> >> >>             }
> >> >>
> >> >>             @Override
> >> >>             public void cancel() {
> >> >>
> >> >>             }
> >> >>         });
> >> >>         ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null,
> 0));
> >> >>         ds2.getTransformation().setOutputType((new
> >> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
> >> >>         bsTableEnv.createTemporaryView("pay", ds2, "order_id,
> >> pay_time, rowtime.rowtime");
> >> >>
> >> >>         Table joinTable =  bsTableEnv.sqlQuery("SELECT a.*,b.order_id
> >> from order_info a left join pay b on a.order_id=b.order_id and b.rowtime
> >> between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id
> >> <>'000' ");
> >> >>
> >> >>         bsTableEnv.toAppendStream(joinTable, Row.class).process(new
> >> ProcessFunction<Row, Object>() {
> >> >>             @Override
> >> >>             public void processElement(Row value, Context ctx,
> >> Collector<Object> out) throws Exception {
> >> >>                 SimpleDateFormat sdf = new
> >> SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
> >> >>                 System.err.println("row:" + value + ",rowtime:" +
> >> value.getField(3) + ",watermark:" +
> >> sdf.format(ctx.timerService().currentWatermark()));
> >> >>             }
> >> >>         });
> >> >>
> >> >>         bsTableEnv.execute("job");
> >> >>     }
> >> >> }
> >> >>
> >> >>
> >> >
> >> > --
> >> >
> >> > Best,
> >> > Benchao Li
> >> >
> >>
> >>
> >> --
> >>
> >> Best,
> >> Benchao Li
> >>
> >
> >
> > --
> > **************************************
> >  tivanli
> > **************************************
> >
>
>
> --
> **************************************
>  tivanli
> **************************************
>

Re: Window grouping after flink interval join

Posted by Tianwang Li <li...@gmail.com>.
Expanding the discussion with a few specific scenarios.
1. The inner-join case: is there a way to take the max of the two streams'
rowtimes? And when the job is written in SQL, how would that be expressed?
For example:
SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as
rowtime, ...

If something like this were supported, we could still do window aggregation, CEP,
and similar event-time computation downstream.
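
As far as I know, the planner materializes a rowtime once you compute over it,
which is exactly why the question arises. Until something like this is supported
natively, one workaround is to compute the max in SQL, drop back to a DataStream,
and re-register the result with a fresh rowtime. A sketch only, reusing the
order_info/pay tables from the test job in this thread; the view name, the delay,
and the field positions are assumptions:

// Compute the max rowtime in SQL; the result is a plain TIMESTAMP column.
Table maxTimeTable = bsTableEnv.sqlQuery(
        "SELECT a.order_id, " +
        "  CASE WHEN a.rowtime > b.rowtime THEN a.rowtime ELSE b.rowtime END AS max_time " +
        "FROM order_info a JOIN pay b " +
        "  ON a.order_id = b.order_id " +
        "  AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' HOUR");

DataStream<Row> maxTimeStream = bsTableEnv.toAppendStream(maxTimeTable, Row.class);
maxTimeStream = maxTimeStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.hours(1)) {
            @Override
            public long extractTimestamp(Row row) {
                // assumes the TIMESTAMP comes back as java.sql.Timestamp here;
                // the 1h delay is an assumption covering the join's disorder
                return ((java.sql.Timestamp) row.getField(1)).getTime();
            }
        });
// Re-register with a usable event-time attribute for downstream windows/CEP.
bsTableEnv.createTemporaryView("joined_with_max_time", maxTimeStream,
        "order_id, max_time, rowtime.rowtime");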



-- 
**************************************
 tivanli
**************************************

Re: Window grouping after flink interval join

Posted by Tianwang Li <li...@gmail.com>.
Expanding the discussion with a few specific scenarios.



-- 
**************************************
 tivanli
**************************************

Re: Window grouping after flink interval join

Posted by Benchao Li <li...@apache.org>.
We were also somewhat surprised when we first noticed this behavior, but after
thinking it through, it seems reasonable.

The time range of a dual-stream join can be fairly large. Say stream A joins
stream B within [-10min, +10min]: a record arriving on stream A may join records
from several minutes earlier, and by that point the watermark has already passed
those records' event times.

My feeling is that this is a balance between producing join results promptly and
emitting records whose event time is behind the watermark, and the current
default implementation chooses promptness. The other possible design is to
guarantee that the watermark never overtakes the data's event time; then join
results would be delayed, or the watermark logic would have to change so that the
watermark always stays below the earliest event time that can still be joined.
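
To make the trade-off concrete, here is a minimal sketch of that second approach
at the user level, applied to the joinTable from the test job in this thread. The
1-hour hold-back is an assumption matching the example's join interval, and the
cost is exactly the delay described above:

// Hold the watermark back so it can never overtake rows the join may still
// emit: re-stamp the join output with an out-of-orderness equal to the whole
// join interval. Downstream event-time windows then stop dropping these rows,
// at the price of results being delayed by up to that interval.
DataStream<Row> joined = bsTableEnv.toAppendStream(joinTable, Row.class);
DataStream<Row> heldBack = joined.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.hours(1)) {
            @Override
            public long extractTimestamp(Row row) {
                // field 3 is a.rowtime in the test job's "SELECT a.*, b.order_id"
                return ((java.sql.Timestamp) row.getField(3)).getTime();
            }
        });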



-- 

Best,
Benchao Li

Re: Window grouping after flink interval join

Posted by "元始(Bob Hu)" <65...@qq.com>.
Thanks for the explanation. This mechanism in Flink does feel a bit odd.





Re: Window grouping after flink interval join

Posted by Benchao Li <li...@apache.org>.
Back to your question: I think your observation is correct. The results of a time
interval join can indeed behave this way. So if you use an event-time time
interval join followed by an event-time window (or any other operator that uses
event time, such as CEP), there will be problems: a lot of data gets dropped
outright as late data.


-- 

Best,
Benchao Li