You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 张锴 <zk...@gmail.com> on 2021/01/21 10:24:41 UTC

Re: 根据业务需求选择合适的flink state

你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
下面是我的部分代码逻辑:

val ds = dataStream
  .filter(_.liveType == 1)
  .keyBy(1, 2)
  .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
  .process(new myProcessWindow()).uid("process-id")

class myProcessWindow() extends
ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
TimeWindow] {

  override def process(key: Tuple, context: Context, elements:
Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
= {
    var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
    var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间

    val currentDate = DateUtil.currentDate
    val created_time = currentDate
    val modified_time = currentDate
     。。。

    val join_time: String =
DateUtil.convertTimeStamp2DateStr(startTime,
DateUtil.SECOND_DATE_FORMAT)
    val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
DateUtil.SECOND_DATE_FORMAT)
    val duration = (endTime - startTime) / 1000  //停留多少秒
    val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
    out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
      , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

    CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
      , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

}


这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?




赵一旦 <hi...@gmail.com> 于2020年12月28日周一 下午7:12写道:

> 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>
> 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>
> session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>
>
> 张锴 <zk...@gmail.com> 于2020年12月28日周一 下午5:35写道:
>
> > 能描述一下用session window的考虑吗
> >
> > Akisaya <ak...@gmail.com> 于2020年12月28日周一 下午5:00写道:
> >
> > > 这个可以用 session window 吧
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > >
> > > news_365@163.com <ne...@163.com> 于2020年12月28日周一 下午2:15写道:
> > >
> > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > > >
> > > >
> > > >
> > > > news_365@163.com
> > > >
> > > > 发件人: 张锴
> > > > 发送时间: 2020-12-28 13:35
> > > > 收件人: user-zh
> > > > 主题: 根据业务需求选择合适的flink state
> > > > 各位大佬帮我分析下如下需求应该怎么写
> > > >
> > > > 需求说明:
> > > >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > > >
> > > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > > >
> > > > 我的想法:
> > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > > >
> > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > > >
> > > > flink 版本1.10.1
> > > >
> > >
> >
>

回复: 根据业务需求选择合适的flink state

Posted by xuhaiLong <xi...@163.com>.
Hi, 看了下你的代码,用session window 时长为1分钟,表示的是user1 的窗口在1分钟内没收到数据就进行一个触发计算,所以最终得到的结果应该是需要你把 user1 产生的每条记录的时长做一个sum,如果只看单条维度是不全的
在2021年1月21日 18:24,张锴<zk...@gmail.com> 写道:
你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
下面是我的部分代码逻辑:

val ds = dataStream
.filter(_.liveType == 1)
.keyBy(1, 2)
.window(EventTimeSessionWindows.withGap(Time.minutes(1)))
.process(new myProcessWindow()).uid("process-id")

class myProcessWindow() extends
ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
TimeWindow] {

override def process(key: Tuple, context: Context, elements:
Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
= {
var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间

val currentDate = DateUtil.currentDate
val created_time = currentDate
val modified_time = currentDate
。。。

val join_time: String =
DateUtil.convertTimeStamp2DateStr(startTime,
DateUtil.SECOND_DATE_FORMAT)
val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
DateUtil.SECOND_DATE_FORMAT)
val duration = (endTime - startTime) / 1000  //停留多少秒
val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

}


这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?




赵一旦 <hi...@gmail.com> 于2020年12月28日周一 下午7:12写道:

按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。

或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。

session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。


张锴 <zk...@gmail.com> 于2020年12月28日周一 下午5:35写道:

能描述一下用session window的考虑吗

Akisaya <ak...@gmail.com> 于2020年12月28日周一 下午5:00写道:

这个可以用 session window 吧



https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows

news_365@163.com <ne...@163.com> 于2020年12月28日周一 下午2:15写道:

这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。



news_365@163.com

发件人: 张锴
发送时间: 2020-12-28 13:35
收件人: user-zh
主题: 根据业务需求选择合适的flink state
各位大佬帮我分析下如下需求应该怎么写

需求说明:

公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A




在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。

我的想法:
我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。

不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。

flink 版本1.10.1





Re: 根据业务需求选择合适的flink state

Posted by 张锴 <zk...@gmail.com>.
@赵一旦
我今天调整一下逻辑再试试

赵一旦 <hi...@gmail.com> 于2021年1月22日周五 上午10:10写道:

> 我理解你要的最终mysql结果表是:
> 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);....
>
> 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
>
>
> 如上按照我的方案就可以实现哈。
>
> xuhaiLong <xi...@163.com> 于2021年1月22日周五 上午10:03写道:
>
> > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
> > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum
> 试试?
> >
> >
> > 在2021年1月21日 18:24,张锴<zk...@gmail.com> 写道:
> > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> >
> >
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> > 下面是我的部分代码逻辑:
> >
> > val ds = dataStream
> > .filter(_.liveType == 1)
> > .keyBy(1, 2)
> > .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> > .process(new myProcessWindow()).uid("process-id")
> >
> > class myProcessWindow() extends
> > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> > TimeWindow] {
> >
> > override def process(key: Tuple, context: Context, elements:
> > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> > = {
> > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> >
> > val currentDate = DateUtil.currentDate
> > val created_time = currentDate
> > val modified_time = currentDate
> > 。。。
> >
> > val join_time: String =
> > DateUtil.convertTimeStamp2DateStr(startTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val duration = (endTime - startTime) / 1000  //停留多少秒
> > val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime))
> >
> > CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime)
> >
> > }
> >
> >
> > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> >
> >
> >
> >
> > 赵一旦 <hi...@gmail.com> 于2020年12月28日周一 下午7:12写道:
> >
> > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >
> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >
> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >
> >
> > 张锴 <zk...@gmail.com> 于2020年12月28日周一 下午5:35写道:
> >
> > 能描述一下用session window的考虑吗
> >
> > Akisaya <ak...@gmail.com> 于2020年12月28日周一 下午5:00写道:
> >
> > 这个可以用 session window 吧
> >
> >
> >
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >
> > news_365@163.com <ne...@163.com> 于2020年12月28日周一 下午2:15写道:
> >
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_365@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> >
> >
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
> >
> >
> >
> >
>

回复: 根据业务需求选择合适的flink state

Posted by 纪军伟 <jj...@163.com>.
退订


| |
纪军伟
|
|
jjw8610.hi@163.com
|
签名由网易邮箱大师定制


在2021年01月23日 15:43,徐州州<25...@qq.com> 写道:
我觉得你可以尝试一下TTL,keyby之后设置key状态的失效时间为1分钟,如果一分钟没数据进来就清空state。




------------------&nbsp;原始邮件&nbsp;------------------
发件人: "张锴"<zk357794239@gmail.com&gt;;
发送时间: 2021年1月22日(星期五) 下午5:04
收件人: "user-zh"<user-zh@flink.apache.org&gt;;
主题: Re: 根据业务需求选择合适的flink state



@赵一旦
可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new
AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题

赵一旦 <hinobleyd@gmail.com&gt; 于2021年1月22日周五 上午10:10写道:

&gt; 我理解你要的最终mysql结果表是:
&gt; 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);....
&gt;
&gt; 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
&gt;
&gt;
&gt; 如上按照我的方案就可以实现哈。
&gt;
&gt; xuhaiLong <xiaguo98@163.com&gt; 于2021年1月22日周五 上午10:03写道:
&gt;
&gt; &gt; 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
&gt; &gt; userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum
&gt; 试试?
&gt; &gt;
&gt; &gt;
&gt; &gt; 在2021年1月21日 18:24,张锴<zk357794239@gmail.com&gt; 写道:
&gt; &gt; 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
&gt; &gt;
&gt; &gt;
&gt; context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
&gt; &gt; 下面是我的部分代码逻辑:
&gt; &gt;
&gt; &gt; val ds = dataStream
&gt; &gt; .filter(_.liveType == 1)
&gt; &gt; .keyBy(1, 2)
&gt; &gt; .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
&gt; &gt; .process(new myProcessWindow()).uid("process-id")
&gt; &gt;
&gt; &gt; class myProcessWindow() extends
&gt; &gt; ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
&gt; &gt; TimeWindow] {
&gt; &gt;
&gt; &gt; override def process(key: Tuple, context: Context, elements:
&gt; &gt; Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
&gt; &gt; = {
&gt; &gt; var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
&gt; &gt; var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
&gt; &gt;
&gt; &gt; val currentDate = DateUtil.currentDate
&gt; &gt; val created_time = currentDate
&gt; &gt; val modified_time = currentDate
&gt; &gt; 。。。
&gt; &gt;
&gt; &gt; val join_time: String =
&gt; &gt; DateUtil.convertTimeStamp2DateStr(startTime,
&gt; &gt; DateUtil.SECOND_DATE_FORMAT)
&gt; &gt; val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
&gt; &gt; DateUtil.SECOND_DATE_FORMAT)
&gt; &gt; val duration = (endTime - startTime) / 1000&nbsp; //停留多少秒
&gt; &gt; val duration_time = DateUtil.secondsToFormat(duration)&nbsp; //停留时分秒
&gt; &gt; out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
&gt; &gt; courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
&gt; &gt; join_time, leave_time, created_time, modified_time
&gt; &gt; , liveType, plat_form, duration, duration_time,
&gt; &gt; network_operator, role, useragent, uid, eventTime))
&gt; &gt;
&gt; &gt; CloudliveWatcher(id, partnerId, courseId, customerId,
&gt; &gt; courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
&gt; &gt; join_time, leave_time, created_time, modified_time
&gt; &gt; , liveType, plat_form, duration, duration_time,
&gt; &gt; network_operator, role, useragent, uid, eventTime)
&gt; &gt;
&gt; &gt; }
&gt; &gt;
&gt; &gt;
&gt; &gt; 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; 赵一旦 <hinobleyd@gmail.com&gt; 于2020年12月28日周一 下午7:12写道:
&gt; &gt;
&gt; &gt; 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
&gt; &gt;
&gt; &gt; 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
&gt; &gt;
&gt; &gt; session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
&gt; &gt;
&gt; &gt;
&gt; &gt; 张锴 <zk357794239@gmail.com&gt; 于2020年12月28日周一 下午5:35写道:
&gt; &gt;
&gt; &gt; 能描述一下用session window的考虑吗
&gt; &gt;
&gt; &gt; Akisaya <akikevinsama@gmail.com&gt; 于2020年12月28日周一 下午5:00写道:
&gt; &gt;
&gt; &gt; 这个可以用 session window 吧
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
&gt; &gt;
&gt; &gt; news_365@163.com <news_365@163.com&gt; 于2020年12月28日周一 下午2:15写道:
&gt; &gt;
&gt; &gt; 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; news_365@163.com
&gt; &gt;
&gt; &gt; 发件人: 张锴
&gt; &gt; 发送时间: 2020-12-28 13:35
&gt; &gt; 收件人: user-zh
&gt; &gt; 主题: 根据业务需求选择合适的flink state
&gt; &gt; 各位大佬帮我分析下如下需求应该怎么写
&gt; &gt;
&gt; &gt; 需求说明:
&gt; &gt;
&gt; &gt; 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
&gt; &gt;
&gt; &gt; 我的想法:
&gt; &gt; 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
&gt; &gt; 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
&gt; &gt;
&gt; &gt; 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
&gt; &gt;
&gt; &gt; flink 版本1.10.1
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt;

回复: 根据业务需求选择合适的flink state

Posted by 徐州州 <25...@qq.com>.
我觉得你可以尝试一下TTL,keyby之后设置key状态的失效时间为1分钟,如果一分钟没数据进来就清空state。




------------------&nbsp;原始邮件&nbsp;------------------
发件人: "张锴"<zk357794239@gmail.com&gt;; 
发送时间: 2021年1月22日(星期五) 下午5:04
收件人: "user-zh"<user-zh@flink.apache.org&gt;; 
主题: Re: 根据业务需求选择合适的flink state



@赵一旦
可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new
AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题

赵一旦 <hinobleyd@gmail.com&gt; 于2021年1月22日周五 上午10:10写道:

