You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "Yuan,Youjun" <yu...@baidu.com> on 2019/12/12 01:28:18 UTC

回复: flink持续查询过去30分钟登录网站的人数

首先通过一个自定义表函数(table function),将每条输入的消息变成3条消息,假设输入消息的时间为ts,产出的三条消息时间分别为:(ts-1, 0), (ts+1, 1), (ts+31, 0),
然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM

袁尤军

-----邮件原件-----
发件人: 陈帅 <ca...@gmail.com> 
发送时间: Wednesday, December 11, 2019 9:31 PM
收件人: user-zh@flink.apache.org
主题: flink持续查询过去30分钟登录网站的人数

例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无

那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
12:01 (0),  12:03:(1),  12:14 (2),  12:16(3), 12:30 (4), 12:35 (4), 12:41 (5), 12:46 (4), 13:16 (0)

即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。

用flink stream api和flink sql分别要如何实现?如果用timerService的话过多元素ttl会不会造成性能问题?

Re: flink持续查询过去30分钟登录网站的人数

Posted by 陈帅 <ca...@gmail.com>.
感谢你的回复,不过我仍有个疑问,这里的ts是事件时间event time,如何能够按处理时间processing
time推移而不依赖后续输入消息来自动修改SUM值?
例如输入select查询后,这时进来了一条用户登录消息,后面就一直没有别的用户登录消息进来,这时看到的行为应该是统计登录人数为1,而过了30分钟后这个统计登录人数自动变为0.

Yuan,Youjun <yu...@baidu.com> 于2019年12月12日周四 上午9:28写道:

> 首先通过一个自定义表函数(table
> function),将每条输入的消息变成3条消息,假设输入消息的时间为ts,产出的三条消息时间分别为:(ts-1, 0), (ts+1, 1),
> (ts+31, 0),
> 然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM
>
> 袁尤军
>
> -----邮件原件-----
> 发件人: 陈帅 <ca...@gmail.com>
> 发送时间: Wednesday, December 11, 2019 9:31 PM
> 收件人: user-zh@flink.apache.org
> 主题: flink持续查询过去30分钟登录网站的人数
>
> 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
>
> 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
> 12:01 (0),  12:03:(1),  12:14 (2),  12:16(3), 12:30 (4), 12:35 (4), 12:41
> (5), 12:46 (4), 13:16 (0)
>
> 即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。
>
> 用flink stream api和flink sql分别要如何实现?如果用timerService的话过多元素ttl会不会造成性能问题?
>

Re: flink持续查询过去30分钟登录网站的人数

Posted by 陈帅 <ca...@gmail.com>.
附上我写的flink程序,输入Jack,输出一直是
Jack,0
Jack,1
Jack,1
而不是过了30秒后自动变回 Jack,0 麻烦看下是哪里的问题?谢谢!

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

public class CustomWindowExample4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(1);

        DataStream<Tuple2<String, Timestamp>> callStream =
env.socketTextStream("localhost", 9999).map(new MapFunction<String,
Tuple2<String, Timestamp>>() {
            @Override
            public Tuple2<String, Timestamp> map(String value) throws
Exception {
                return Tuple2.of(value, new
Timestamp(System.currentTimeMillis()));
            }
        }).assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<Tuple2<String,
Timestamp>>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple2<String, Timestamp> element)
{
                return element.f1.getTime();
            }
        });

        StreamTableEnvironment stblEnv = StreamTableEnvironment.create(env);
        stblEnv.registerDataStream("tbl", callStream, "phone, ts");
        stblEnv.registerFunction("udtf", new MyTableFunction());
        Table genTable = stblEnv.sqlQuery("select phone1, flag1, ts1 from
tbl, LATERAL TABLE(udtf(phone, ts)) as t(phone1, ts1, flag1)");
        DataStream<Tuple3<String, Integer, Timestamp>> genStream =
stblEnv.toAppendStream(genTable, Row.class)
                .map(new MapFunction<Row, Tuple3<String, Integer,
Timestamp>>() {
                    @Override
                    public Tuple3<String, Integer, Timestamp> map(Row row)
throws Exception {
                        return Tuple3.of((String)row.getField(0),
(Integer)row.getField(1), (Timestamp)row.getField(2));
                    }
                }).assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer,
Timestamp>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, Integer,
Timestamp> element) {
                        return element.f2.getTime();
                    }
                });

        stblEnv.registerDataStream("tbl1", genStream, "phone1, flag1, ts1,
pt.proctime");

        String query = "SELECT phone1, SUM(flag1) OVER (PARTITION BY phone1
ORDER BY pt RANGE BETWEEN INTERVAL '30' SECOND preceding AND CURRENT ROW)
AS last30SecondsCount FROM tbl1";
        Table resultTable = stblEnv.sqlQuery(query);
        stblEnv.toAppendStream(resultTable, Row.class).print();

        env.execute();
    }

    public static class MyTableFunction extends TableFunction<Row> {

        public void eval(String phone, Timestamp t) {
            Timestamp t0 = addSeconds(t, -1);
            Timestamp t1 = addSeconds(t, 1);
            Timestamp t2 = addSeconds(t, 31);

            collect(createRow(phone, t0, 0));
            collect(createRow(phone, t1, 1));
            collect(createRow(phone, t2, 0));
        }

        private Row createRow(String phone, Timestamp ts, Integer flag) {
            Row row = new Row(3);
            row.setField(0, phone);
            row.setField(1, ts);
            row.setField(2, flag);
            return row;
        }

        @Override
        public TypeInformation<Row> getResultType() {
            return Types.ROW(Types.STRING(), Types.SQL_TIMESTAMP(),
Types.INT());
        }

        private Timestamp addSeconds(Timestamp oldTime, long seconds) {
            ZonedDateTime zonedDateTime =
oldTime.toInstant().atZone(ZoneId.systemDefault());
            Timestamp newTime = Timestamp.from(zonedDateTime.plus(seconds,
ChronoUnit.SECONDS).toInstant());
            return newTime;
        }
    }

}

Yuan,Youjun <yu...@baidu.com> 于2019年12月12日周四 上午9:28写道:

> 首先通过一个自定义表函数(table
> function),将每条输入的消息变成3条消息,假设输入消息的时间为ts,产出的三条消息时间分别为:(ts-1, 0), (ts+1, 1),
> (ts+31, 0),
> 然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM
>
> 袁尤军
>
> -----邮件原件-----
> 发件人: 陈帅 <ca...@gmail.com>
> 发送时间: Wednesday, December 11, 2019 9:31 PM
> 收件人: user-zh@flink.apache.org
> 主题: flink持续查询过去30分钟登录网站的人数
>
> 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
>
> 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
> 12:01 (0),  12:03:(1),  12:14 (2),  12:16(3), 12:30 (4), 12:35 (4), 12:41
> (5), 12:46 (4), 13:16 (0)
>
> 即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。
>
> 用flink stream api和flink sql分别要如何实现?如果用timerService的话过多元素ttl会不会造成性能问题?
>