Posted to user@flink.apache.org by Satyam Shekhar <sa...@gmail.com> on 2020/09/01 20:55:33 UTC
Re: Editing Rowtime for SQL Table
Thanks for your replies, Matthias and Timo.
Converting the Table to a DataStream, assigning a new watermark and rowtime
attribute, and converting it back to a Table makes sense. One challenge with
that approach is that the Table-to-DataStream conversion could emit a retract
stream. However, I think that can now be handled with the new TableSource API
(in 1.11), which allows a TableSource to emit retractions.
I'll try this approach when I migrate to the new API and report back.
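The coupling between rowtime and watermarks described in the replies quoted below, and what the DataStream round trip has to re-establish, can be illustrated with a small Flink-free simulation. This is a simplified sketch, not Flink code: the `Row` class, `countLate`, and the watermark rule (largest timestamp seen, minus a bound, minus 1, recomputed after every row) are assumptions made for illustration. In an actual DataStream step, watermarks on utime would presumably be assigned with a `WatermarkStrategy`, for example `WatermarkStrategy.forBoundedOutOfOrderness(...)`, which was introduced in Flink 1.11.

```java
import java.util.List;

/** Flink-free simulation of swapping the event-time field of a stream. */
final class RowtimeReassignment {

    /** Hypothetical stand-in for a row of V0, holding epoch-millisecond timestamps. */
    static final class Row {
        final long itime; // the current rowtime
        final long utime; // the desired rowtime

        Row(long itime, long utime) {
            this.itime = itime;
            this.utime = utime;
        }
    }

    /**
     * Counts rows whose rowtime is at or below the watermark in effect when they arrive.
     * After every row the watermark becomes (largest watermark-field value seen) - boundMs - 1.
     */
    static int countLate(List<Row> rows, boolean watermarkFromUtime,
                         boolean rowtimeIsUtime, long boundMs) {
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE; // nothing seen yet, so nothing can be late
        int late = 0;
        for (Row r : rows) {
            long rowtime = rowtimeIsUtime ? r.utime : r.itime;
            if (rowtime <= watermark) {
                late++; // event-time operators may already have fired for this time
            }
            long field = watermarkFromUtime ? r.utime : r.itime;
            maxSeen = Math.max(maxSeen, field);
            watermark = maxSeen - boundMs - 1;
        }
        return late;
    }
}
```

With the itime/utime pairs 1000/500, 2000/3500, 3000/100, 4000/3900 and a bound of 0, keeping the itime-based watermarks while treating utime as the rowtime makes the third row late. Generating the watermarks from utime itself with a 3500 ms bound makes none late, which is the kind of re-alignment the round trip has to provide.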
Regards,
Satyam
On Tue, Sep 1, 2020 at 4:46 AM Timo Walther <ti...@ververica.com> wrote:
> Hi Satyam,
>
> Matthias is right. A rowtime attribute cannot be modified and needs to be
> passed "as is" through the pipeline. The only exceptions are operations that
> offer a new rowtime, such as `TUMBLE_ROWTIME` or `MATCH_ROWTIME`. In your
> case, you need to define utime as the time attribute. If this is not
> possible, you can either express the computation in regular SQL (with
> non-streaming optimizations) or go to the DataStream API, prepare the table
> there (assign a new watermark and StreamRecord timestamp), and go back to the
> Table API.
>
> I hope this helps.
>
> Regards,
> Timo
>
> On Tue, Sep 1, 2020 at 11:40 AM Matthias Pohl <ma...@ververica.com>
> wrote:
>
>> Hi Satyam,
>> Thanks for your post. Unfortunately, it looks like you cannot change the
>> rowtime column here. The rowtime is strongly coupled with the watermarks
>> feature. If the rowtime column were changed, we could not ensure that the
>> watermarks are still aligned, as Fabian mentioned in [1].
>>
>> @Timo Walther <ti...@ververica.com> : Could you verify my findings?
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://stackoverflow.com/questions/52784089/flink-table-sql-api-modify-rowtime-attribute-after-session-window-aggregation
>>
>> On Mon, Aug 31, 2020 at 6:44 PM Satyam Shekhar <sa...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I use Flink for continuous evaluation of SQL queries on streaming data.
>>> One of the use cases requires us to run recursive SQL queries. I am unable
>>> to find a way to edit the rowtime attribute of the intermediate result
>>> table.
>>>
>>> For example, let's assume that there is a table T0 with schema -
>>> root
>>> |-- str1: STRING
>>> |-- int1: BIGINT
>>> |-- utime: TIMESTAMP(3)
>>> |-- itime: TIMESTAMP(3) *ROWTIME*
>>>
>>> Now, let's create a view V0 -
>>> var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");
>>>
>>> I wish to change the rowtime of V0 from itime to utime. I tried doing -
>>>
>>> V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());
>>>
>>> but ran into the following exception -
>>>
>>> org.apache.flink.table.api.ValidationException: Window properties can
>>> only be used on windowed tables.
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459)
>>> ~[flink-table-api-java-1.11.1.jar:1.11.1]
>>>
>>> Any guidance on how to address this?
>>>
>>> Regards,
>>> Satyam
>>>
>>
>>
>> --
>>
>> Matthias Pohl | Engineer
>>
>> Follow us @VervericaData Ververica <https://www.ververica.com/>
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
>> Wehner
>>
>
>
> --
>
> Timo Walther | Software Engineer
>
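Timo's note, quoted above, that a window function such as `TUMBLE_ROWTIME` offers a new rowtime can be made concrete with the underlying arithmetic. The Flink documentation describes `TUMBLE_ROWTIME` as the inclusive upper bound of the tumbling window (whereas `TUMBLE_END` is the exclusive one). The following is a minimal sketch, assuming epoch-millisecond timestamps and no window offset; `TumbleRowtime`, `windowStart`, and `rowtime` are hypothetical names, not Flink API.

```java
/** Arithmetic behind a tumbling window's rowtime (sketch; no window offset). */
final class TumbleRowtime {

    /** Start of the tumbling window that contains ts; floorMod also handles negative ts. */
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    /** Inclusive upper bound of that window, i.e. the exclusive end minus 1 ms. */
    static long rowtime(long ts, long sizeMs) {
        return windowStart(ts, sizeMs) + sizeMs - 1;
    }
}
```

For a one-minute window, a row at 61000 ms falls into the window starting at 60000 ms, whose rowtime is 119999 ms. The new rowtime is derived from the window bounds rather than from an arbitrary column.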
Re: Editing Rowtime for SQL Table
Posted by Timo Walther <tw...@apache.org>.
Yes, the new TableSource API allows emitting retractions. However, it
does not give you direct access to the DataStream API.
FLIP-136 [1] might help you in the near future. We hope it can be part
of 1.12.
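The retractions discussed here mean that earlier results can be withdrawn and replaced, not only appended. In the retract streams produced by `toRetractStream`, each row carries a boolean add/retract flag. The 1.11 source API describes changes with `RowKind` values instead. The following is a minimal, Flink-free sketch of materializing such a changelog, assuming rows are plain strings. The `Kind` enum only mirrors the value names of Flink's `RowKind` and is not the real class. `ChangelogSketch`, `Change`, and `materialize` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Applies a changelog of additions and retractions to a multiset of rows. */
final class ChangelogSketch {

    /** Local stand-in mirroring the four value names of Flink's RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One changelog message: a kind plus a row, simplified here to a String. */
    static final class Change {
        final Kind kind;
        final String row;

        Change(Kind kind, String row) {
            this.kind = kind;
            this.row = row;
        }
    }

    /** Applies messages in order and returns how many copies of each row remain. */
    static Map<String, Integer> materialize(List<Change> log) {
        Map<String, Integer> counts = new HashMap<>();
        for (Change c : log) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    counts.merge(c.row, 1, Integer::sum); // add one copy of the row
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // retract one copy; returning null removes the key when the count reaches zero
                    counts.computeIfPresent(c.row, (k, n) -> n > 1 ? n - 1 : null);
                    break;
            }
        }
        return counts;
    }
}
```

Applying INSERT "a,1", UPDATE_BEFORE "a,1", UPDATE_AFTER "a,2", INSERT "b,5", DELETE "b,5" leaves only "a,2", once.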
Regards,
Timo
[1]
https://lists.apache.org/thread.html/r62b47ec6812ddbafed65ac79e31ca0305099893559f1e5a991dee550%40%3Cdev.flink.apache.org%3E