Posted to user@flink.apache.org by Dan Hill <qu...@gmail.com> on 2020/12/15 17:47:13 UTC
Flink - Create Temporary View and "Rowtime attributes must not be in the input rows of a regular join"
When I try to refactor my joins into a temporary view to share joins and
state, I get the following error. I tried a few variations of the code
snippets below (adding TIMESTAMP casts based on Google searches). I
removed a bunch of fields to simplify this example.
Is this a known issue? Do I have a simple coding bug?
CREATE TEMPORARY VIEW `flat_impression_view` AS
SELECT
  DATE_FORMAT(input_impression.ts, 'yyyy-MM-dd') AS dt,
  input_insertion.log_user_id AS insertion_log_user_id,
  COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS insertion_ts,
  input_insertion.insertion_id AS insertion_insertion_id,
  COALESCE(CAST(input_impression.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS impression_ts,
  input_impression.impression_id AS impression_impression_id,
  input_impression.insertion_id AS impression_insertion_id
FROM input_insertion
JOIN input_impression
  ON input_insertion.insertion_id = input_impression.insertion_id
  AND CAST(input_insertion.ts AS TIMESTAMP) BETWEEN CAST(input_impression.ts AS TIMESTAMP) - INTERVAL '12' HOUR
                                                AND CAST(input_impression.ts AS TIMESTAMP) + INTERVAL '1' HOUR
INSERT INTO `flat_impression_w_click`
SELECT
  dt,
  insertion_log_user_id,
  CAST(insertion_ts AS TIMESTAMP(3)) AS insertion_ts,
  insertion_insertion_id,
  CAST(impression_ts AS TIMESTAMP(3)) AS impression_ts,
  impression_impression_id,
  impression_insertion_id,
  COALESCE(CAST(input_click.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS click_ts,
  COALESCE(input_click.click_id, EmptyByteArray()) AS click_click_id,
  COALESCE(input_click.impression_id, EmptyByteArray()) AS click_impression_id
FROM flat_impression_view
LEFT JOIN input_click
  ON flat_impression_view.impression_impression_id = input_click.impression_id
  AND CAST(flat_impression_view.impression_ts AS TIMESTAMP) BETWEEN CAST(input_click.ts AS TIMESTAMP) - INTERVAL '12' HOUR
                                                                AND CAST(input_click.ts AS TIMESTAMP) + INTERVAL '12' HOUR
java.lang.RuntimeException: Failed to executeSql=...
...
Caused by: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalLegacySink(name=[...])
+- FlinkLogicalCalc(select=[...])
   +- FlinkLogicalJoin(condition=[AND(=($36, $45), >=(CAST($35):TIMESTAMP(6) NOT NULL, -(CAST($43):TIMESTAMP(6), 43200000:INTERVAL HOUR)), <=(CAST($35):TIMESTAMP(6) NOT NULL, +(CAST($43):TIMESTAMP(6), 43200000:INTERVAL HOUR)))], joinType=[left])
      :- FlinkLogicalCalc(select=[...])
      :  +- FlinkLogicalJoin(condition=[AND(=($5, $35), >=(CAST($4):TIMESTAMP(6), -(CAST($33):TIMESTAMP(6), 43200000:INTERVAL HOUR)), <=(CAST($4):TIMESTAMP(6), +(CAST($33):TIMESTAMP(6), 3600000:INTERVAL HOUR)))], joinType=[inner])
      :     :- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_insertion]])
      :     +- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_impression]])
      +- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_click]])

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
    at com.example.logprocessor.common.flink.TableEnvUtils.executeSql(TableEnvUtils.java:14)
    ... 24 more
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
    at org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecJoinRule.matches(StreamExecJoinRule.scala:88)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:272)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:386)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:256)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1476)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1742)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
    at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:91)
    at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:321)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1611)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:851)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:866)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:534)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
    ... 45 more
Re: Flink - Create Temporary View and "Rowtime attributes must not be in the input rows of a regular join"
Posted by Timo Walther <tw...@apache.org>.
Hi Dan,
are you intending to use interval joins, regular joins, or a mixture of
both?
For regular joins, you must cast the rowtime attribute to TIMESTAMP as early as possible. For interval joins, you need to make sure that the rowtime attribute stays unmodified.
Currently, I see
COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS insertion_ts
or
CAST(flat_impression_view.impression_ts AS TIMESTAMP)
both of which implicitly disable interval joins.
If you would like to keep the interval join properties, you need to do the casting in a computed column in the CREATE TABLE statement, before declaring a watermark on it.
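As a minimal sketch of what I mean (the column names, the raw timestamp format, and the connector options are placeholders, not taken from your pipeline): do the conversion once as a computed column in the DDL and declare the watermark on that column, so the column remains a rowtime attribute that interval joins can use.

```sql
-- Hypothetical DDL sketch; adapt names, formats, and connector options.
CREATE TABLE input_insertion (
  insertion_id BIGINT,
  log_user_id  BIGINT,
  ts_str       STRING,                        -- raw event time from the source (assumed format)
  -- Do the conversion in a computed column ...
  ts AS TO_TIMESTAMP(ts_str),
  -- ... and declare the watermark on the computed column, so `ts`
  -- stays a rowtime attribute and is usable in interval joins.
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = '...'                          -- placeholder
);
```

With the table declared this way, the view can join on `ts` directly (no COALESCE or extra CAST around it), and the `BETWEEN ... INTERVAL` condition can be recognized as an interval join instead of a regular join.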
Regards,
Timo
On 15.12.20 18:47, Dan Hill wrote: