Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2017/07/18 08:28:00 UTC

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

    [ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091276#comment-16091276 ] 

Fabian Hueske commented on FLINK-6233:
--------------------------------------

Hi [~Yuhong_kyo], are you still planning to work on this issue?

I'm asking because we need a good design for it. 
One challenge is the handling and definition of the output timestamp. Since a row can only have a single timestamp, the user must specify the chosen timestamp in the projection.
This is also tricky because we need to maintain the watermarks according to the chosen timestamp. This means we have to hold back watermarks to avoid emitting late data.
ProcessFunction does not support holding back watermarks, so we need to implement the join as a low-level custom operator.
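
To make the holding-back requirement concrete, here is a minimal Python sketch (not Flink code and not a proposed design). It assumes a time condition of the form s.rowtime - lower <= o.rowtime <= s.rowtime + upper and that each output row carries exactly one of the two input timestamps. The function name, its parameters, and the derived bounds are assumptions for illustration only.

```python
def held_back_watermark(wm_orders, wm_shipments, lower_ms, upper_ms, output_side):
    """Return a watermark that is safe to emit for the join output.

    Assumed join condition (all values in milliseconds):
        s.rowtime - lower_ms <= o.rowtime <= s.rowtime + upper_ms

    wm_orders / wm_shipments are the current input watermarks, i.e. no
    future row on that input has a timestamp at or below the watermark.
    output_side names the input whose timestamp the projection keeps.
    """
    if output_side == "orders":
        # A future shipment (s.rowtime > wm_shipments) can still match a
        # buffered order with o.rowtime as small as s.rowtime - lower_ms.
        return min(wm_orders, wm_shipments - lower_ms)
    if output_side == "shipments":
        # A future order (o.rowtime > wm_orders) can still match a
        # buffered shipment with s.rowtime as small as o.rowtime - upper_ms.
        return min(wm_shipments, wm_orders - upper_ms)
    raise ValueError("output_side must be 'orders' or 'shipments'")


HOUR_MS = 60 * 60 * 1000

# Condition from the issue: o.rowtime BETWEEN s.rowtime AND s.rowtime + 1 hour,
# i.e. lower_ms = 0 and upper_ms = HOUR_MS.
wm_if_order_time = held_back_watermark(10 * HOUR_MS, 10 * HOUR_MS, 0, HOUR_MS, "orders")
wm_if_ship_time = held_back_watermark(10 * HOUR_MS, 10 * HOUR_MS, 0, HOUR_MS, "shipments")
```

Under these assumptions, keeping o.rowtime as the output timestamp needs no extra delay for the query in the issue, while keeping s.rowtime means the output watermark has to trail the Orders watermark by up to one hour.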

I'm happy to discuss details with you.
Thanks, Fabian

> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-joins on row-time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition, e.g. {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}}, may only use a rowtime that is a system attribute.
> * The time condition must specify a bounded time range such as {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}. Unbounded conditions such as {{o.rowtime < s.rowtime}} are not supported.
> * The time condition must include the rowtime attributes of both streams. A condition such as {{o.rowtime between rowtime () and rowtime () + 1}} is not supported.
> A row-time stream join will not be able to handle late data, because this would mean inserting a row into a sorted order and shifting all other computations. This would be too expensive to maintain. Therefore, we will throw an error if a user tries to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator that performs the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)