Posted to user@flink.apache.org by Satyam Shekhar <sa...@gmail.com> on 2020/09/01 20:55:33 UTC

Re: Editing Rowtime for SQL Table

Thanks for your replies Matthias and Timo.

Converting the Table to a DataStream, assigning a new watermark and rowtime
attribute, and converting back to a Table makes sense. One challenge with
that approach is that the Table-to-DataStream conversion could emit a retract
stream. However, I think that can now be handled with the new TableSource
API (in 1.11), which allows a TableSource to emit retractions.
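
The timestamp and watermark re-assignment step described above can be sketched in plain Java, without any Flink dependency. In a real job this step would typically go through `assignTimestampsAndWatermarks` with a `WatermarkStrategy` on the DataStream; the sketch below only mimics a bounded-out-of-orderness strategy keyed on utime. The class names, the sample rows, and the 2-second bound are assumptions made for illustration, and the lateness check is a simplification of what windowed operators actually do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration only; all names here are hypothetical. */
final class RowtimeReassignSketch {

    /** A trimmed-down row of T0; timestamps are epoch milliseconds. */
    static final class Row {
        final String str1;
        final long utime;
        final long itime;

        Row(String str1, long utime, long itime) {
            this.str1 = str1;
            this.utime = utime;
            this.itime = itime;
        }
    }

    /** A row with its newly assigned timestamp and the watermark after it. */
    static final class Stamped {
        final Row row;
        final long timestamp;
        final long watermarkAfter;
        final boolean late;

        Stamped(Row row, long timestamp, long watermarkAfter, boolean late) {
            this.row = row;
            this.timestamp = timestamp;
            this.watermarkAfter = watermarkAfter;
            this.late = late;
        }
    }

    /**
     * Uses utime as the event timestamp and derives watermarks as
     * (max timestamp seen) - (out-of-orderness bound) - 1.
     * A row counts as late here if its timestamp is not above the
     * watermark that was current when it arrived (simplified rule).
     */
    static List<Stamped> reassignToUtime(List<Row> rows, long maxOutOfOrdernessMs) {
        List<Stamped> out = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (Row r : rows) {
            long ts = r.utime;
            boolean late = ts <= watermark;
            maxTs = Math.max(maxTs, ts);
            // Watermarks never move backwards.
            watermark = Math.max(watermark, maxTs - maxOutOfOrdernessMs - 1);
            out.add(new Stamped(r, ts, watermark, late));
        }
        return out;
    }

    public static void main(String[] args) {
        // Rows arrive ordered by itime, but utime is out of order.
        List<Row> rows = Arrays.asList(
                new Row("a", 1000L, 5000L),
                new Row("b", 4000L, 5001L),
                new Row("c", 1500L, 5002L),
                new Row("d", 4200L, 5003L));
        for (Stamped s : reassignToUtime(rows, 2000L)) {
            System.out.println(s.row.str1 + " ts=" + s.timestamp
                    + " wm=" + s.watermarkAfter + " late=" + s.late);
        }
        // Row "c" is reported late; the final watermark is 2199.
    }
}
```

Row "c" shows why the old watermarks cannot simply be reused: the stream is ordered by itime, so a watermark strategy for utime has to tolerate however far utime can lag behind.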

I'll try this approach when I migrate to the new API and report back.

Regards,
Satyam

On Tue, Sep 1, 2020 at 4:46 AM Timo Walther <ti...@ververica.com> wrote:

> Hi Satyam,
>
> Matthias is right. A rowtime attribute cannot be modified and needs to be
> passed "as is" through the pipeline. The only exception is when an operator
> offers a new rowtime, such as `TUMBLE_ROWTIME` or `MATCH_ROWTIME`. In your
> case, you need to define utime as the time attribute. If this is not
> possible, you can either express the computation in regular SQL (with
> non-streaming optimizations) or go to the DataStream API, prepare the table
> there (assign a new watermark and StreamRecord timestamp), and go back to
> the Table API.
>
> I hope this helps.
>
> Regards,
> Timo
>
> On Tue, Sep 1, 2020 at 11:40 AM Matthias Pohl <ma...@ververica.com>
> wrote:
>
>> Hi Satyam,
>> Thanks for your post. Unfortunately, it looks like you cannot change the
>> rowtime column here. The rowtime is strongly coupled with the Watermarks
>> feature. By changing the rowtime column, we cannot ensure that the
>> watermarks are still aligned, as Fabian mentioned in [1].
>>
>> @Timo Walther <ti...@ververica.com> : Could you verify my findings?
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://stackoverflow.com/questions/52784089/flink-table-sql-api-modify-rowtime-attribute-after-session-window-aggregation
>>
>> On Mon, Aug 31, 2020 at 6:44 PM Satyam Shekhar <sa...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I use Flink for continuous evaluation of SQL queries on streaming data.
>>> One of the use cases requires us to run recursive SQL queries. I am unable
>>> to find a way to change the rowtime attribute of an intermediate result
>>> table.
>>>
>>> For example, let's assume that there is a table T0 with schema -
>>> root
>>>  |-- str1: STRING
>>>  |-- int1: BIGINT
>>>  |-- utime: TIMESTAMP(3)
>>>  |-- itime: TIMESTAMP(3) *ROWTIME*
>>>
>>> Now, let's create a view V0 -
>>> var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");
>>>
>>> I wish to change the rowtime of V0 from itime to utime. I tried doing -
>>>
>>> V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());
>>>
>>> but ran into the following exception -
>>>
>>> org.apache.flink.table.api.ValidationException: Window properties can only be used on windowed tables.
>>>     at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854) ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843) ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
>>>     at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207) ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475) ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459) ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>
>>> Any guidance on how to address this?
>>>
>>> Regards,
>>> Satyam
>>>
>>
>>
>> --
>>
>> Matthias Pohl | Engineer
>>
>> Follow us @VervericaData Ververica <https://www.ververica.com/>
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
>> Wehner
>>
>
>
> --
>
> Timo Walther | Software Engineer

Re: Editing Rowtime for SQL Table

Posted by Timo Walther <tw...@apache.org>.
Yes, the new TableSource API allows a source to emit retractions. However, it
does not give you direct access to the DataStream API.
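
To make the retraction concern concrete, here is a minimal plain-Java sketch of how a consumer has to interpret a retract stream. It assumes messages shaped like the `Tuple2<Boolean, Row>` pairs that `toRetractStream` produces, where `true` adds a row and `false` retracts a previously emitted one. `Change` and `materialize` are hypothetical names, and the sample changes imitate what a `COUNT(*) ... GROUP BY str1` query might emit; none of this is Flink code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only; all names here are hypothetical. */
final class RetractStreamSketch {

    /** One retract-stream message: add == true inserts, add == false retracts. */
    static final class Change {
        final boolean add;
        final String key;
        final long count;

        Change(boolean add, String key, long count) {
            this.add = add;
            this.key = key;
            this.count = count;
        }
    }

    /** Replays the changes into the current result, one entry per key. */
    static Map<String, Long> materialize(List<Change> changes) {
        Map<String, Long> view = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.add) {
                view.put(c.key, c.count);
            } else {
                // Remove only if the retracted value is the one currently stored.
                view.remove(c.key, c.count);
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
                new Change(true, "a", 1L),
                new Change(true, "b", 1L),
                new Change(false, "a", 1L), // count for "a" is about to change
                new Change(true, "a", 2L));
        // Four messages arrive, but the result holds two rows: b=1 and a=2.
        System.out.println(materialize(changes));
    }
}
```

A consumer that treated the same four messages as an append-only stream would keep a stale row for "a", which is why the conversion step cannot ignore retractions.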

FLIP-136 [1] might help you in the near future. We hope it can be part 
of 1.12.

Regards,
Timo

[1] 
https://lists.apache.org/thread.html/r62b47ec6812ddbafed65ac79e31ca0305099893559f1e5a991dee550%40%3Cdev.flink.apache.org%3E
