You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Benchao Li <li...@apache.org> on 2020/07/05 03:58:54 UTC
Re: flink interval join后按窗口聚组问题
回到你的问题,我觉得你的观察是正确的。Time interval join产生的结果的确是会有这种情况。
所以如果用事件时间的time interval join,后面再接一个事件时间的window(或者其他的使用事件时间的算子,比如CEP等)
就会有些问题,很多数据被作为late数据直接丢掉了。
元始(Bob Hu) <65...@qq.com> 于2020年7月3日周五 下午3:29写道:
> 您好,我想请教一个问题:
> flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between a.rowtime
> and a.rowtime + INTERVAL '1' HOUR
> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime +
> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
> allowedLateness +
> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
> rightRelativeSize) +
> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
> by的时候这种右表数据为空的数据就丢掉了啊。
> flink版本 1.10.0。
>
> 下面是我的一段测试代码:
>
> import org.apache.commons.net.ntp.TimeStamp;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.IOUtils;
>
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.io.Serializable;
> import java.net.InetSocketAddress;
> import java.net.Socket;
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
>
> public class TimeBoundedJoin {
>
> public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
> AssignerWithPeriodicWatermarks<Row> timestampExtractor = new AssignerWithPeriodicWatermarks<Row>() {
> private long currentMaxTimestamp = 0;
> private long lastMaxTimestamp = 0;
> private long lastUpdateTime = 0;
> boolean firstWatermark = true;
> // Integer maxIdleTime = 30;
>
> @Override
> public Watermark getCurrentWatermark() {
> if(firstWatermark) {
> lastUpdateTime = System.currentTimeMillis();
> firstWatermark = false;
> }
> if(currentMaxTimestamp != lastMaxTimestamp) {
> lastMaxTimestamp = currentMaxTimestamp;
> lastUpdateTime = System.currentTimeMillis();
> }
> if(maxIdleTime != null && System.currentTimeMillis() - lastUpdateTime > maxIdleTime * 1000) {
> return new Watermark(new Date().getTime() - finalMaxOutOfOrderness * 1000);
> }
> return new Watermark(currentMaxTimestamp - finalMaxOutOfOrderness * 1000);
>
> }
>
> @Override
> public long extractTimestamp(Row row, long previousElementTimestamp) {
> Object value = row.getField(1);
> long timestamp;
> try {
> timestamp = (long)value;
> } catch (Exception e) {
> timestamp = ((Timestamp)value).getTime();
> }
> if(timestamp > currentMaxTimestamp) {
> currentMaxTimestamp = timestamp;
> }
> return timestamp;
> }
> };
> return timestampExtractor;
> }
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
> bsEnv.setParallelism(1);
> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
> // DataStream<Row> ds1 = bsEnv.addSource(sourceFunction(9000));
> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
> List<Row> list = new ArrayList<>();
> list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 00:00:00").getTime()), 100));
> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:20:00").getTime()), 100));
> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:40:00").getTime()), 100));
> list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 01:00:01").getTime()), 100));
> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:30:00").getTime()), 100));
> list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 02:00:02").getTime()), 100));
> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime()), 100));
> list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime()), 100));
> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime()), 100));
> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime()), 100));
> list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 04:00:04").getTime()), 100));
> DataStream<Row> ds1 = bsEnv.addSource(new SourceFunction<Row>() {
> @Override
> public void run(SourceContext<Row> ctx) throws Exception {
> for(Row row : list) {
> ctx.collect(row);
> Thread.sleep(1000);
> }
>
> }
>
> @Override
> public void cancel() {
>
> }
> });
> ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
> ds1.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
> bsTableEnv.createTemporaryView("order_info", ds1, "order_id, order_time, fee, rowtime.rowtime");
>
> List<Row> list2 = new ArrayList<>();
> list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 01:00:00").getTime())));
> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:20:00").getTime())));
> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:30:00").getTime())));
> list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 02:00:00").getTime())));
> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime())));
> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime())));
> // list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime())));
> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime())));
> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime())));
> list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 04:00:00").getTime())));
> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:20:00").getTime())));
> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:40:00").getTime())));
> list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 05:00:00").getTime())));
> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:20:00").getTime())));
> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:40:00").getTime())));
> DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>() {
> @Override
> public void run(SourceContext<Row> ctx) throws Exception {
> for(Row row : list2) {
> ctx.collect(row);
> Thread.sleep(1000);
> }
>
> }
>
> @Override
> public void cancel() {
>
> }
> });
> ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
> ds2.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
> bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time, rowtime.rowtime");
>
> Table joinTable = bsTableEnv.sqlQuery("SELECT a.*,b.order_id from order_info a left join pay b on a.order_id=b.order_id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id <>'000' ");
>
> bsTableEnv.toAppendStream(joinTable, Row.class).process(new ProcessFunction<Row, Object>() {
> @Override
> public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
> System.err.println("row:" + value + ",rowtime:" + value.getField(3) + ",watermark:" + sdf.format(ctx.timerService().currentWatermark()));
> }
> });
>
> bsTableEnv.execute("job");
> }
> }
>
>
--
Best,
Benchao Li
Re: flink interval join后按窗口聚组问题
Posted by Benchao Li <li...@apache.org>.
Hi Tianwang、一旦,
我感觉这个场景其实可以在Flink SQL中做一个优化,我建了一个issue[1],欢迎讨论~
[1] https://issues.apache.org/jira/browse/FLINK-18996
赵一旦 <hi...@gmail.com> 于2020年8月17日周一 上午11:52写道:
> 大概看了下。这个问题我业务中涉及到过。我是DataStream API做的。
> 不过我是在任务设计阶段就考虑了所有case,然后提前考虑了这些问题的。
> watermark是可以重设的。其次我还更改了interval join的算子实现,默认1.10只支持inner join。不支持left/right
> join。
> 并且inner join后采用最大的timestamp。这个比较复杂,实际如果做left join,业务上可能更希望使用left的时间,right
> join则使用right的时间。out join则只能使用留下的那个的时间,inner join情况需要看业务。
>
>
> 你这个问题主要就是watermark重设就可以了。
>
>
>
> Tianwang Li <li...@gmail.com> 于2020年8月16日周日 上午10:45写道:
>
> > 展开讨论一些特定场景。
> > 1、inner join场景。有什么办法取两条流的rowtime 的max吗?
> > 使用SQL语句的场合,怎么实现?
> > 例如:
> > SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as
> > rowtime, ...
> >
> > 如果支持了,那么这种场景我们还是可以在下游进行窗口计算和CEP之类的计算。
> >
> > Tianwang Li <li...@gmail.com> 于2020年8月16日周日 上午10:40写道:
> >
> > > 展开讨论一些特点场景。
> > >
> > > Benchao Li <li...@apache.org> 于2020年7月6日周一 下午11:08写道:
> > >
> > >> 我们最开始发现这个现象的时候也有些惊讶,不过后来想了一下感觉也是合理的。
> > >>
> > >> 因为双流Join的时间范围有可能会比较大,比如 A流 在 B流的[-10min, +10min],那这样的话,
> > >> A流来一条数据,可能会join到几分钟之前的数据,而此时的watermark其实已经大于了那条数据的事件时间。
> > >>
> > >> 我个人感觉,这应该就是在更实时的产生Join结果和导致数据时间晚于watermark之间,需要有一个balance。
> > >> 现在默认实现是选择了更加实时的产生结果。当然还有另外一种实现思路,就是保证watermark不会超过数据时间,
> > >> 那样的话,Join结果的产生就会delay,或者需要修改watermark逻辑,让watermark一定要小于当前能join到的数据
> > >> 的时间最早的那个。
> > >>
> > >> 元始(Bob Hu) <65...@qq.com> 于2020年7月5日周日 下午8:48写道:
> > >>
> > >> > 谢谢您的解答。感觉flink这个机制有点奇怪呢
> > >> >
> > >> >
> > >> > ------------------ 原始邮件 ------------------
> > >> > *发件人:* "Benchao Li"<li...@apache.org>;
> > >> > *发送时间:* 2020年7月5日(星期天) 中午11:58
> > >> > *收件人:* "元始(Bob Hu)"<65...@qq.com>;
> > >> > *抄送:* "user-zh"<us...@flink.apache.org>;
> > >> > *主题:* Re: flink interval join后按窗口聚组问题
> > >> >
> > >> > 回到你的问题,我觉得你的观察是正确的。Time interval join产生的结果的确是会有这种情况。
> > >> > 所以如果用事件时间的time interval
> join,后面再接一个事件时间的window(或者其他的使用事件时间的算子,比如CEP等)
> > >> > 就会有些问题,很多数据被作为late数据直接丢掉了。
> > >> >
> > >> > 元始(Bob Hu) <65...@qq.com> 于2020年7月3日周五 下午3:29写道:
> > >> >
> > >> >> 您好,我想请教一个问题:
> > >> >> flink双流表 interval join后再做window group是不是有问题呢,有些left
> join关联不上的数据会被丢掉。
> > >> >> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between
> > >> a.rowtime
> > >> >> and a.rowtime + INTERVAL '1' HOUR
> > >> >> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime =
> rowTime
> > +
> > >> >> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
> > >> >> allowedLateness +
> > >> >>
> > >>
> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
> > >> >> rightRelativeSize) +
> > >> >>
> > >>
> >
> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
> > >> >> by的时候这种右表数据为空的数据就丢掉了啊。
> > >> >> flink版本 1.10.0。
> > >> >>
> > >> >> 下面是我的一段测试代码:
> > >> >>
> > >> >> import org.apache.commons.net.ntp.TimeStamp;
> > >> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
> > >> >> import org.apache.flink.api.common.typeinfo.Types;
> > >> >> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> > >> >> import org.apache.flink.streaming.api.TimeCharacteristic;
> > >> >> import org.apache.flink.streaming.api.datastream.DataStream;
> > >> >> import
> > >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > >> >> import
> > >>
> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> > >> >> import org.apache.flink.streaming.api.functions.ProcessFunction;
> > >> >> import
> > org.apache.flink.streaming.api.functions.source.SourceFunction;
> > >> >> import org.apache.flink.streaming.api.watermark.Watermark;
> > >> >> import org.apache.flink.table.api.EnvironmentSettings;
> > >> >> import org.apache.flink.table.api.Table;
> > >> >> import org.apache.flink.table.api.java.StreamTableEnvironment;
> > >> >> import org.apache.flink.table.functions.ScalarFunction;
> > >> >> import org.apache.flink.types.Row;
> > >> >> import org.apache.flink.util.Collector;
> > >> >> import org.apache.flink.util.IOUtils;
> > >> >>
> > >> >> import java.io.BufferedReader;
> > >> >> import java.io.InputStreamReader;
> > >> >> import java.io.Serializable;
> > >> >> import java.net.InetSocketAddress;
> > >> >> import java.net.Socket;
> > >> >> import java.sql.Timestamp;
> > >> >> import java.text.SimpleDateFormat;
> > >> >> import java.util.ArrayList;
> > >> >> import java.util.Date;
> > >> >> import java.util.List;
> > >> >>
> > >> >> public class TimeBoundedJoin {
> > >> >>
> > >> >> public static AssignerWithPeriodicWatermarks<Row>
> > >> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
> > >> >> AssignerWithPeriodicWatermarks<Row> timestampExtractor =
> new
> > >> AssignerWithPeriodicWatermarks<Row>() {
> > >> >> private long currentMaxTimestamp = 0;
> > >> >> private long lastMaxTimestamp = 0;
> > >> >> private long lastUpdateTime = 0;
> > >> >> boolean firstWatermark = true;
> > >> >> // Integer maxIdleTime = 30;
> > >> >>
> > >> >> @Override
> > >> >> public Watermark getCurrentWatermark() {
> > >> >> if(firstWatermark) {
> > >> >> lastUpdateTime = System.currentTimeMillis();
> > >> >> firstWatermark = false;
> > >> >> }
> > >> >> if(currentMaxTimestamp != lastMaxTimestamp) {
> > >> >> lastMaxTimestamp = currentMaxTimestamp;
> > >> >> lastUpdateTime = System.currentTimeMillis();
> > >> >> }
> > >> >> if(maxIdleTime != null &&
> System.currentTimeMillis()
> > -
> > >> lastUpdateTime > maxIdleTime * 1000) {
> > >> >> return new Watermark(new Date().getTime() -
> > >> finalMaxOutOfOrderness * 1000);
> > >> >> }
> > >> >> return new Watermark(currentMaxTimestamp -
> > >> finalMaxOutOfOrderness * 1000);
> > >> >>
> > >> >> }
> > >> >>
> > >> >> @Override
> > >> >> public long extractTimestamp(Row row, long
> > >> previousElementTimestamp) {
> > >> >> Object value = row.getField(1);
> > >> >> long timestamp;
> > >> >> try {
> > >> >> timestamp = (long)value;
> > >> >> } catch (Exception e) {
> > >> >> timestamp = ((Timestamp)value).getTime();
> > >> >> }
> > >> >> if(timestamp > currentMaxTimestamp) {
> > >> >> currentMaxTimestamp = timestamp;
> > >> >> }
> > >> >> return timestamp;
> > >> >> }
> > >> >> };
> > >> >> return timestampExtractor;
> > >> >> }
> > >> >>
> > >> >> public static void main(String[] args) throws Exception {
> > >> >> StreamExecutionEnvironment bsEnv =
> > >> StreamExecutionEnvironment.getExecutionEnvironment();
> > >> >> EnvironmentSettings bsSettings =
> > >>
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > >> >> StreamTableEnvironment bsTableEnv =
> > >> StreamTableEnvironment.create(bsEnv, bsSettings);
> > >> >> bsEnv.setParallelism(1);
> > >> >>
> > >> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >> >>
> > >> >>
> > >> >> // DataStream<Row> ds1 =
> > bsEnv.addSource(sourceFunction(9000));
> > >> >> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
> > >> HH:mm:ss");
> > >> >> List<Row> list = new ArrayList<>();
> > >> >> list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
> > >> 00:00:00").getTime()), 100));
> > >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 00:20:00").getTime()), 100));
> > >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 00:40:00").getTime()), 100));
> > >> >> list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
> > >> 01:00:01").getTime()), 100));
> > >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 02:20:00").getTime()), 100));
> > >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 02:30:00").getTime()), 100));
> > >> >> list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
> > >> 02:00:02").getTime()), 100));
> > >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 02:20:00").getTime()), 100));
> > >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 02:40:00").getTime()), 100));
> > >> >> list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
> > >> 03:00:03").getTime()), 100));
> > >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 03:20:00").getTime()), 100));
> > >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 03:40:00").getTime()), 100));
> > >> >> list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
> > >> 04:00:04").getTime()), 100));
> > >> >> DataStream<Row> ds1 = bsEnv.addSource(new
> > >> SourceFunction<Row>() {
> > >> >> @Override
> > >> >> public void run(SourceContext<Row> ctx) throws
> Exception
> > {
> > >> >> for(Row row : list) {
> > >> >> ctx.collect(row);
> > >> >> Thread.sleep(1000);
> > >> >> }
> > >> >>
> > >> >> }
> > >> >>
> > >> >> @Override
> > >> >> public void cancel() {
> > >> >>
> > >> >> }
> > >> >> });
> > >> >> ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null,
> > 0));
> > >> >> ds1.getTransformation().setOutputType((new
> > >> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
> > >> >> bsTableEnv.createTemporaryView("order_info", ds1,
> "order_id,
> > >> order_time, fee, rowtime.rowtime");
> > >> >>
> > >> >> List<Row> list2 = new ArrayList<>();
> > >> >> list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
> > >> 01:00:00").getTime())));
> > >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 01:20:00").getTime())));
> > >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 01:30:00").getTime())));
> > >> >> list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
> > >> 02:00:00").getTime())));
> > >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 02:20:00").getTime())));
> > >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 02:40:00").getTime())));
> > >> >> // list2.add(Row.of("003",new
> Timestamp(sdf.parse("2020-05-13
> > >> 03:00:03").getTime())));
> > >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 03:20:00").getTime())));
> > >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 03:40:00").getTime())));
> > >> >> list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
> > >> 04:00:00").getTime())));
> > >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 04:20:00").getTime())));
> > >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 04:40:00").getTime())));
> > >> >> list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
> > >> 05:00:00").getTime())));
> > >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 05:20:00").getTime())));
> > >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 05:40:00").getTime())));
> > >> >> DataStream<Row> ds2 = bsEnv.addSource(new
> > >> SourceFunction<Row>() {
> > >> >> @Override
> > >> >> public void run(SourceContext<Row> ctx) throws
> Exception
> > {
> > >> >> for(Row row : list2) {
> > >> >> ctx.collect(row);
> > >> >> Thread.sleep(1000);
> > >> >> }
> > >> >>
> > >> >> }
> > >> >>
> > >> >> @Override
> > >> >> public void cancel() {
> > >> >>
> > >> >> }
> > >> >> });
> > >> >> ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null,
> > 0));
> > >> >> ds2.getTransformation().setOutputType((new
> > >> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
> > >> >> bsTableEnv.createTemporaryView("pay", ds2, "order_id,
> > >> pay_time, rowtime.rowtime");
> > >> >>
> > >> >> Table joinTable = bsTableEnv.sqlQuery("SELECT
> a.*,b.order_id
> > >> from order_info a left join pay b on a.order_id=b.order_id and
> b.rowtime
> > >> between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id
> > >> <>'000' ");
> > >> >>
> > >> >> bsTableEnv.toAppendStream(joinTable, Row.class).process(new
> > >> ProcessFunction<Row, Object>() {
> > >> >> @Override
> > >> >> public void processElement(Row value, Context ctx,
> > >> Collector<Object> out) throws Exception {
> > >> >> SimpleDateFormat sdf = new
> > >> SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
> > >> >> System.err.println("row:" + value + ",rowtime:" +
> > >> value.getField(3) + ",watermark:" +
> > >> sdf.format(ctx.timerService().currentWatermark()));
> > >> >> }
> > >> >> });
> > >> >>
> > >> >> bsTableEnv.execute("job");
> > >> >> }
> > >> >> }
> > >> >>
> > >> >>
> > >> >
> > >> > --
> > >> >
> > >> > Best,
> > >> > Benchao Li
> > >> >
> > >>
> > >>
> > >> --
> > >>
> > >> Best,
> > >> Benchao Li
> > >>
> > >
> > >
> > > --
> > > **************************************
> > > tivanli
> > > **************************************
> > >
> >
> >
> > --
> > **************************************
> > tivanli
> > **************************************
> >
>
--
Best,
Benchao Li
Re: flink interval join后按窗口聚组问题
Posted by 赵一旦 <hi...@gmail.com>.
大概看了下。这个问题我业务中涉及到过。我是DataStream API做的。
不过我是在任务设计阶段就考虑了所有case,然后提前考虑了这些问题的。
watermark是可以重设的。其次我还更改了interval join的算子实现,默认1.10只支持inner join。不支持left/right
join。
并且inner join后采用最大的timestamp。这个比较复杂,实际如果做left join,业务上可能更希望使用left的时间,right
join则使用right的时间。out join则只能使用留下的那个的时间,inner join情况需要看业务。
你这个问题主要就是watermark重设就可以了。
Tianwang Li <li...@gmail.com> 于2020年8月16日周日 上午10:45写道:
> 展开讨论一些特点从场景。
> 1、inner join场景。有什么办法取两条流的rowtime 的max吗?
> 使用SQL语句的场合,怎么实现?
> 例如:
> SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as
> rowtime, ...
>
> 如果支持了,那么这种场景我们还是可以在下游进行窗口计算和CEP之类的计算。
>
> Tianwang Li <li...@gmail.com> 于2020年8月16日周日 上午10:40写道:
>
> > 展开讨论一些特点场景。
> >
> > Benchao Li <li...@apache.org> 于2020年7月6日周一 下午11:08写道:
> >
> >> 我们最开始发现这个现象的时候也有些惊讶,不过后来想了一下感觉也是合理的。
> >>
> >> 因为双流Join的时间范围有可能会比较大,比如 A流 在 B流的[-10min, +10min],那这样的话,
> >> A流来一条数据,可能会join到几分钟之前的数据,而此时的watermark其实已经大于了那条数据的事件时间。
> >>
> >> 我个人感觉,这应该就是在更实时的产生Join结果和导致数据时间晚于watermark之间,需要有一个balance。
> >> 现在默认实现是选择了更加实时的产生结果。当然还有另外一种实现思路,就是保证watermark不会超过数据时间,
> >> 那样的话,Join结果的产生就会delay,或者需要修改watermark逻辑,让watermark一定要小于当前能join到的数据
> >> 的时间最早的那个。
> >>
> >> 元始(Bob Hu) <65...@qq.com> 于2020年7月5日周日 下午8:48写道:
> >>
> >> > 谢谢您的解答。感觉flink这个机制有点奇怪呢
> >> >
> >> >
> >> > ------------------ 原始邮件 ------------------
> >> > *发件人:* "Benchao Li"<li...@apache.org>;
> >> > *发送时间:* 2020年7月5日(星期天) 中午11:58
> >> > *收件人:* "元始(Bob Hu)"<65...@qq.com>;
> >> > *抄送:* "user-zh"<us...@flink.apache.org>;
> >> > *主题:* Re: flink interval join后按窗口聚组问题
> >> >
> >> > 回到你的问题,我觉得你的观察是正确的。Time interval join产生的结果的确是会有这种情况。
> >> > 所以如果用事件时间的time interval join,后面再接一个事件时间的window(或者其他的使用事件时间的算子,比如CEP等)
> >> > 就会有些问题,很多数据被作为late数据直接丢掉了。
> >> >
> >> > 元始(Bob Hu) <65...@qq.com> 于2020年7月3日周五 下午3:29写道:
> >> >
> >> >> 您好,我想请教一个问题:
> >> >> flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
> >> >> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between
> >> a.rowtime
> >> >> and a.rowtime + INTERVAL '1' HOUR
> >> >> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime
> +
> >> >> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
> >> >> allowedLateness +
> >> >>
> >> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
> >> >> rightRelativeSize) +
> >> >>
> >>
> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
> >> >> by的时候这种右表数据为空的数据就丢掉了啊。
> >> >> flink版本 1.10.0。
> >> >>
> >> >> 下面是我的一段测试代码:
> >> >>
> >> >> import org.apache.commons.net.ntp.TimeStamp;
> >> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
> >> >> import org.apache.flink.api.common.typeinfo.Types;
> >> >> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> >> >> import org.apache.flink.streaming.api.TimeCharacteristic;
> >> >> import org.apache.flink.streaming.api.datastream.DataStream;
> >> >> import
> >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >> >> import
> >> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> >> >> import org.apache.flink.streaming.api.functions.ProcessFunction;
> >> >> import
> org.apache.flink.streaming.api.functions.source.SourceFunction;
> >> >> import org.apache.flink.streaming.api.watermark.Watermark;
> >> >> import org.apache.flink.table.api.EnvironmentSettings;
> >> >> import org.apache.flink.table.api.Table;
> >> >> import org.apache.flink.table.api.java.StreamTableEnvironment;
> >> >> import org.apache.flink.table.functions.ScalarFunction;
> >> >> import org.apache.flink.types.Row;
> >> >> import org.apache.flink.util.Collector;
> >> >> import org.apache.flink.util.IOUtils;
> >> >>
> >> >> import java.io.BufferedReader;
> >> >> import java.io.InputStreamReader;
> >> >> import java.io.Serializable;
> >> >> import java.net.InetSocketAddress;
> >> >> import java.net.Socket;
> >> >> import java.sql.Timestamp;
> >> >> import java.text.SimpleDateFormat;
> >> >> import java.util.ArrayList;
> >> >> import java.util.Date;
> >> >> import java.util.List;
> >> >>
> >> >> public class TimeBoundedJoin {
> >> >>
> >> >> public static AssignerWithPeriodicWatermarks<Row>
> >> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
> >> >> AssignerWithPeriodicWatermarks<Row> timestampExtractor = new
> >> AssignerWithPeriodicWatermarks<Row>() {
> >> >> private long currentMaxTimestamp = 0;
> >> >> private long lastMaxTimestamp = 0;
> >> >> private long lastUpdateTime = 0;
> >> >> boolean firstWatermark = true;
> >> >> // Integer maxIdleTime = 30;
> >> >>
> >> >> @Override
> >> >> public Watermark getCurrentWatermark() {
> >> >> if(firstWatermark) {
> >> >> lastUpdateTime = System.currentTimeMillis();
> >> >> firstWatermark = false;
> >> >> }
> >> >> if(currentMaxTimestamp != lastMaxTimestamp) {
> >> >> lastMaxTimestamp = currentMaxTimestamp;
> >> >> lastUpdateTime = System.currentTimeMillis();
> >> >> }
> >> >> if(maxIdleTime != null && System.currentTimeMillis()
> -
> >> lastUpdateTime > maxIdleTime * 1000) {
> >> >> return new Watermark(new Date().getTime() -
> >> finalMaxOutOfOrderness * 1000);
> >> >> }
> >> >> return new Watermark(currentMaxTimestamp -
> >> finalMaxOutOfOrderness * 1000);
> >> >>
> >> >> }
> >> >>
> >> >> @Override
> >> >> public long extractTimestamp(Row row, long
> >> previousElementTimestamp) {
> >> >> Object value = row.getField(1);
> >> >> long timestamp;
> >> >> try {
> >> >> timestamp = (long)value;
> >> >> } catch (Exception e) {
> >> >> timestamp = ((Timestamp)value).getTime();
> >> >> }
> >> >> if(timestamp > currentMaxTimestamp) {
> >> >> currentMaxTimestamp = timestamp;
> >> >> }
> >> >> return timestamp;
> >> >> }
> >> >> };
> >> >> return timestampExtractor;
> >> >> }
> >> >>
> >> >> public static void main(String[] args) throws Exception {
> >> >> StreamExecutionEnvironment bsEnv =
> >> StreamExecutionEnvironment.getExecutionEnvironment();
> >> >> EnvironmentSettings bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >> >> StreamTableEnvironment bsTableEnv =
> >> StreamTableEnvironment.create(bsEnv, bsSettings);
> >> >> bsEnv.setParallelism(1);
> >> >>
> >> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> >>
> >> >>
> >> >> // DataStream<Row> ds1 =
> bsEnv.addSource(sourceFunction(9000));
> >> >> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
> >> HH:mm:ss");
> >> >> List<Row> list = new ArrayList<>();
> >> >> list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
> >> 00:00:00").getTime()), 100));
> >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 00:20:00").getTime()), 100));
> >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 00:40:00").getTime()), 100));
> >> >> list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
> >> 01:00:01").getTime()), 100));
> >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 02:20:00").getTime()), 100));
> >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 02:30:00").getTime()), 100));
> >> >> list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
> >> 02:00:02").getTime()), 100));
> >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 02:20:00").getTime()), 100));
> >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 02:40:00").getTime()), 100));
> >> >> list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
> >> 03:00:03").getTime()), 100));
> >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 03:20:00").getTime()), 100));
> >> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> >> 03:40:00").getTime()), 100));
> >> >> list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
> >> 04:00:04").getTime()), 100));
> >> >> DataStream<Row> ds1 = bsEnv.addSource(new
> >> SourceFunction<Row>() {
> >> >> @Override
> >> >> public void run(SourceContext<Row> ctx) throws Exception
> {
> >> >> for(Row row : list) {
> >> >> ctx.collect(row);
> >> >> Thread.sleep(1000);
> >> >> }
> >> >>
> >> >> }
> >> >>
> >> >> @Override
> >> >> public void cancel() {
> >> >>
> >> >> }
> >> >> });
> >> >> ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null,
> 0));
> >> >> ds1.getTransformation().setOutputType((new
> >> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
> >> >> bsTableEnv.createTemporaryView("order_info", ds1, "order_id,
> >> order_time, fee, rowtime.rowtime");
> >> >>
> >> >> List<Row> list2 = new ArrayList<>();
> >> >> list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
> >> 01:00:00").getTime())));
> >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 01:20:00").getTime())));
> >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 01:30:00").getTime())));
> >> >> list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
> >> 02:00:00").getTime())));
> >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 02:20:00").getTime())));
> >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 02:40:00").getTime())));
> >> >> // list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
> >> 03:00:03").getTime())));
> >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 03:20:00").getTime())));
> >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 03:40:00").getTime())));
> >> >> list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
> >> 04:00:00").getTime())));
> >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 04:20:00").getTime())));
> >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 04:40:00").getTime())));
> >> >> list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
> >> 05:00:00").getTime())));
> >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 05:20:00").getTime())));
> >> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> >> 05:40:00").getTime())));
> >> >> DataStream<Row> ds2 = bsEnv.addSource(new
> >> SourceFunction<Row>() {
> >> >> @Override
> >> >> public void run(SourceContext<Row> ctx) throws Exception
> {
> >> >> for(Row row : list2) {
> >> >> ctx.collect(row);
> >> >> Thread.sleep(1000);
> >> >> }
> >> >>
> >> >> }
> >> >>
> >> >> @Override
> >> >> public void cancel() {
> >> >>
> >> >> }
> >> >> });
> >> >> ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null,
> 0));
> >> >> ds2.getTransformation().setOutputType((new
> >> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
> >> >> bsTableEnv.createTemporaryView("pay", ds2, "order_id,
> >> pay_time, rowtime.rowtime");
> >> >>
> >> >> Table joinTable = bsTableEnv.sqlQuery("SELECT a.*,b.order_id
> >> from order_info a left join pay b on a.order_id=b.order_id and b.rowtime
> >> between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id
> >> <>'000' ");
> >> >>
> >> >> bsTableEnv.toAppendStream(joinTable, Row.class).process(new
> >> ProcessFunction<Row, Object>() {
> >> >> @Override
> >> >> public void processElement(Row value, Context ctx,
> >> Collector<Object> out) throws Exception {
> >> >> SimpleDateFormat sdf = new
> >> SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
> >> >> System.err.println("row:" + value + ",rowtime:" +
> >> value.getField(3) + ",watermark:" +
> >> sdf.format(ctx.timerService().currentWatermark()));
> >> >> }
> >> >> });
> >> >>
> >> >> bsTableEnv.execute("job");
> >> >> }
> >> >> }
> >> >>
> >> >>
> >> >
> >> > --
> >> >
> >> > Best,
> >> > Benchao Li
> >> >
> >>
> >>
> >> --
> >>
> >> Best,
> >> Benchao Li
> >>
> >
> >
> > --
> > **************************************
> > tivanli
> > **************************************
> >
>
>
> --
> **************************************
> tivanli
> **************************************
>
Re: flink interval join后按窗口聚组问题
Posted by Tianwang Li <li...@gmail.com>.
展开讨论一些特定场景。
1、inner join场景。有什么办法取两条流的rowtime 的max吗?
使用SQL语句的场合,怎么实现?
例如:
SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as
rowtime, ...
如果支持了,那么这种场景我们还是可以在下游进行窗口计算和CEP之类的计算。
Tianwang Li <li...@gmail.com> 于2020年8月16日周日 上午10:40写道:
> 展开讨论一些特定场景。
>
> Benchao Li <li...@apache.org> 于2020年7月6日周一 下午11:08写道:
>
>> 我们最开始发现这个现象的时候也有些惊讶,不过后来想了一下感觉也是合理的。
>>
>> 因为双流Join的时间范围有可能会比较大,比如 A流 在 B流的[-10min, +10min],那这样的话,
>> A流来一条数据,可能会join到几分钟之前的数据,而此时的watermark其实已经大于了那条数据的事件时间。
>>
>> 我个人感觉,这应该就是在更实时的产生Join结果和导致数据时间晚于watermark之间,需要有一个balance。
>> 现在默认实现是选择了更加实时的产生结果。当然还有另外一种实现思路,就是保证watermark不会超过数据时间,
>> 那样的话,Join结果的产生就会delay,或者需要修改watermark逻辑,让watermark一定要小于当前能join到的数据
>> 的时间最早的那个。
>>
>> 元始(Bob Hu) <65...@qq.com> 于2020年7月5日周日 下午8:48写道:
>>
>> > 谢谢您的解答。感觉flink这个机制有点奇怪呢
>> >
>> >
>> > ------------------ 原始邮件 ------------------
>> > *发件人:* "Benchao Li"<li...@apache.org>;
>> > *发送时间:* 2020年7月5日(星期天) 中午11:58
>> > *收件人:* "元始(Bob Hu)"<65...@qq.com>;
>> > *抄送:* "user-zh"<us...@flink.apache.org>;
>> > *主题:* Re: flink interval join后按窗口聚组问题
>> >
>> > 回到你的问题,我觉得你的观察是正确的。Time interval join产生的结果的确是会有这种情况。
>> > 所以如果用事件时间的time interval join,后面再接一个事件时间的window(或者其他的使用事件时间的算子,比如CEP等)
>> > 就会有些问题,很多数据被作为late数据直接丢掉了。
>> >
>> > 元始(Bob Hu) <65...@qq.com> 于2020年7月3日周五 下午3:29写道:
>> >
>> >> 您好,我想请教一个问题:
>> >> flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
>> >> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between
>> a.rowtime
>> >> and a.rowtime + INTERVAL '1' HOUR
>> >> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime +
>> >> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
>> >> allowedLateness +
>> >>
>> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
>> >> rightRelativeSize) +
>> >>
>> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
>> >> by的时候这种右表数据为空的数据就丢掉了啊。
>> >> flink版本 1.10.0。
>> >>
>> >> 下面是我的一段测试代码:
>> >>
>> >> import org.apache.commons.net.ntp.TimeStamp;
>> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> >> import org.apache.flink.api.common.typeinfo.Types;
>> >> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> >> import org.apache.flink.streaming.api.TimeCharacteristic;
>> >> import org.apache.flink.streaming.api.datastream.DataStream;
>> >> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> >> import
>> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> >> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> >> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> >> import org.apache.flink.streaming.api.watermark.Watermark;
>> >> import org.apache.flink.table.api.EnvironmentSettings;
>> >> import org.apache.flink.table.api.Table;
>> >> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> >> import org.apache.flink.table.functions.ScalarFunction;
>> >> import org.apache.flink.types.Row;
>> >> import org.apache.flink.util.Collector;
>> >> import org.apache.flink.util.IOUtils;
>> >>
>> >> import java.io.BufferedReader;
>> >> import java.io.InputStreamReader;
>> >> import java.io.Serializable;
>> >> import java.net.InetSocketAddress;
>> >> import java.net.Socket;
>> >> import java.sql.Timestamp;
>> >> import java.text.SimpleDateFormat;
>> >> import java.util.ArrayList;
>> >> import java.util.Date;
>> >> import java.util.List;
>> >>
>> >> public class TimeBoundedJoin {
>> >>
>> >> public static AssignerWithPeriodicWatermarks<Row>
>> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
>> >> AssignerWithPeriodicWatermarks<Row> timestampExtractor = new
>> AssignerWithPeriodicWatermarks<Row>() {
>> >> private long currentMaxTimestamp = 0;
>> >> private long lastMaxTimestamp = 0;
>> >> private long lastUpdateTime = 0;
>> >> boolean firstWatermark = true;
>> >> // Integer maxIdleTime = 30;
>> >>
>> >> @Override
>> >> public Watermark getCurrentWatermark() {
>> >> if(firstWatermark) {
>> >> lastUpdateTime = System.currentTimeMillis();
>> >> firstWatermark = false;
>> >> }
>> >> if(currentMaxTimestamp != lastMaxTimestamp) {
>> >> lastMaxTimestamp = currentMaxTimestamp;
>> >> lastUpdateTime = System.currentTimeMillis();
>> >> }
>> >> if(maxIdleTime != null && System.currentTimeMillis() -
>> lastUpdateTime > maxIdleTime * 1000) {
>> >> return new Watermark(new Date().getTime() -
>> finalMaxOutOfOrderness * 1000);
>> >> }
>> >> return new Watermark(currentMaxTimestamp -
>> finalMaxOutOfOrderness * 1000);
>> >>
>> >> }
>> >>
>> >> @Override
>> >> public long extractTimestamp(Row row, long
>> previousElementTimestamp) {
>> >> Object value = row.getField(1);
>> >> long timestamp;
>> >> try {
>> >> timestamp = (long)value;
>> >> } catch (Exception e) {
>> >> timestamp = ((Timestamp)value).getTime();
>> >> }
>> >> if(timestamp > currentMaxTimestamp) {
>> >> currentMaxTimestamp = timestamp;
>> >> }
>> >> return timestamp;
>> >> }
>> >> };
>> >> return timestampExtractor;
>> >> }
>> >>
>> >> public static void main(String[] args) throws Exception {
>> >> StreamExecutionEnvironment bsEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> >> EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >> StreamTableEnvironment bsTableEnv =
>> StreamTableEnvironment.create(bsEnv, bsSettings);
>> >> bsEnv.setParallelism(1);
>> >>
>> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >>
>> >> // DataStream<Row> ds1 = bsEnv.addSource(sourceFunction(9000));
>> >> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
>> HH:mm:ss");
>> >> List<Row> list = new ArrayList<>();
>> >> list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
>> 00:00:00").getTime()), 100));
>> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
>> 00:20:00").getTime()), 100));
>> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
>> 00:40:00").getTime()), 100));
>> >> list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
>> 01:00:01").getTime()), 100));
>> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
>> 02:20:00").getTime()), 100));
>> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
>> 02:30:00").getTime()), 100));
>> >> list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
>> 02:00:02").getTime()), 100));
>> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
>> 02:20:00").getTime()), 100));
>> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
>> 02:40:00").getTime()), 100));
>> >> list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
>> 03:00:03").getTime()), 100));
>> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
>> 03:20:00").getTime()), 100));
>> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
>> 03:40:00").getTime()), 100));
>> >> list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
>> 04:00:04").getTime()), 100));
>> >> DataStream<Row> ds1 = bsEnv.addSource(new
>> SourceFunction<Row>() {
>> >> @Override
>> >> public void run(SourceContext<Row> ctx) throws Exception {
>> >> for(Row row : list) {
>> >> ctx.collect(row);
>> >> Thread.sleep(1000);
>> >> }
>> >>
>> >> }
>> >>
>> >> @Override
>> >> public void cancel() {
>> >>
>> >> }
>> >> });
>> >> ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
>> >> ds1.getTransformation().setOutputType((new
>> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
>> >> bsTableEnv.createTemporaryView("order_info", ds1, "order_id,
>> order_time, fee, rowtime.rowtime");
>> >>
>> >> List<Row> list2 = new ArrayList<>();
>> >> list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
>> 01:00:00").getTime())));
>> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
>> 01:20:00").getTime())));
>> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
>> 01:30:00").getTime())));
>> >> list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
>> 02:00:00").getTime())));
>> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
>> 02:20:00").getTime())));
>> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
>> 02:40:00").getTime())));
>> >> // list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
>> 03:00:03").getTime())));
>> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
>> 03:20:00").getTime())));
>> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
>> 03:40:00").getTime())));
>> >> list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
>> 04:00:00").getTime())));
>> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
>> 04:20:00").getTime())));
>> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
>> 04:40:00").getTime())));
>> >> list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
>> 05:00:00").getTime())));
>> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
>> 05:20:00").getTime())));
>> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
>> 05:40:00").getTime())));
>> >> DataStream<Row> ds2 = bsEnv.addSource(new
>> SourceFunction<Row>() {
>> >> @Override
>> >> public void run(SourceContext<Row> ctx) throws Exception {
>> >> for(Row row : list2) {
>> >> ctx.collect(row);
>> >> Thread.sleep(1000);
>> >> }
>> >>
>> >> }
>> >>
>> >> @Override
>> >> public void cancel() {
>> >>
>> >> }
>> >> });
>> >> ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
>> >> ds2.getTransformation().setOutputType((new
>> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
>> >> bsTableEnv.createTemporaryView("pay", ds2, "order_id,
>> pay_time, rowtime.rowtime");
>> >>
>> >> Table joinTable = bsTableEnv.sqlQuery("SELECT a.*,b.order_id
>> from order_info a left join pay b on a.order_id=b.order_id and b.rowtime
>> between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id
>> <>'000' ");
>> >>
>> >> bsTableEnv.toAppendStream(joinTable, Row.class).process(new
>> ProcessFunction<Row, Object>() {
>> >> @Override
>> >> public void processElement(Row value, Context ctx,
>> Collector<Object> out) throws Exception {
>> >> SimpleDateFormat sdf = new
>> SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>> >> System.err.println("row:" + value + ",rowtime:" +
>> value.getField(3) + ",watermark:" +
>> sdf.format(ctx.timerService().currentWatermark()));
>> >> }
>> >> });
>> >>
>> >> bsTableEnv.execute("job");
>> >> }
>> >> }
>> >>
>> >>
>> >
>> > --
>> >
>> > Best,
>> > Benchao Li
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>
>
> --
> **************************************
> tivanli
> **************************************
>
--
**************************************
tivanli
**************************************
Re: flink interval join后按窗口聚组问题
Posted by Tianwang Li <li...@gmail.com>.
展开讨论一些特定场景。
Benchao Li <li...@apache.org> 于2020年7月6日周一 下午11:08写道:
> 我们最开始发现这个现象的时候也有些惊讶,不过后来想了一下感觉也是合理的。
>
> 因为双流Join的时间范围有可能会比较大,比如 A流 在 B流的[-10min, +10min],那这样的话,
> A流来一条数据,可能会join到几分钟之前的数据,而此时的watermark其实已经大于了那条数据的事件时间。
>
> 我个人感觉,这应该就是在更实时的产生Join结果和导致数据时间晚于watermark之间,需要有一个balance。
> 现在默认实现是选择了更加实时的产生结果。当然还有另外一种实现思路,就是保证watermark不会超过数据时间,
> 那样的话,Join结果的产生就会delay,或者需要修改watermark逻辑,让watermark一定要小于当前能join到的数据
> 的时间最早的那个。
>
> 元始(Bob Hu) <65...@qq.com> 于2020年7月5日周日 下午8:48写道:
>
> > 谢谢您的解答。感觉flink这个机制有点奇怪呢
> >
> >
> > ------------------ 原始邮件 ------------------
> > *发件人:* "Benchao Li"<li...@apache.org>;
> > *发送时间:* 2020年7月5日(星期天) 中午11:58
> > *收件人:* "元始(Bob Hu)"<65...@qq.com>;
> > *抄送:* "user-zh"<us...@flink.apache.org>;
> > *主题:* Re: flink interval join后按窗口聚组问题
> >
> > 回到你的问题,我觉得你的观察是正确的。Time interval join产生的结果的确是会有这种情况。
> > 所以如果用事件时间的time interval join,后面再接一个事件时间的window(或者其他的使用事件时间的算子,比如CEP等)
> > 就会有些问题,很多数据被作为late数据直接丢掉了。
> >
> > 元始(Bob Hu) <65...@qq.com> 于2020年7月3日周五 下午3:29写道:
> >
> >> 您好,我想请教一个问题:
> >> flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
> >> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between
> a.rowtime
> >> and a.rowtime + INTERVAL '1' HOUR
> >> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime +
> >> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
> >> allowedLateness +
> >> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
> >> rightRelativeSize) +
> >>
> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
> >> by的时候这种右表数据为空的数据就丢掉了啊。
> >> flink版本 1.10.0。
> >>
> >> 下面是我的一段测试代码:
> >>
> >> import org.apache.commons.net.ntp.TimeStamp;
> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
> >> import org.apache.flink.api.common.typeinfo.Types;
> >> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> >> import org.apache.flink.streaming.api.TimeCharacteristic;
> >> import org.apache.flink.streaming.api.datastream.DataStream;
> >> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >> import
> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> >> import org.apache.flink.streaming.api.functions.ProcessFunction;
> >> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> >> import org.apache.flink.streaming.api.watermark.Watermark;
> >> import org.apache.flink.table.api.EnvironmentSettings;
> >> import org.apache.flink.table.api.Table;
> >> import org.apache.flink.table.api.java.StreamTableEnvironment;
> >> import org.apache.flink.table.functions.ScalarFunction;
> >> import org.apache.flink.types.Row;
> >> import org.apache.flink.util.Collector;
> >> import org.apache.flink.util.IOUtils;
> >>
> >> import java.io.BufferedReader;
> >> import java.io.InputStreamReader;
> >> import java.io.Serializable;
> >> import java.net.InetSocketAddress;
> >> import java.net.Socket;
> >> import java.sql.Timestamp;
> >> import java.text.SimpleDateFormat;
> >> import java.util.ArrayList;
> >> import java.util.Date;
> >> import java.util.List;
> >>
> >> public class TimeBoundedJoin {
> >>
> >> public static AssignerWithPeriodicWatermarks<Row>
> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
> >> AssignerWithPeriodicWatermarks<Row> timestampExtractor = new
> AssignerWithPeriodicWatermarks<Row>() {
> >> private long currentMaxTimestamp = 0;
> >> private long lastMaxTimestamp = 0;
> >> private long lastUpdateTime = 0;
> >> boolean firstWatermark = true;
> >> // Integer maxIdleTime = 30;
> >>
> >> @Override
> >> public Watermark getCurrentWatermark() {
> >> if(firstWatermark) {
> >> lastUpdateTime = System.currentTimeMillis();
> >> firstWatermark = false;
> >> }
> >> if(currentMaxTimestamp != lastMaxTimestamp) {
> >> lastMaxTimestamp = currentMaxTimestamp;
> >> lastUpdateTime = System.currentTimeMillis();
> >> }
> >> if(maxIdleTime != null && System.currentTimeMillis() -
> lastUpdateTime > maxIdleTime * 1000) {
> >> return new Watermark(new Date().getTime() -
> finalMaxOutOfOrderness * 1000);
> >> }
> >> return new Watermark(currentMaxTimestamp -
> finalMaxOutOfOrderness * 1000);
> >>
> >> }
> >>
> >> @Override
> >> public long extractTimestamp(Row row, long
> previousElementTimestamp) {
> >> Object value = row.getField(1);
> >> long timestamp;
> >> try {
> >> timestamp = (long)value;
> >> } catch (Exception e) {
> >> timestamp = ((Timestamp)value).getTime();
> >> }
> >> if(timestamp > currentMaxTimestamp) {
> >> currentMaxTimestamp = timestamp;
> >> }
> >> return timestamp;
> >> }
> >> };
> >> return timestampExtractor;
> >> }
> >>
> >> public static void main(String[] args) throws Exception {
> >> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >> StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
> >> bsEnv.setParallelism(1);
> >> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>
> >>
> >> // DataStream<Row> ds1 = bsEnv.addSource(sourceFunction(9000));
> >> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
> HH:mm:ss");
> >> List<Row> list = new ArrayList<>();
> >> list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
> 00:00:00").getTime()), 100));
> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> 00:20:00").getTime()), 100));
> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> 00:40:00").getTime()), 100));
> >> list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
> 01:00:01").getTime()), 100));
> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> 02:20:00").getTime()), 100));
> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> 02:30:00").getTime()), 100));
> >> list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
> 02:00:02").getTime()), 100));
> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> 02:20:00").getTime()), 100));
> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> 02:40:00").getTime()), 100));
> >> list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
> 03:00:03").getTime()), 100));
> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> 03:20:00").getTime()), 100));
> >> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> 03:40:00").getTime()), 100));
> >> list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
> 04:00:04").getTime()), 100));
> >> DataStream<Row> ds1 = bsEnv.addSource(new SourceFunction<Row>()
> {
> >> @Override
> >> public void run(SourceContext<Row> ctx) throws Exception {
> >> for(Row row : list) {
> >> ctx.collect(row);
> >> Thread.sleep(1000);
> >> }
> >>
> >> }
> >>
> >> @Override
> >> public void cancel() {
> >>
> >> }
> >> });
> >> ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
> >> ds1.getTransformation().setOutputType((new
> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
> >> bsTableEnv.createTemporaryView("order_info", ds1, "order_id,
> order_time, fee, rowtime.rowtime");
> >>
> >> List<Row> list2 = new ArrayList<>();
> >> list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
> 01:00:00").getTime())));
> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> 01:20:00").getTime())));
> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> 01:30:00").getTime())));
> >> list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
> 02:00:00").getTime())));
> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> 02:20:00").getTime())));
> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> 02:40:00").getTime())));
> >> // list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
> 03:00:03").getTime())));
> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> 03:20:00").getTime())));
> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> 03:40:00").getTime())));
> >> list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
> 04:00:00").getTime())));
> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> 04:20:00").getTime())));
> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> 04:40:00").getTime())));
> >> list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
> 05:00:00").getTime())));
> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> 05:20:00").getTime())));
> >> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> 05:40:00").getTime())));
> >> DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>()
> {
> >> @Override
> >> public void run(SourceContext<Row> ctx) throws Exception {
> >> for(Row row : list2) {
> >> ctx.collect(row);
> >> Thread.sleep(1000);
> >> }
> >>
> >> }
> >>
> >> @Override
> >> public void cancel() {
> >>
> >> }
> >> });
> >> ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
> >> ds2.getTransformation().setOutputType((new
> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
> >> bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time,
> rowtime.rowtime");
> >>
> >> Table joinTable = bsTableEnv.sqlQuery("SELECT a.*,b.order_id
> from order_info a left join pay b on a.order_id=b.order_id and b.rowtime
> between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id
> <>'000' ");
> >>
> >> bsTableEnv.toAppendStream(joinTable, Row.class).process(new
> ProcessFunction<Row, Object>() {
> >> @Override
> >> public void processElement(Row value, Context ctx,
> Collector<Object> out) throws Exception {
> >> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
> HH:mm:ss.SSS");
> >> System.err.println("row:" + value + ",rowtime:" +
> value.getField(3) + ",watermark:" +
> sdf.format(ctx.timerService().currentWatermark()));
> >> }
> >> });
> >>
> >> bsTableEnv.execute("job");
> >> }
> >> }
> >>
> >>
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>
--
**************************************
tivanli
**************************************
Re: flink interval join后按窗口聚组问题
Posted by Benchao Li <li...@apache.org>.
我们最开始发现这个现象的时候也有些惊讶,不过后来想了一下感觉也是合理的。
因为双流Join的时间范围有可能会比较大,比如 A流 在 B流的[-10min, +10min],那这样的话,
A流来一条数据,可能会join到几分钟之前的数据,而此时的watermark其实已经大于了那条数据的事件时间。
我个人感觉,这应该就是在更实时的产生Join结果和导致数据时间晚于watermark之间,需要有一个balance。
现在默认实现是选择了更加实时的产生结果。当然还有另外一种实现思路,就是保证watermark不会超过数据时间,
那样的话,Join结果的产生就会delay,或者需要修改watermark逻辑,让watermark一定要小于当前能join到的数据
的时间最早的那个。
元始(Bob Hu) <65...@qq.com> 于2020年7月5日周日 下午8:48写道:
> 谢谢您的解答。感觉flink这个机制有点奇怪呢
>
>
> ------------------ 原始邮件 ------------------
> *发件人:* "Benchao Li"<li...@apache.org>;
> *发送时间:* 2020年7月5日(星期天) 中午11:58
> *收件人:* "元始(Bob Hu)"<65...@qq.com>;
> *抄送:* "user-zh"<us...@flink.apache.org>;
> *主题:* Re: flink interval join后按窗口聚组问题
>
> 回到你的问题,我觉得你的观察是正确的。Time interval join产生的结果的确是会有这种情况。
> 所以如果用事件时间的time interval join,后面再接一个事件时间的window(或者其他的使用事件时间的算子,比如CEP等)
> 就会有些问题,很多数据被作为late数据直接丢掉了。
>
> 元始(Bob Hu) <65...@qq.com> 于2020年7月3日周五 下午3:29写道:
>
>> 您好,我想请教一个问题:
>> flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
>> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between a.rowtime
>> and a.rowtime + INTERVAL '1' HOUR
>> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime +
>> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
>> allowedLateness +
>> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
>> rightRelativeSize) +
>> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
>> by的时候这种右表数据为空的数据就丢掉了啊。
>> flink版本 1.10.0。
>>
>> 下面是我的一段测试代码:
>>
>> import org.apache.commons.net.ntp.TimeStamp;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.functions.ScalarFunction;
>> import org.apache.flink.types.Row;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.util.IOUtils;
>>
>> import java.io.BufferedReader;
>> import java.io.InputStreamReader;
>> import java.io.Serializable;
>> import java.net.InetSocketAddress;
>> import java.net.Socket;
>> import java.sql.Timestamp;
>> import java.text.SimpleDateFormat;
>> import java.util.ArrayList;
>> import java.util.Date;
>> import java.util.List;
>>
>> public class TimeBoundedJoin {
>>
>> public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
>> AssignerWithPeriodicWatermarks<Row> timestampExtractor = new AssignerWithPeriodicWatermarks<Row>() {
>> private long currentMaxTimestamp = 0;
>> private long lastMaxTimestamp = 0;
>> private long lastUpdateTime = 0;
>> boolean firstWatermark = true;
>> // Integer maxIdleTime = 30;
>>
>> @Override
>> public Watermark getCurrentWatermark() {
>> if(firstWatermark) {
>> lastUpdateTime = System.currentTimeMillis();
>> firstWatermark = false;
>> }
>> if(currentMaxTimestamp != lastMaxTimestamp) {
>> lastMaxTimestamp = currentMaxTimestamp;
>> lastUpdateTime = System.currentTimeMillis();
>> }
>> if(maxIdleTime != null && System.currentTimeMillis() - lastUpdateTime > maxIdleTime * 1000) {
>> return new Watermark(new Date().getTime() - finalMaxOutOfOrderness * 1000);
>> }
>> return new Watermark(currentMaxTimestamp - finalMaxOutOfOrderness * 1000);
>>
>> }
>>
>> @Override
>> public long extractTimestamp(Row row, long previousElementTimestamp) {
>> Object value = row.getField(1);
>> long timestamp;
>> try {
>> timestamp = (long)value;
>> } catch (Exception e) {
>> timestamp = ((Timestamp)value).getTime();
>> }
>> if(timestamp > currentMaxTimestamp) {
>> currentMaxTimestamp = timestamp;
>> }
>> return timestamp;
>> }
>> };
>> return timestampExtractor;
>> }
>>
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>> bsEnv.setParallelism(1);
>> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>
>> // DataStream<Row> ds1 = bsEnv.addSource(sourceFunction(9000));
>> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>> List<Row> list = new ArrayList<>();
>> list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 00:00:00").getTime()), 100));
>> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:20:00").getTime()), 100));
>> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:40:00").getTime()), 100));
>> list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 01:00:01").getTime()), 100));
>> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
>> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:30:00").getTime()), 100));
>> list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 02:00:02").getTime()), 100));
>> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
>> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime()), 100));
>> list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime()), 100));
>> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime()), 100));
>> list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime()), 100));
>> list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 04:00:04").getTime()), 100));
>> DataStream<Row> ds1 = bsEnv.addSource(new SourceFunction<Row>() {
>> @Override
>> public void run(SourceContext<Row> ctx) throws Exception {
>> for(Row row : list) {
>> ctx.collect(row);
>> Thread.sleep(1000);
>> }
>>
>> }
>>
>> @Override
>> public void cancel() {
>>
>> }
>> });
>> ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
>> ds1.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
>> bsTableEnv.createTemporaryView("order_info", ds1, "order_id, order_time, fee, rowtime.rowtime");
>>
>> List<Row> list2 = new ArrayList<>();
>> list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 01:00:00").getTime())));
>> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:20:00").getTime())));
>> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:30:00").getTime())));
>> list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 02:00:00").getTime())));
>> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime())));
>> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime())));
>> // list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime())));
>> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime())));
>> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime())));
>> list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 04:00:00").getTime())));
>> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:20:00").getTime())));
>> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:40:00").getTime())));
>> list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 05:00:00").getTime())));
>> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:20:00").getTime())));
>> list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:40:00").getTime())));
>> DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>() {
>> @Override
>> public void run(SourceContext<Row> ctx) throws Exception {
>> for(Row row : list2) {
>> ctx.collect(row);
>> Thread.sleep(1000);
>> }
>>
>> }
>>
>> @Override
>> public void cancel() {
>>
>> }
>> });
>> ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
>> ds2.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
>> bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time, rowtime.rowtime");
>>
>> Table joinTable = bsTableEnv.sqlQuery("SELECT a.*,b.order_id from order_info a left join pay b on a.order_id=b.order_id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id <>'000' ");
>>
>> bsTableEnv.toAppendStream(joinTable, Row.class).process(new ProcessFunction<Row, Object>() {
>> @Override
>> public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
>> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>> System.err.println("row:" + value + ",rowtime:" + value.getField(3) + ",watermark:" + sdf.format(ctx.timerService().currentWatermark()));
>> }
>> });
>>
>> bsTableEnv.execute("job");
>> }
>> }
>>
>>
>
> --
>
> Best,
> Benchao Li
>
--
Best,
Benchao Li
回复: flink interval join后按窗口聚组问题
Posted by "元始(Bob Hu)" <65...@qq.com>.
谢谢您的解答。感觉flink这个机制有点奇怪呢
------------------ 原始邮件 ------------------
发件人: "Benchao Li"<libenchao@apache.org>;
发送时间: 2020年7月5日(星期天) 中午11:58
收件人: "元始(Bob Hu)"<657390448@qq.com>;
抄送: "user-zh"<user-zh@flink.apache.org>;
主题: Re: flink interval join后按窗口聚组问题
回到你的问题,我觉得你的观察是正确的。Time interval join产生的结果的确是会有这种情况。所以如果用事件时间的time interval join,后面再接一个事件时间的window(或者其他的使用事件时间的算子,比如CEP等)
就会有些问题,很多数据被作为late数据直接丢掉了。
元始(Bob Hu) <657390448@qq.com> 于2020年7月3日周五 下午3:29写道:
您好,我想请教一个问题:
flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
比如关联条件是select * from a,b where a.id=b.id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime + leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group by的时候这种右表数据为空的数据就丢掉了啊。
flink版本 1.10.0。
下面是我的一段测试代码:
import org.apache.commons.net.ntp.TimeStamp;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.IOUtils;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
 * Demo job reproducing the late-data issue when an event-time interval join
 * is followed by another event-time operator in Flink 1.10: left rows that
 * find no match are emitted ~1.5h (in event time) after their rowtime, while
 * the join only holds the watermark back by 1h, so a downstream event-time
 * window/group-by treats those padded rows as late and drops them.
 */
