You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "Yuan,Youjun" <yu...@baidu.com> on 2019/12/12 01:28:18 UTC
回复: flink持续查询过去30分钟登录网站的人数
首先通过一个自定义表函数(table function),将每条输入的消息变成3条消息,假设输入消息的时间为ts,产出的三条消息时间分别为:(ts-1, 0), (ts+1, 1), (ts+31, 0),
然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM
袁尤军
-----邮件原件-----
发件人: 陈帅 <ca...@gmail.com>
发送时间: Wednesday, December 11, 2019 9:31 PM
收件人: user-zh@flink.apache.org
主题: flink持续查询过去30分钟登录网站的人数
例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
12:01 (0), 12:03:(1), 12:14 (2), 12:16(3), 12:30 (4), 12:35 (4), 12:41 (5), 12:46 (4), 13:16 (0)
即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。
用flink stream api和flink sql分别要如何实现?如果用timerService的话过多元素ttl会不会造成性能问题?
Re: flink持续查询过去30分钟登录网站的人数
Posted by 陈帅 <ca...@gmail.com>.
感谢你的回复,不过我仍有个疑问,这里的ts是事件时间event time,如何能够按处理时间processing
time推移而不依赖后续输入消息来自动修改SUM值?
例如输入select查询后,这时进来了一条用户登录消息,后面就一直没有别的用户登录消息进来,这时看到的行为应该是统计登录人数为1,而过了30分钟后这个统计登录人数自动变为0.
Yuan,Youjun <yu...@baidu.com> 于2019年12月12日周四 上午9:28写道:
> 首先通过一个自定义表函数(table
> function),将每条输入的消息变成3条消息,假设输入消息的时间为ts,产出的三条消息时间分别为:(ts-1, 0), (ts+1, 1),
> (ts+31, 0),
> 然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM
>
> 袁尤军
>
> -----邮件原件-----
> 发件人: 陈帅 <ca...@gmail.com>
> 发送时间: Wednesday, December 11, 2019 9:31 PM
> 收件人: user-zh@flink.apache.org
> 主题: flink持续查询过去30分钟登录网站的人数
>
> 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
>
> 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
> 12:01 (0), 12:03:(1), 12:14 (2), 12:16(3), 12:30 (4), 12:35 (4), 12:41
> (5), 12:46 (4), 13:16 (0)
>
> 即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。
>
> 用flink stream api和flink sql分别要如何实现?如果用timerService的话过多元素ttl会不会造成性能问题?
>
Re: flink持续查询过去30分钟登录网站的人数
Posted by 陈帅 <ca...@gmail.com>.
附上我写的flink程序,输入Jack,输出一直是
Jack,0
Jack,1
Jack,1
而不是过了30秒后自动变回 Jack,0 麻烦看下是哪里的问题?谢谢!
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
public class CustomWindowExample4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(1);
DataStream<Tuple2<String, Timestamp>> callStream =
env.socketTextStream("localhost", 9999).map(new MapFunction<String,
Tuple2<String, Timestamp>>() {
@Override
public Tuple2<String, Timestamp> map(String value) throws
Exception {
return Tuple2.of(value, new
Timestamp(System.currentTimeMillis()));
}
}).assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<Tuple2<String,
Timestamp>>(Time.seconds(5)) {
@Override
public long extractTimestamp(Tuple2<String, Timestamp> element)
{
return element.f1.getTime();
}
});
StreamTableEnvironment stblEnv = StreamTableEnvironment.create(env);
stblEnv.registerDataStream("tbl", callStream, "phone, ts");
stblEnv.registerFunction("udtf", new MyTableFunction());
Table genTable = stblEnv.sqlQuery("select phone1, flag1, ts1 from
tbl, LATERAL TABLE(udtf(phone, ts)) as t(phone1, ts1, flag1)");
DataStream<Tuple3<String, Integer, Timestamp>> genStream =
stblEnv.toAppendStream(genTable, Row.class)
.map(new MapFunction<Row, Tuple3<String, Integer,
Timestamp>>() {
@Override
public Tuple3<String, Integer, Timestamp> map(Row row)
throws Exception {
return Tuple3.of((String)row.getField(0),
(Integer)row.getField(1), (Timestamp)row.getField(2));
}
}).assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer,
Timestamp>>(Time.seconds(5)) {
@Override
public long extractTimestamp(Tuple3<String, Integer,
Timestamp> element) {
return element.f2.getTime();
}
});
stblEnv.registerDataStream("tbl1", genStream, "phone1, flag1, ts1,
pt.proctime");
String query = "SELECT phone1, SUM(flag1) OVER (PARTITION BY phone1
ORDER BY pt RANGE BETWEEN INTERVAL '30' SECOND preceding AND CURRENT ROW)
AS last30SecondsCount FROM tbl1";
Table resultTable = stblEnv.sqlQuery(query);
stblEnv.toAppendStream(resultTable, Row.class).print();
env.execute();
}
public static class MyTableFunction extends TableFunction<Row> {
public void eval(String phone, Timestamp t) {
Timestamp t0 = addSeconds(t, -1);
Timestamp t1 = addSeconds(t, 1);
Timestamp t2 = addSeconds(t, 31);
collect(createRow(phone, t0, 0));
collect(createRow(phone, t1, 1));
collect(createRow(phone, t2, 0));
}
private Row createRow(String phone, Timestamp ts, Integer flag) {
Row row = new Row(3);
row.setField(0, phone);
row.setField(1, ts);
row.setField(2, flag);
return row;
}
@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(Types.STRING(), Types.SQL_TIMESTAMP(),
Types.INT());
}
private Timestamp addSeconds(Timestamp oldTime, long seconds) {
ZonedDateTime zonedDateTime =
oldTime.toInstant().atZone(ZoneId.systemDefault());
Timestamp newTime = Timestamp.from(zonedDateTime.plus(seconds,
ChronoUnit.SECONDS).toInstant());
return newTime;
}
}
}
Yuan,Youjun <yu...@baidu.com> 于2019年12月12日周四 上午9:28写道:
> 首先通过一个自定义表函数(table
> function),将每条输入的消息变成3条消息,假设输入消息的时间为ts,产出的三条消息时间分别为:(ts-1, 0), (ts+1, 1),
> (ts+31, 0),
> 然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM
>
> 袁尤军
>
> -----邮件原件-----
> 发件人: 陈帅 <ca...@gmail.com>
> 发送时间: Wednesday, December 11, 2019 9:31 PM
> 收件人: user-zh@flink.apache.org
> 主题: flink持续查询过去30分钟登录网站的人数
>
> 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
>
> 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
> 12:01 (0), 12:03:(1), 12:14 (2), 12:16(3), 12:30 (4), 12:35 (4), 12:41
> (5), 12:46 (4), 13:16 (0)
>
> 即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。
>
> 用flink stream api和flink sql分别要如何实现?如果用timerService的话过多元素ttl会不会造成性能问题?
>