You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Yan Zhou [FDS Science]" <yz...@coupang.com> on 2017/12/13 23:09:21 UTC

how does time-windowed join and Over-Window Aggregation implemented in flink SQL?

Hi,


I am building a data pipeline with a lot of streaming join and over window aggregation. And flink SQL have these feature supported. However, there is no similar DataStream APIs provided(maybe there is and I didn't find them. please point out if there is). I got confused because I assume that the SQL logical plan will be translated into a graph of operators or transformations.


Could someone explain how these two sql query are  implemented or translated into low level code ( operators or transformations)? I am asking this because I have implemented these features without using SQL and the performance looks good. And I certainly love to migrate to SQL, but I want to understand them well first. Any information or hints or links are appreciated.


  1.  Time-Windowed Join

The DataStream API only provides streaming join within same window. But the SQL API (time-windowed join) can join two streams within quite different time range. Below is an sample query that listed in official doc, and we can see that Orders and Shipments have 4 hours difference. Is it implemented by CoProcessFunction or TwoInputOperator which buffers the event for a certain period?


SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

2. Over-Window Aggregation
There is no similar feature in DataStream API. How does this get implemented? Does it use keyed state to buffer the previous events, and pull the records when there is a need? How does sorting get handled?


Best
Yan




Re: how does time-windowed join and Over-Window Aggregation implemented in flink SQL?

Posted by "Yan Zhou [FDS Science]" <yz...@coupang.com>.
Thanks for the information.

Best
Yan

From: Xingcan Cui <xi...@gmail.com>
Date: Wednesday, December 13, 2017 at 6:02 PM
To: "Yan Zhou [FDS Science]" <yz...@coupang.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: how does time-windowed join and Over-Window Aggregation implemented in flink SQL?

Hi Yan Zhou,

as you may have noticed, the SQL level stream join was not built on top of some join APIs but was implemented with the low-level CoProcessFunction (see TimeBoundedStreamInnerJoin.scala<http://flink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala>). The pipeline is generated in DataStreamWindowJoin.scala<https://github.com/apache/flink/blob/14bc62740e90ecefd34f9202f4a37c883c3122e5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala#L106>.

Regarding the over-window aggregation, most of the implementations can be found in this package<https://github.com/apache/flink/tree/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate>. The pipeline is generated in DataStreamOverAggregate.scala<https://github.com/apache/flink/blob/14bc62740e90ecefd34f9202f4a37c883c3122e5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala#L99>.

In summary, they use built-in state tools to cache the rows/intermediate results and clean/fire them when necessary.

Hope that helps.

Best,
Xingcan

On Thu, Dec 14, 2017 at 7:09 AM, Yan Zhou [FDS Science] <yz...@coupang.com>> wrote:

Hi,



I am building a data pipeline with a lot of streaming join and over window aggregation. And flink SQL have these feature supported. However, there is no similar DataStream APIs provided(maybe there is and I didn't find them. please point out if there is). I got confused because I assume that the SQL logical plan will be translated into a graph of operators or transformations.



Could someone explain how these two sql query are  implemented or translated into low level code ( operators or transformations)? I am asking this because I have implemented these features without using SQL and the performance looks good. And I certainly love to migrate to SQL, but I want to understand them well first. Any information or hints or links are appreciated.



  1.  Time-Windowed Join

The DataStream API only provides streaming join within same window. But the SQL API (time-windowed join) can join two streams within quite different time range. Below is an sample query that listed in official doc, and we can see that Orders and Shipments have 4 hours difference. Is it implemented by CoProcessFunction or TwoInputOperator which buffers the event for a certain period?



SELECT *

FROM Orders o, Shipments s

WHERE o.id = s.orderId AND

      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

2. Over-Window Aggregation
There is no similar feature in DataStream API. How does this get implemented? Does it use keyed state to buffer the previous events, and pull the records when there is a need? How does sorting get handled?


Best
Yan








Re: how does time-windowed join and Over-Window Aggregation implemented in flink SQL?

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Yan Zhou,

as you may have noticed, the SQL level stream join was not built on top of
some join APIs but was implemented with the low-level CoProcessFunction
(see TimeBoundedStreamInnerJoin.scala
<http://flink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala>).
The pipeline is generated in DataStreamWindowJoin.scala
<https://github.com/apache/flink/blob/14bc62740e90ecefd34f9202f4a37c883c3122e5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala#L106>
.

Regarding the over-window aggregation, most of the implementations can be
found in this package
<https://github.com/apache/flink/tree/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate>.
The pipeline is generated in DataStreamOverAggregate.scala
<https://github.com/apache/flink/blob/14bc62740e90ecefd34f9202f4a37c883c3122e5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala#L99>
.

In summary, they use built-in state tools to cache the rows/intermediate
results and clean/fire them when necessary.

Hope that helps.

Best,
Xingcan

On Thu, Dec 14, 2017 at 7:09 AM, Yan Zhou [FDS Science] <yz...@coupang.com>
wrote:

> Hi,
>
>
> I am building a data pipeline with a lot of streaming join and
> over window aggregation. And flink SQL have these feature supported. However,
> there is no similar DataStream APIs provided(maybe there is and I didn't
> find them. please point out if there is). I got confused because I assume
> that the SQL logical plan will be translated into a graph of operators
> or transformations.
>
>
> Could someone explain how these two sql query are  implemented or
> translated into low level code ( operators or transformations)? I am asking
> this because I have implemented these features without using SQL and the
> performance looks good. And I certainly love to migrate to SQL, but I want
> to understand them well first. Any information or hints or links are
> appreciated.
>
>
>
>    1. Time-Windowed Join
>
> The DataStream API only provides streaming join within same window. But
> the SQL API (time-windowed join) can join two streams within quite
> different time range. Below is an sample query that listed in official
> doc, and we can see that *Orders* and *Shipments *have 4 hours
> difference. Is it implemented by CoProcessFunction or TwoInputOperator
> which buffers the event for a certain period?
>
>
> SELECT *FROM Orders o, Shipments sWHERE o.id = s.orderId AND
>       o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
>
>
> 2. Over-Window Aggregation
> There is no similar feature in DataStream API. How does this get
> implemented? Does it use keyed state to buffer the previous events, and
> pull the records when there is a need? How does sorting get handled?
>
>
> Best
> Yan
>
>
>
>
>