public class TimeBoundedJoin {

    /**
     * Builds a periodic watermark assigner that tracks the max event timestamp
     * seen so far and emits {@code maxTimestamp - finalMaxOutOfOrderness}.
     *
     * @param maxIdleTime idle timeout in seconds, or {@code null} to disable;
     *        when no new max timestamp has been observed for this long, the
     *        watermark advances from wall-clock time instead so downstream
     *        event-time operators keep making progress on an idle stream.
     * @param finalMaxOutOfOrderness allowed out-of-orderness in seconds.
     * @return assigner extracting the event timestamp from field 1 of each Row
     *         (accepted as either epoch-millis {@code Long} or {@code java.sql.Timestamp}).
     */
    public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
        AssignerWithPeriodicWatermarks<Row> timestampExtractor = new AssignerWithPeriodicWatermarks<Row>() {
            private long currentMaxTimestamp = 0;
            private long lastMaxTimestamp = 0;
            private long lastUpdateTime = 0;
            private boolean firstWatermark = true;

            @Override
            public Watermark getCurrentWatermark() {
                if (firstWatermark) {
                    lastUpdateTime = System.currentTimeMillis();
                    firstWatermark = false;
                }
                // Remember when the max timestamp last advanced, to detect idleness.
                if (currentMaxTimestamp != lastMaxTimestamp) {
                    lastMaxTimestamp = currentMaxTimestamp;
                    lastUpdateTime = System.currentTimeMillis();
                }
                // Stream idle for more than maxIdleTime seconds: fall back to
                // wall-clock time so the watermark does not stall forever.
                if (maxIdleTime != null && System.currentTimeMillis() - lastUpdateTime > maxIdleTime * 1000L) {
                    return new Watermark(System.currentTimeMillis() - finalMaxOutOfOrderness * 1000L);
                }
                return new Watermark(currentMaxTimestamp - finalMaxOutOfOrderness * 1000L);
            }

            @Override
            public long extractTimestamp(Row row, long previousElementTimestamp) {
                Object value = row.getField(1);
                // Field 1 may be a boxed epoch-millis Long or a java.sql.Timestamp;
                // branch on the type instead of using exceptions for control flow.
                long timestamp;
                if (value instanceof Long) {
                    timestamp = (Long) value;
                } else {
                    timestamp = ((Timestamp) value).getTime();
                }
                if (timestamp > currentMaxTimestamp) {
                    currentMaxTimestamp = timestamp;
                }
                return timestamp;
            }
        };
        return timestampExtractor;
    }

    /**
     * Creates a demo source that emits the given rows in order, one per second.
     * (One shared implementation instead of the two duplicated anonymous
     * sources the original sample used.)
     */
    private static SourceFunction<Row> rowSource(List<Row> rows) {
        return new SourceFunction<Row>() {
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                for (Row row : rows) {
                    ctx.collect(row);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        };
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.setParallelism(1);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // Left stream (orders). Rows with order_id "000" only exist to push the
        // watermark forward and are filtered out by the query's WHERE clause.
        List<Row> orderRows = new ArrayList<>();
        orderRows.add(Row.of("001", new Timestamp(sdf.parse("2020-05-13 00:00:00").getTime()), 100));
        orderRows.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 00:20:00").getTime()), 100));
        orderRows.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 00:40:00").getTime()), 100));
        orderRows.add(Row.of("002", new Timestamp(sdf.parse("2020-05-13 01:00:01").getTime()), 100));
        orderRows.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
        orderRows.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:30:00").getTime()), 100));
        orderRows.add(Row.of("003", new Timestamp(sdf.parse("2020-05-13 02:00:02").getTime()), 100));
        orderRows.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
        orderRows.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime()), 100));
        orderRows.add(Row.of("004", new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime()), 100));
        orderRows.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime()), 100));
        orderRows.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime()), 100));
        orderRows.add(Row.of("005", new Timestamp(sdf.parse("2020-05-13 04:00:04").getTime()), 100));

        DataStream<Row> ds1 = bsEnv.addSource(rowSource(orderRows));
        ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
        ds1.getTransformation().setOutputType(new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT));
        bsTableEnv.createTemporaryView("order_info", ds1, "order_id, order_time, fee, rowtime.rowtime");

        // Right stream (payments). Order "003" deliberately has no payment, so
        // its left row is emitted with a NULL right side by the interval join.
        List<Row> payRows = new ArrayList<>();
        payRows.add(Row.of("001", new Timestamp(sdf.parse("2020-05-13 01:00:00").getTime())));
        payRows.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 01:20:00").getTime())));
        payRows.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 01:30:00").getTime())));
        payRows.add(Row.of("002", new Timestamp(sdf.parse("2020-05-13 02:00:00").getTime())));
        payRows.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime())));
        payRows.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime())));
        // payRows.add(Row.of("003", new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime())));
        payRows.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime())));
        payRows.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime())));
        payRows.add(Row.of("004", new Timestamp(sdf.parse("2020-05-13 04:00:00").getTime())));
        payRows.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 04:20:00").getTime())));
        payRows.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 04:40:00").getTime())));
        payRows.add(Row.of("005", new Timestamp(sdf.parse("2020-05-13 05:00:00").getTime())));
        payRows.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 05:20:00").getTime())));
        payRows.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 05:40:00").getTime())));

        DataStream<Row> ds2 = bsEnv.addSource(rowSource(payRows));
        ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
        ds2.getTransformation().setOutputType(new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP));
        bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time, rowtime.rowtime");

        // Event-time interval left join: pay must arrive within 1 hour after the order.
        Table joinTable = bsTableEnv.sqlQuery("SELECT a.*,b.order_id from order_info a left join pay b on a.order_id=b.order_id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id <>'000' ");

        // Print each joined row together with the operator's current watermark to
        // show that unmatched left rows are emitted with rowtime < watermark.
        bsTableEnv.toAppendStream(joinTable, Row.class).process(new ProcessFunction<Row, Object>() {
            @Override
            public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                System.err.println("row:" + value + ",rowtime:" + value.getField(3) + ",watermark:" + sdf.format(ctx.timerService().currentWatermark()));
            }
        });

        bsTableEnv.execute("job");
    }
}
--
Best,
Benchao Li