You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "JING ZHANG (Jira)" <ji...@apache.org> on 2021/10/29 07:27:00 UTC

[jira] [Comment Edited] (FLINK-23919) PullUpWindowTableFunctionIntoWindowAggregateRule generates invalid Calc for Window TVF

    [ https://issues.apache.org/jira/browse/FLINK-23919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17435821#comment-17435821 ] 

JING ZHANG edited comment on FLINK-23919 at 10/29/21, 7:26 AM:
---------------------------------------------------------------

[~Yuval.Itzchakov], I reproduce the problem in my local environment. This is a bug.  cc [~jark]

> it is adding the rowtime column from the input row to the new calc without checking to see if there are any name collisions.

I agree with you.

>  I'm not entirely sure yet why the rowtime column of the input table is being added to the projected output row like that?

Because after transpose Calc and window TVF, the time attribute index may be changed. When construct new Window TVF, we should use new time attribute index.

 

Would you like to fix this bug?:)

If you don't have time, I would fix this bug later.

 

 


was (Author: qingru zhang):
[~Yuval.Itzchakov], I reproduce the problem in my local environment. This is a bug. 

> it is adding the rowtime column from the input row to the new calc without checking to see if there are any name collisions.

I agree with you.

>  I'm not entirely sure yet why the rowtime column of the input table is being added to the projected output row like that?

Because after transpose Calc and window TVF, the time attribute index may be changed. When construct new Window TVF, we should use new time attribute index.

 

Would you like to fix this bug?:)

If you don't have time, I would fix this bug later.

 

 

> PullUpWindowTableFunctionIntoWindowAggregateRule generates invalid Calc for Window TVF
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-23919
>                 URL: https://issues.apache.org/jira/browse/FLINK-23919
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: Yuval Itzchakov
>            Priority: Major
>         Attachments: image-2021-08-23-13-31-24-052.png
>
>
> Given the following Window TVF:
> {code:java}
> SELECT window_time, 
>        MIN(alert_timestamp) as start_time, 
>        MAX(alert_timestamp) as end_time 
> FROM TABLE(TUMBLE(TABLE alert_table, DESCRIPTOR(alert_timestamp), INTERVAL '3' MINUTE)) 
> WHERE service_source = 'source' 
> GROUP BY window_start, window_end, window_time
> {code}
> Where the schema of alert_table is:
> {code:java}
> alert_timestamp: TIMESTAMP(3) ROWTIME INDICATOR
> service_source: VARCHAR{code}
> The following generates an invalid RowType:
> {code:java}
> Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule, args [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time), rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#355,distribution=single), rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, _UTF-16LE'my source':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], size=[3 min]))]Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule, args [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time), rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#355,distribution=single), rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, _UTF-16LE'Microsoft Defender for Identity':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], size=[3 min]))] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740) at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834)Caused by: java.lang.RuntimeException: Error occurred while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161) at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268) at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283) at org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143) at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229) ... 31 moreCaused by: org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [alert_timestamp] at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272) at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157) at org.apache.flink.table.types.logical.RowType.of(RowType.java:297) at org.apache.flink.table.types.logical.RowType.of(RowType.java:289) at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86) at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409) at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391) at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443) at java.base/java.util.HashMap.hash(HashMap.java:339) at java.base/java.util.HashMap.get(HashMap.java:552) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
> {code}
> Looking at the code, it seems that when PullUpWindowTableFunctionIntoWindowAggregateRule is building the new Calc in WindowUtil.buildNewProgramWithoutWindowColumns, it is adding the rowtime column from the input row to the new calc without checking to see if there are any name collisions. Also, TBH I'm not entirely sure yet why the rowtime column of the input table is being added to the projected output row like that?
> !image-2021-08-23-13-31-24-052.png|width=887,height=163!  
> [~jark] would appreciate your help with this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)