Posted to user-zh@flink.apache.org by Robin Zhang <vi...@outlook.com> on 2020/09/29 06:03:56 UTC

How to use the analytic functions LAG and LEAD on a streaming data source

Environment: Flink 1.10, using Flink SQL

Kafka input data, for example:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}

The SQL is as follows:

INSERT INTO topic_sink
SELECT
  t,
  id,
  speed,
  LAG(speed, 1) OVER w AS speed_1,
  LAG(speed, 2) OVER w AS speed_2
FROM topic_source
WINDOW w AS (
      PARTITION BY id
      ORDER BY t
)

The result I expect is:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null, "speed_2":null}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0, "speed_1":1.0, "speed_2":null}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0, "speed_1":2.0, "speed_2":1.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0, "speed_1":3.0, "speed_2":2.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0, "speed_1":4.0, "speed_2":3.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0, "speed_1":5.0, "speed_2":4.0}

The result I actually get is:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0, "speed_2":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0, "speed_1":2.0, "speed_2":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0, "speed_1":3.0, "speed_2":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0, "speed_1":4.0, "speed_2":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0, "speed_1":5.0, "speed_2":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0, "speed_1":6.0, "speed_2":6.0}

Can the LAG function in Flink SQL perform the computation I expect? If so, how should the SQL be written?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How to use the analytic functions LAG and LEAD on a streaming data source

Posted by Robin Zhang <vi...@outlook.com>.
Hi Benchao,

Thanks for the reply. It cleared up what had been puzzling me recently.

Best,
Robin



Re: How to use the analytic functions LAG and LEAD on a streaming data source

Posted by Benchao Li <li...@apache.org>.
Hi Robin,

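The current implementation of the LAG/LEAD functions does indeed have a bug in streaming mode. That implementation only works in batch mode; it was not really written with the streaming case in mind. That is why the result you see is that it can only return the value of the current row.
I only discovered this problem recently myself, and I have just created an issue [1] to track it.
For now, if you want to implement similar functionality, you can write your own UDAF as a workaround.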

[1] https://issues.apache.org/jira/browse/FLINK-19449
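
As a rough illustration of the UDAF workaround, here is a minimal sketch of the per-partition state such a function might keep. It is plain Java with no Flink dependency so that it can be run on its own; in a real job the same three methods would presumably live in a class extending org.apache.flink.table.functions.AggregateFunction. The names LagUdafSketch, LagAcc and lagOver, and the choice to fix the offset in the constructor, are assumptions made for this example and are not taken from the thread or from Flink. It also assumes that the over window calls accumulate once per row in ORDER BY order and reads getValue after each call.

```java
import java.util.Arrays;
import java.util.LinkedList;

// Hypothetical sketch: mirrors the createAccumulator / accumulate / getValue
// shape of a Flink AggregateFunction, but does not depend on Flink.
class LagUdafSketch {

    // Accumulator: the most recent (offset + 1) values, oldest first.
    // LinkedList is used because it accepts null values (e.g. a NULL speed).
    static class LagAcc {
        final LinkedList<Double> buffer = new LinkedList<>();
    }

    private final int offset;

    LagUdafSketch(int offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be >= 0");
        }
        this.offset = offset;
    }

    LagAcc createAccumulator() {
        return new LagAcc();
    }

    // Called once per row: remember the new value, drop values that are too old.
    void accumulate(LagAcc acc, Double value) {
        acc.buffer.addLast(value);
        if (acc.buffer.size() > offset + 1) {
            acc.buffer.removeFirst();
        }
    }

    // The value `offset` rows before the current row, or null if fewer
    // than `offset` rows have been seen before it.
    Double getValue(LagAcc acc) {
        return acc.buffer.size() == offset + 1 ? acc.buffer.getFirst() : null;
    }

    // Helper for this example only: replays one partition's values in order.
    static Double[] lagOver(double[] values, int offset) {
        LagUdafSketch fn = new LagUdafSketch(offset);
        LagAcc acc = fn.createAccumulator();
        Double[] out = new Double[values.length];
        for (int i = 0; i < values.length; i++) {
            fn.accumulate(acc, values[i]);
            out[i] = fn.getValue(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        double[] speeds = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0};
        System.out.println(Arrays.toString(lagOver(speeds, 1))); // [null, 1.0, 2.0, 3.0, 4.0, 5.0]
        System.out.println(Arrays.toString(lagOver(speeds, 2))); // [null, null, 1.0, 2.0, 3.0, 4.0]
    }
}
```

Replaying the six speed values from the question through this sketch yields the speed_1 and speed_2 columns the original post expected. A bounded over window would additionally need a retract method, which this sketch leaves out.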


-- 

Best,
Benchao Li