&gt; 我理解你要的最终mysql结果表是:
&gt; 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);....
&gt;
&gt; 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
&gt;
&gt;
&gt; 如上按照我的方案就可以实现哈。
&gt;
&gt; xuhaiLong <xiaguo98@163.com&gt; 于2021年1月22日周五 上午10:03写道:
&gt;
&gt; &gt; 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
&gt; &gt; userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum
&gt; 试试?
&gt; &gt;
&gt; &gt;
&gt; &gt; 在2021年1月21日 18:24,张锴<zk357794239@gmail.com&gt; 写道:
&gt; &gt; 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
&gt; &gt;
&gt; &gt;
&gt; context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
&gt; &gt; 下面是我的部分代码逻辑:
&gt; &gt;
&gt; &gt; val ds = dataStream
&gt; &gt; .filter(_.liveType == 1)
&gt; &gt; .keyBy(1, 2)
&gt; &gt; .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
&gt; &gt; .process(new myProcessWindow()).uid("process-id")
&gt; &gt;
&gt; &gt; class myProcessWindow() extends
&gt; &gt; ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
&gt; &gt; TimeWindow] {
&gt; &gt;
&gt; &gt; override def process(key: Tuple, context: Context, elements:
&gt; &gt; Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
&gt; &gt; = {
&gt; &gt; var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
&gt; &gt; var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
&gt; &gt;
&gt; &gt; val currentDate = DateUtil.currentDate
&gt; &gt; val created_time = currentDate
&gt; &gt; val modified_time = currentDate
&gt; &gt; 。。。
&gt; &gt;
&gt; &gt; val join_time: String =
&gt; &gt; DateUtil.convertTimeStamp2DateStr(startTime,
&gt; &gt; DateUtil.SECOND_DATE_FORMAT)
&gt; &gt; val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
&gt; &gt; DateUtil.SECOND_DATE_FORMAT)
&gt; &gt; val duration = (endTime - startTime) / 1000&nbsp; //停留多少秒
&gt; &gt; val duration_time = DateUtil.secondsToFormat(duration)&nbsp; //停留时分秒
&gt; &gt; out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
&gt; &gt; courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
&gt; &gt; join_time, leave_time, created_time, modified_time
&gt; &gt; , liveType, plat_form, duration, duration_time,
&gt; &gt; network_operator, role, useragent, uid, eventTime))
&gt; &gt;
&gt; &gt; CloudliveWatcher(id, partnerId, courseId, customerId,
&gt; &gt; courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
&gt; &gt; join_time, leave_time, created_time, modified_time
&gt; &gt; , liveType, plat_form, duration, duration_time,
&gt; &gt; network_operator, role, useragent, uid, eventTime)
&gt; &gt;
&gt; &gt; }
&gt; &gt;
&gt; &gt;
&gt; &gt; 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; 赵一旦 <hinobleyd@gmail.com&gt; 于2020年12月28日周一 下午7:12写道:
&gt; &gt;
&gt; &gt; 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
&gt; &gt;
&gt; &gt; 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
&gt; &gt;
&gt; &gt; session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
&gt; &gt;
&gt; &gt;
&gt; &gt; 张锴 <zk357794239@gmail.com&gt; 于2020年12月28日周一 下午5:35写道:
&gt; &gt;
&gt; &gt; 能描述一下用session window的考虑吗
&gt; &gt;
&gt; &gt; Akisaya <akikevinsama@gmail.com&gt; 于2020年12月28日周一 下午5:00写道:
&gt; &gt;
&gt; &gt; 这个可以用 session window 吧
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
&gt; &gt;
&gt; &gt; news_365@163.com <news_365@163.com&gt; 于2020年12月28日周一 下午2:15写道:
&gt; &gt;
&gt; &gt; 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; news_365@163.com
&gt; &gt;
&gt; &gt; 发件人: 张锴
&gt; &gt; 发送时间: 2020-12-28 13:35
&gt; &gt; 收件人: user-zh
&gt; &gt; 主题: 根据业务需求选择合适的flink state
&gt; &gt; 各位大佬帮我分析下如下需求应该怎么写
&gt; &gt;
&gt; &gt; 需求说明:
&gt; &gt;
&gt; &gt; 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
&gt; &gt;
&gt; &gt; 我的想法:
&gt; &gt; 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
&gt; &gt; 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
&gt; &gt;
&gt; &gt; 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
&gt; &gt;
&gt; &gt; flink 版本1.10.1
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt;

Re: 根据业务需求选择合适的flink state

Posted by 张锴 <zk...@gmail.com>.
@赵一旦
可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new
AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题

赵一旦 <hi...@gmail.com> 于2021年1月22日周五 上午10:10写道:

> 我理解你要的最终mysql结果表是:
> 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);....
>
> 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
>
>
> 如上按照我的方案就可以实现哈。
>
> xuhaiLong <xi...@163.com> 于2021年1月22日周五 上午10:03写道:
>
> > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
> > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum
> 试试?
> >
> >
> > 在2021年1月21日 18:24,张锴<zk...@gmail.com> 写道:
> > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> >
> >
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> > 下面是我的部分代码逻辑:
> >
> > val ds = dataStream
> > .filter(_.liveType == 1)
> > .keyBy(1, 2)
> > .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> > .process(new myProcessWindow()).uid("process-id")
> >
> > class myProcessWindow() extends
> > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> > TimeWindow] {
> >
> > override def process(key: Tuple, context: Context, elements:
> > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> > = {
> > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> >
> > val currentDate = DateUtil.currentDate
> > val created_time = currentDate
> > val modified_time = currentDate
> > 。。。
> >
> > val join_time: String =
> > DateUtil.convertTimeStamp2DateStr(startTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val duration = (endTime - startTime) / 1000  //停留多少秒
> > val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime))
> >
> > CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime)
> >
> > }
> >
> >
> > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> >
> >
> >
> >
> > 赵一旦 <hi...@gmail.com> 于2020年12月28日周一 下午7:12写道:
> >
> > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >
> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >
> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >
> >
> > 张锴 <zk...@gmail.com> 于2020年12月28日周一 下午5:35写道:
> >
> > 能描述一下用session window的考虑吗
> >
> > Akisaya <ak...@gmail.com> 于2020年12月28日周一 下午5:00写道:
> >
> > 这个可以用 session window 吧
> >
> >
> >
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >
> > news_365@163.com <ne...@163.com> 于2020年12月28日周一 下午2:15写道:
> >
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_365@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> >
> >
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
> >
> >
> >
> >
>

Re: 根据业务需求选择合适的flink state

Posted by 赵一旦 <hi...@gmail.com>.
我理解你要的最终mysql结果表是:
直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);....

如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。


如上按照我的方案就可以实现哈。

xuhaiLong <xi...@163.com> 于2021年1月22日周五 上午10:03写道:

> 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
> userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试?
>
>
> 在2021年1月21日 18:24,张锴<zk...@gmail.com> 写道:
> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
>
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> 下面是我的部分代码逻辑:
>
> val ds = dataStream
> .filter(_.liveType == 1)
> .keyBy(1, 2)
> .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> .process(new myProcessWindow()).uid("process-id")
>
> class myProcessWindow() extends
> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> TimeWindow] {
>
> override def process(key: Tuple, context: Context, elements:
> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> = {
> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
>
> val currentDate = DateUtil.currentDate
> val created_time = currentDate
> val modified_time = currentDate
> 。。。
>
> val join_time: String =
> DateUtil.convertTimeStamp2DateStr(startTime,
> DateUtil.SECOND_DATE_FORMAT)
> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> DateUtil.SECOND_DATE_FORMAT)
> val duration = (endTime - startTime) / 1000  //停留多少秒
> val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
> , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime))
>
> CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
> , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime)
>
> }
>
>
> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
>
>
>
>
> 赵一旦 <hi...@gmail.com> 于2020年12月28日周一 下午7:12写道:
>
> 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>
> 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>
> session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>
>
> 张锴 <zk...@gmail.com> 于2020年12月28日周一 下午5:35写道:
>
> 能描述一下用session window的考虑吗
>
> Akisaya <ak...@gmail.com> 于2020年12月28日周一 下午5:00写道:
>
> 这个可以用 session window 吧
>
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
> news_365@163.com <ne...@163.com> 于2020年12月28日周一 下午2:15写道:
>
> 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
>
>
>
> news_365@163.com
>
> 发件人: 张锴
> 发送时间: 2020-12-28 13:35
> 收件人: user-zh
> 主题: 根据业务需求选择合适的flink state
> 各位大佬帮我分析下如下需求应该怎么写
>
> 需求说明:
>
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
>
>
>
>
>
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
>
> 我的想法:
> 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
>
> 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
>
> flink 版本1.10.1
>
>
>
>
>

回复: 根据业务需求选择合适的flink state

Posted by xuhaiLong <xi...@163.com>.
可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试?


在2021年1月21日 18:24,张锴<zk...@gmail.com> 写道:
你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
下面是我的部分代码逻辑:

val ds = dataStream
.filter(_.liveType == 1)
.keyBy(1, 2)
.window(EventTimeSessionWindows.withGap(Time.minutes(1)))
.process(new myProcessWindow()).uid("process-id")

class myProcessWindow() extends
ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
TimeWindow] {

override def process(key: Tuple, context: Context, elements:
Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
= {
var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间

val currentDate = DateUtil.currentDate
val created_time = currentDate
val modified_time = currentDate
。。。

val join_time: String =
DateUtil.convertTimeStamp2DateStr(startTime,
DateUtil.SECOND_DATE_FORMAT)
val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
DateUtil.SECOND_DATE_FORMAT)
val duration = (endTime - startTime) / 1000  //停留多少秒
val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

}


这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?




赵一旦 <hi...@gmail.com> 于2020年12月28日周一 下午7:12写道:

按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。

或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。

session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。


张锴 <zk...@gmail.com> 于2020年12月28日周一 下午5:35写道:

能描述一下用session window的考虑吗

Akisaya <ak...@gmail.com> 于2020年12月28日周一 下午5:00写道:

这个可以用 session window 吧



https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows

news_365@163.com <ne...@163.com> 于2020年12月28日周一 下午2:15写道:

这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。



news_365@163.com

发件人: 张锴
发送时间: 2020-12-28 13:35
收件人: user-zh
主题: 根据业务需求选择合适的flink state
各位大佬帮我分析下如下需求应该怎么写

需求说明:

公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A




在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。

我的想法:
我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。

不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。

flink 版本1.10.1





Re: 根据业务需求选择合适的flink state

Posted by 赵一旦 <hi...@gmail.com>.
对scala不是很熟悉,看你写的好复杂,下面是伪代码。
Ele{
   id1;
   id2;
   ts; //timestamp
  minTs = ts;
  maxTs = ts;
}

keyBy(id1,id2).window(EventTimeSessionWindows.withGap(Time.minutes(1))).reduce(xxxReduce,
xxxProcessWindowFunc).

xxxReduce(Ele ele1, Ele ele2){
  return new Ele(ele1.id1, ele1.id2, null, min(ele1.minTs, ele2.minTs),
max(ele1.maxTs, ele2.maxTs) )
}

xxxProcessWindowFunc(Iterable<Ele> eles){
  Ele ele = eles.iterator().next()
  durationMs = ele.maxTs - ele.minTs;
  collectOut(new XX(ele.id1, ele.id2, ele.minTs, ele.maxTs, duration)) //
-->这代表了id1,id2的一次出现,从minTs开始出现,到maxTs结束,duration为xxx。
}


张锴 <zk...@gmail.com> 于2021年1月23日周六 下午5:35写道:

> *我按照 session
>
> window分组,定义了最大最小状态,使用reduce+processWindowFunction方式重新跑了一下,出来的数据duration(秒),duration_time(时分秒)都是0。下面是我的程序,帮忙分析一下是哪里定义的有问题,为什么会出现这种情况,这个困扰了我好半天。*
>
> case class CloudLiveLogOnLine(
>                          id: Long,
>                          courseId: Long,
>                          customerId: Long,
>                          courseNumber: Long,
>                          nickName: String,
>                          partnerId: Long,
>                          ip: String,
>                          reportTime: String,
>                          liveType: Int,
>                          uid: String,
>                          eventTime: Long
>                        )
>
> case class MinMaxTemp(
>                        id: Long,
>                        courseId: Long,
>                        customerId: Long,
>                        courseNumber: Long,
>                        nickName: String,
>                        partnerId: Long,
>                        ip: String,
>                        reportTime: String,
>                        liveType: Int,
>                        uid: String,
>                        mineventTime: Long,
>                        maxeventTime: Long
>                      )
>
> object OnLineFlinkTask {
>
>   def main(args: Array[String]): Unit = {
>
> 配置省略。。。。
>
>  val dataStream: DataStream[CloudLiveLogOnLine] = stream.map(line => {
>       var id = 0L
>       var courseId = 0L
>       var courseNumber = 0L
>       var customerId = 0L
>       var nickName = ""
>       var partnerId = 0L
>       var ip = ""
>       var reportTime = ""
>       var liveType = 0
>       var uid = ""
>       var eventTime = 0L
>       try {
>         val messageJson = JSON.parseObject(line)
>         val data: JSONObject = messageJson.getJSONObject("data")
>         id = data.getLong("id")
>         courseId = data.getLongValue("courseId")
>         courseNumber = data.getLongValue("courseNumber")
>         customerId = data.getLongValue("customerId")
>         nickName = data.getString("nickName")
>         partnerId = data.getLongValue("partnerId")
>         ip = data.getString("ip")
>         reportTime = data.getString("reportTime")
>         liveType = data.getIntValue("liveType")
>         uid = data.getString("uid")
>         eventTime = messageJson.getLongValue("eventTime")
>       } catch {
>         case e => println(line)
>       }
>       CloudLiveLogOnLine(id, courseId, customerId, courseNumber,
> nickName, partnerId, ip, reportTime, liveType, uid, eventTime)
>     }).assignAscendingTimestamps(_.eventTime)
>
>     // 3. transform 处理数据
>     val ds = dataStream
>       .filter(_.liveType == 1)
>       .map(r=>MinMaxTemp(r.id
> ,r.courseId,r.customerId,r.courseNumber,r.nickName,r.partnerId,
>         r.ip,r.reportTime,r.liveType,r.uid,r.eventTime,r.eventTime))
>       .keyBy(1, 2)
>       .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
>         .reduce(new myReduceFunc(),new AssignWindowProcessFunc())
>
>     ds.print()
>     env.execute("flink job")
>
>    }
>
> }
>
> class myReduceFunc() extends ReduceFunction[MinMaxTemp]{
>   override def reduce(value1: MinMaxTemp, value2: MinMaxTemp): MinMaxTemp
> = {
>     MinMaxTemp(value1.id
> ,value1.courseId,value1.customerId,value1.courseNumber,value1.nickName,
>
> value1.partnerId,value1.ip,value1.reportTime,value1.liveType,value1.uid,value1.mineventTime.min(value2.mineventTime),
>
> value1.maxeventTime.max(value2.maxeventTime))
>
>   }
> }
>
> class AssignWindowProcessFunc() extends
> ProcessWindowFunction[MinMaxTemp,CloudliveWatcher,Tuple,TimeWindow]{
>
>   private var minTsState: ValueState[Long] = _
>   private var maxTsState: ValueState[Long] = _
>
>   override def open(parameters: Configuration): Unit = {
>     minTsState =getRuntimeContext.getState(new
> ValueStateDescriptor[Long]("min-state",classOf[Long]))
>     maxTsState =getRuntimeContext.getState(new
> ValueStateDescriptor[Long]("max-state",classOf[Long]))
>   }
>
>   override def process(key: Tuple, context: Context, elements:
> Iterable[MinMaxTemp], out: Collector[CloudliveWatcher]): Unit = {
>     val minTs: Long = minTsState.value() //取出上一个时间戳最小值
>     val maxTs: Long = maxTsState.value() //取出上一个时间戳最大值
>
>     val device_type = 0
>     val net_opretor = ""
>     val net_type = ""
>     val area = ""
>     val plat_form = ""
>     val network_operator = ""
>     val role = 0
>     val useragent = ""
>     val currentDate = DateUtil.currentDate
>     val created_time = currentDate
>     val modified_time = currentDate
>     var id =0L
>     var courseId =0L
>     var partnerId =0L
>     var ip =""
>     var customerId =0L
>     var courseNumber =0L
>     var nickName =""
>     var liveType =0
>     var uid =""
>     var eventTime =0L
>     var min =0L
>     var max =0L
>     var join_time =""
>     var leave_time =""
>     var duration =0L
>     var duration_time =""
>     val iterator: Iterator[MinMaxTemp] = elements.iterator
>     if (iterator.hasNext) {
>       val value: MinMaxTemp = iterator.next()
>       id = value.id
>       courseId= value.courseId
>       partnerId = value.partnerId
>       ip = value.ip
>       customerId = value.customerId
>       courseNumber = value.courseNumber
>       nickName = value.nickName
>       liveType = value.liveType
>       uid = value.uid
>       minTsState.update(value.mineventTime) //更新最小时间戳
>       maxTsState.update(value.maxeventTime)  //更新最大时间戳
>     }
>     join_time = DateUtil.convertTimeStamp2DateStr(minTs,
> DateUtil.SECOND_DATE_FORMAT)
>     leave_time = DateUtil.convertTimeStamp2DateStr(maxTs,
> DateUtil.SECOND_DATE_FORMAT)
>     duration = (maxTs - minTs) / 1000  //停留多少秒
>     duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
>     minTsState.clear()
>     maxTsState.clear()
>
>     out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
>       , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime))
>
>     CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
>       , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime)
>
>   }
> }
>
>
>
>
>
> 赵一旦 <hi...@gmail.com> 于2021年1月21日周四 下午8:38写道:
>
> > 我表达的方法是按照session
> > window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。
> >
> > 实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。
> >
> >
> >
> 然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。
> >
> > 赵一旦 <hi...@gmail.com> 于2021年1月21日周四 下午8:28写道:
> >
> > > 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。
> > >
> > > 张锴 <zk...@gmail.com> 于2021年1月21日周四 下午6:25写道:
> > >
> > >>
> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> > >>
> > >>
> >
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> > >> 下面是我的部分代码逻辑:
> > >>
> > >> val ds = dataStream
> > >>   .filter(_.liveType == 1)
> > >>   .keyBy(1, 2)
> > >>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> > >>   .process(new myProcessWindow()).uid("process-id")
> > >>
> > >> class myProcessWindow() extends
> > >> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> > >> TimeWindow] {
> > >>
> > >>   override def process(key: Tuple, context: Context, elements:
> > >> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> > >> = {
> > >>     var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> > >>     var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> > >>
> > >>     val currentDate = DateUtil.currentDate
> > >>     val created_time = currentDate
> > >>     val modified_time = currentDate
> > >>      。。。
> > >>
> > >>     val join_time: String =
> > >> DateUtil.convertTimeStamp2DateStr(startTime,
> > >> DateUtil.SECOND_DATE_FORMAT)
> > >>     val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> > >> DateUtil.SECOND_DATE_FORMAT)
> > >>     val duration = (endTime - startTime) / 1000  //停留多少秒
> > >>     val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> > >>     out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> > >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > >> join_time, leave_time, created_time, modified_time
> > >>       , liveType, plat_form, duration, duration_time,
> > >> network_operator, role, useragent, uid, eventTime))
> > >>
> > >>     CloudliveWatcher(id, partnerId, courseId, customerId,
> > >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > >> join_time, leave_time, created_time, modified_time
> > >>       , liveType, plat_form, duration, duration_time,
> > >> network_operator, role, useragent, uid, eventTime)
> > >>
> > >> }
> > >>
> > >>
> > >> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> > >>
> > >>
> > >>
> > >>
> > >> 赵一旦 <hi...@gmail.com> 于2020年12月28日周一 下午7:12写道:
> > >>
> > >> > 按直播间ID和用户ID分组,使用session
> > >> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> > >> >
> > >> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> > >> >
> > >> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> > >> >
> > >> >
> > >> > 张锴 <zk...@gmail.com> 于2020年12月28日周一 下午5:35写道:
> > >> >
> > >> > > 能描述一下用session window的考虑吗
> > >> > >
> > >> > > Akisaya <ak...@gmail.com> 于2020年12月28日周一 下午5:00写道:
> > >> > >
> > >> > > > 这个可以用 session window 吧
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > >> > > >
> > >> > > > news_365@163.com <ne...@163.com> 于2020年12月28日周一 下午2:15写道:
> > >> > > >
> > >> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > news_365@163.com
> > >> > > > >
> > >> > > > > 发件人: 张锴
> > >> > > > > 发送时间: 2020-12-28 13:35
> > >> > > > > 收件人: user-zh
> > >> > > > > 主题: 根据业务需求选择合适的flink state
> > >> > > > > 各位大佬帮我分析下如下需求应该怎么写
> > >> > > > >
> > >> > > > > 需求说明:
> > >> > > > >
> > >> > >
> > >>
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > >> > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > >> > > > >
> > >> > > > > 我的想法:
> > >> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event
> > >> Time中的分钟数
> > >> > > > >
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > >> > > > >
> > >> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > >> > > > >
> > >> > > > > flink 版本1.10.1
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>

Re: 根据业务需求选择合适的flink state

Posted by 张锴 <zk...@gmail.com>.
*我按照 session
window分组,定义了最大最小状态,使用reduce+processWindowFunction方式重新跑了一下,出来的数据duration(秒),duration_time(时分秒)都是0。下面是我的程序,帮忙分析一下是哪里定义的有问题,为什么会出现这种情况,这个困扰了我好半天。*

case class CloudLiveLogOnLine(
                         id: Long,
                         courseId: Long,
                         customerId: Long,
                         courseNumber: Long,
                         nickName: String,
                         partnerId: Long,
                         ip: String,
                         reportTime: String,
                         liveType: Int,
                         uid: String,
                         eventTime: Long
                       )

case class MinMaxTemp(
                       id: Long,
                       courseId: Long,
                       customerId: Long,
                       courseNumber: Long,
                       nickName: String,
                       partnerId: Long,
                       ip: String,
                       reportTime: String,
                       liveType: Int,
                       uid: String,
                       mineventTime: Long,
                       maxeventTime: Long
                     )

object OnLineFlinkTask {

  def main(args: Array[String]): Unit = {

配置省略。。。。

 val dataStream: DataStream[CloudLiveLogOnLine] = stream.map(line => {
      var id = 0L
      var courseId = 0L
      var courseNumber = 0L
      var customerId = 0L
      var nickName = ""
      var partnerId = 0L
      var ip = ""
      var reportTime = ""
      var liveType = 0
      var uid = ""
      var eventTime = 0L
      try {
        val messageJson = JSON.parseObject(line)
        val data: JSONObject = messageJson.getJSONObject("data")
        id = data.getLong("id")
        courseId = data.getLongValue("courseId")
        courseNumber = data.getLongValue("courseNumber")
        customerId = data.getLongValue("customerId")
        nickName = data.getString("nickName")
        partnerId = data.getLongValue("partnerId")
        ip = data.getString("ip")
        reportTime = data.getString("reportTime")
        liveType = data.getIntValue("liveType")
        uid = data.getString("uid")
        eventTime = messageJson.getLongValue("eventTime")
      } catch {
        case e => println(line)
      }
      CloudLiveLogOnLine(id, courseId, customerId, courseNumber,
nickName, partnerId, ip, reportTime, liveType, uid, eventTime)
    }).assignAscendingTimestamps(_.eventTime)

    // 3. transform 处理数据
    val ds = dataStream
      .filter(_.liveType == 1)
      .map(r=>MinMaxTemp(r.id,r.courseId,r.customerId,r.courseNumber,r.nickName,r.partnerId,
        r.ip,r.reportTime,r.liveType,r.uid,r.eventTime,r.eventTime))
      .keyBy(1, 2)
      .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
        .reduce(new myReduceFunc(),new AssignWindowProcessFunc())

    ds.print()
    env.execute("flink job")

   }

}

class myReduceFunc() extends ReduceFunction[MinMaxTemp]{
  override def reduce(value1: MinMaxTemp, value2: MinMaxTemp): MinMaxTemp = {
    MinMaxTemp(value1.id,value1.courseId,value1.customerId,value1.courseNumber,value1.nickName,
      value1.partnerId,value1.ip,value1.reportTime,value1.liveType,value1.uid,value1.mineventTime.min(value2.mineventTime),

value1.maxeventTime.max(value2.maxeventTime))

  }
}

class AssignWindowProcessFunc() extends
ProcessWindowFunction[MinMaxTemp,CloudliveWatcher,Tuple,TimeWindow]{

  private var minTsState: ValueState[Long] = _
  private var maxTsState: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    minTsState =getRuntimeContext.getState(new
ValueStateDescriptor[Long]("min-state",classOf[Long]))
    maxTsState =getRuntimeContext.getState(new
ValueStateDescriptor[Long]("max-state",classOf[Long]))
  }

  override def process(key: Tuple, context: Context, elements:
Iterable[MinMaxTemp], out: Collector[CloudliveWatcher]): Unit = {
    val minTs: Long = minTsState.value() //取出上一个时间戳最小值
    val maxTs: Long = maxTsState.value() //取出上一个时间戳最大值

    val device_type = 0
    val net_opretor = ""
    val net_type = ""
    val area = ""
    val plat_form = ""
    val network_operator = ""
    val role = 0
    val useragent = ""
    val currentDate = DateUtil.currentDate
    val created_time = currentDate
    val modified_time = currentDate
    var id =0L
    var courseId =0L
    var partnerId =0L
    var ip =""
    var customerId =0L
    var courseNumber =0L
    var nickName =""
    var liveType =0
    var uid =""
    var eventTime =0L
    var min =0L
    var max =0L
    var join_time =""
    var leave_time =""
    var duration =0L
    var duration_time =""
    val iterator: Iterator[MinMaxTemp] = elements.iterator
    if (iterator.hasNext) {
      val value: MinMaxTemp = iterator.next()
      id = value.id
      courseId= value.courseId
      partnerId = value.partnerId
      ip = value.ip
      customerId = value.customerId
      courseNumber = value.courseNumber
      nickName = value.nickName
      liveType = value.liveType
      uid = value.uid
      minTsState.update(value.mineventTime) //更新最小时间戳
      maxTsState.update(value.maxeventTime)  //更新最大时间戳
    }
    join_time = DateUtil.convertTimeStamp2DateStr(minTs,
DateUtil.SECOND_DATE_FORMAT)
    leave_time = DateUtil.convertTimeStamp2DateStr(maxTs,
DateUtil.SECOND_DATE_FORMAT)
    duration = (maxTs - minTs) / 1000  //停留多少秒
    duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
    minTsState.clear()
    maxTsState.clear()
	
    out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
      , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

    CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
      , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

  }
}





赵一旦 <hi...@gmail.com> 于2021年1月21日周四 下午8:38写道:

> 我表达的方法是按照session
> window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。
>
> 实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。
>
>
> 然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。
>
> 赵一旦 <hi...@gmail.com> 于2021年1月21日周四 下午8:28写道:
>
> > 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。
> >
> > 张锴 <zk...@gmail.com> 于2021年1月21日周四 下午6:25写道:
> >
> >> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> >>
> >>
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> >> 下面是我的部分代码逻辑:
> >>
> >> val ds = dataStream
> >>   .filter(_.liveType == 1)
> >>   .keyBy(1, 2)
> >>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> >>   .process(new myProcessWindow()).uid("process-id")
> >>
> >> class myProcessWindow() extends
> >> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> >> TimeWindow] {
> >>
> >>   override def process(key: Tuple, context: Context, elements:
> >> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> >> = {
> >>     var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> >>     var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> >>
> >>     val currentDate = DateUtil.currentDate
> >>     val created_time = currentDate
> >>     val modified_time = currentDate
> >>      。。。
> >>
> >>     val join_time: String =
> >> DateUtil.convertTimeStamp2DateStr(startTime,
> >> DateUtil.SECOND_DATE_FORMAT)
> >>     val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> >> DateUtil.SECOND_DATE_FORMAT)
> >>     val duration = (endTime - startTime) / 1000  //停留多少秒
> >>     val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> >>     out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> >> join_time, leave_time, created_time, modified_time
> >>       , liveType, plat_form, duration, duration_time,
> >> network_operator, role, useragent, uid, eventTime))
> >>
> >>     CloudliveWatcher(id, partnerId, courseId, customerId,
> >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> >> join_time, leave_time, created_time, modified_time
> >>       , liveType, plat_form, duration, duration_time,
> >> network_operator, role, useragent, uid, eventTime)
> >>
> >> }
> >>
> >>
> >> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> >>
> >>
> >>
> >>
> >> 赵一旦 <hi...@gmail.com> 于2020年12月28日周一 下午7:12写道:
> >>
> >> > 按直播间ID和用户ID分组,使用session
> >> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >> >
> >> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >> >
> >> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >> >
> >> >
> >> > 张锴 <zk...@gmail.com> 于2020年12月28日周一 下午5:35写道:
> >> >
> >> > > 能描述一下用session window的考虑吗
> >> > >
> >> > > Akisaya <ak...@gmail.com> 于2020年12月28日周一 下午5:00写道:
> >> > >
> >> > > > 这个可以用 session window 吧
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >> > > >
> >> > > > news_365@163.com <ne...@163.com> 于2020年12月28日周一 下午2:15写道:
> >> > > >
> >> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > news_365@163.com
> >> > > > >
> >> > > > > 发件人: 张锴
> >> > > > > 发送时间: 2020-12-28 13:35
> >> > > > > 收件人: user-zh
> >> > > > > 主题: 根据业务需求选择合适的flink state
> >> > > > > 各位大佬帮我分析下如下需求应该怎么写
> >> > > > >
> >> > > > > 需求说明:
> >> > > > >
> >> > >
> >>
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >> > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >> > > > >
> >> > > > > 我的想法:
> >> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event
> >> Time中的分钟数
> >> > > > >
> 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >> > > > >
> >> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >> > > > >
> >> > > > > flink 版本1.10.1
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>

Re: 根据业务需求选择合适的flink state

Posted by 赵一旦 <hi...@gmail.com>.
我表达的方法是按照session
window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。

实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。

然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。

赵一旦 <hi...@gmail.com> 于2021年1月21日周四 下午8:28写道:

> 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。
>
> 张锴 <zk...@gmail.com> 于2021年1月21日周四 下午6:25写道:
>
>> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
>>
>> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
>> 下面是我的部分代码逻辑:
>>
>> val ds = dataStream
>>   .filter(_.liveType == 1)
>>   .keyBy(1, 2)
>>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
>>   .process(new myProcessWindow()).uid("process-id")
>>
>> class myProcessWindow() extends
>> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
>> TimeWindow] {
>>
>>   override def process(key: Tuple, context: Context, elements:
>> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
>> = {
>>     var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
>>     var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
>>
>>     val currentDate = DateUtil.currentDate
>>     val created_time = currentDate
>>     val modified_time = currentDate
>>      。。。
>>
>>     val join_time: String =
>> DateUtil.convertTimeStamp2DateStr(startTime,
>> DateUtil.SECOND_DATE_FORMAT)
>>     val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
>> DateUtil.SECOND_DATE_FORMAT)
>>     val duration = (endTime - startTime) / 1000  //停留多少秒
>>     val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
>>     out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
>> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
>> join_time, leave_time, created_time, modified_time
>>       , liveType, plat_form, duration, duration_time,
>> network_operator, role, useragent, uid, eventTime))
>>
>>     CloudliveWatcher(id, partnerId, courseId, customerId,
>> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
>> join_time, leave_time, created_time, modified_time
>>       , liveType, plat_form, duration, duration_time,
>> network_operator, role, useragent, uid, eventTime)
>>
>> }
>>
>>
>> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
>>
>>
>>
>>
>> 赵一旦 <hi...@gmail.com> 于2020年12月28日周一 下午7:12写道:
>>
>> > 按直播间ID和用户ID分组,使用session
>> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>> >
>> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>> >
>> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>> >
>> >
>> > 张锴 <zk...@gmail.com> 于2020年12月28日周一 下午5:35写道:
>> >
>> > > 能描述一下用session window的考虑吗
>> > >
>> > > Akisaya <ak...@gmail.com> 于2020年12月28日周一 下午5:00写道:
>> > >
>> > > > 这个可以用 session window 吧
>> > > >
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>> > > >
>> > > > news_365@163.com <ne...@163.com> 于2020年12月28日周一 下午2:15写道:
>> > > >
>> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
>> > > > >
>> > > > >
>> > > > >
>> > > > > news_365@163.com
>> > > > >
>> > > > > 发件人: 张锴
>> > > > > 发送时间: 2020-12-28 13:35
>> > > > > 收件人: user-zh
>> > > > > 主题: 根据业务需求选择合适的flink state
>> > > > > 各位大佬帮我分析下如下需求应该怎么写
>> > > > >
>> > > > > 需求说明:
>> > > > >
>> > >
>> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
>> > > > >
>> > > > > 我的想法:
>> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event
>> Time中的分钟数
>> > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
>> > > > >
>> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
>> > > > >
>> > > > > flink 版本1.10.1
>> > > > >
>> > > >
>> > >
>> >
>>
>

Re: 根据业务需求选择合适的flink state

Posted by 赵一旦 <hi...@gmail.com>.
我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。

张锴 <zk...@gmail.com> 于2021年1月21日周四 下午6:25写道:

> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
>
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> 下面是我的部分代码逻辑:
>
> val ds = dataStream
>   .filter(_.liveType == 1)
>   .keyBy(1, 2)
>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
>   .process(new myProcessWindow()).uid("process-id")
>
> class myProcessWindow() extends
> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> TimeWindow] {
>
>   override def process(key: Tuple, context: Context, elements:
> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> = {
>     var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
>     var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
>
>     val currentDate = DateUtil.currentDate
>     val created_time = currentDate
>     val modified_time = currentDate
>      。。。
>
>     val join_time: String =
> DateUtil.convertTimeStamp2DateStr(startTime,
> DateUtil.SECOND_DATE_FORMAT)
>     val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> DateUtil.SECOND_DATE_FORMAT)
>     val duration = (endTime - startTime) / 1000  //停留多少秒
>     val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
>     out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
>       , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime))
>
>     CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
>       , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime)
>
> }
>
>
> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
>
>
>
>
> 赵一旦 <hi...@gmail.com> 于2020年12月28日周一 下午7:12写道:
>
> > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >
> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >
> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >
> >
> > 张锴 <zk...@gmail.com> 于2020年12月28日周一 下午5:35写道:
> >
> > > 能描述一下用session window的考虑吗
> > >
> > > Akisaya <ak...@gmail.com> 于2020年12月28日周一 下午5:00写道:
> > >
> > > > 这个可以用 session window 吧
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > > >
> > > > news_365@163.com <ne...@163.com> 于2020年12月28日周一 下午2:15写道:
> > > >
> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > > > >
> > > > >
> > > > >
> > > > > news_365@163.com
> > > > >
> > > > > 发件人: 张锴
> > > > > 发送时间: 2020-12-28 13:35
> > > > > 收件人: user-zh
> > > > > 主题: 根据业务需求选择合适的flink state
> > > > > 各位大佬帮我分析下如下需求应该怎么写
> > > > >
> > > > > 需求说明:
> > > > >
> > >
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > > > >
> > > > >
> > > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > > > >
> > > > > 我的想法:
> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event
> Time中的分钟数
> > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > > > >
> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > > > >
> > > > > flink 版本1.10.1
> > > > >
> > > >
> > >
> >
>