You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Matthew Brown <me...@matthewbrown.io> on 2022/04/28 06:01:17 UTC
Temporal join fails with "unexpected correlate variable $cor0 in the plan"
Hi all,
I'm trying to join the following two tables using a temporal join:
*table_1*
(
`f0` STRING NOT NULL,
`f1` DOUBLE NOT NULL,
`rowtime` TIMESTAMP(3) METADATA,
WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - INTERVAL '10' SECOND
)
*table_2:*
(
`f0` STRING NOT NULL,
`f1` DOUBLE NOT NULL,
`rowtime` TIMESTAMP(3) METADATA,
WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - INTERVAL '10' SECOND,
CONSTRAINT `PK_f0` PRIMARY KEY (`f0`) NOT ENFORCED
)
using the following query:
--
*SELECT*
* table_1.f0,*
* table_1.f1 AS table_1_value,*
* table_2.f1 AS table_2_value,*
*FROM table_1*
*JOIN table_2 FOR SYSTEM_TIME AS OF table_1.rowtime ON table_1.f0 =
table_2.f0*
--
and it's raising the following exception
---
Exception in thread "main" org.apache.flink.table.api.TableException:
unexpected correlate variable $cor0 in the plan
at
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
---
Has anybody come across this before? Any debugging tips?
Cheers,
Matt.
--
--
AU: +61 459 493 730
UK: +44 7927 618921
@mnbbrown
Re:Temporal join fails with "unexpected correlate variable $cor0 in the plan"
Posted by Xuyang <xy...@163.com>.
Hi, can you provide the version of the Flink with this exception?
I test your SQL in Flink/master and it works. My test SQL is the following.
create table table_1 (
`f0` STRING NOT NULL,
`f1` DOUBLE NOT NULL,
`rowtime` TIMESTAMP(3),
WATERMARK FOR `rowtime` AS rowtime - INTERVAL '10' SECOND
)
with(
...
)
create table table_2 (
`f0` STRING NOT NULL,
`f1` DOUBLE NOT NULL,
`rowtime` TIMESTAMP(3),
WATERMARK FOR `rowtime` AS rowtime - INTERVAL '10' SECOND,
CONSTRAINT `PK_f0` PRIMARY KEY (`f0`) NOT ENFORCED
)
with(
...
)
SELECT
table_1.f0,
table_1.f1 AS table_1_value,
table_2.f1 AS table_2_value
FROM table_1
JOIN table_2 FOR SYSTEM_TIME AS OF table_1.rowtime
ON table_1.f0 = table_2.f0
At 2022-04-28 14:01:17, "Matthew Brown" <me...@matthewbrown.io> wrote:
Hi all,
I'm trying to join the following two tables using a temporal join:
table_1
(
`f0` STRING NOT NULL,
`f1` DOUBLE NOT NULL,
`rowtime` TIMESTAMP(3) METADATA,
WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - INTERVAL '10' SECOND
)
table_2:
(
`f0` STRING NOT NULL,
`f1` DOUBLE NOT NULL,
`rowtime` TIMESTAMP(3) METADATA,
WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - INTERVAL '10' SECOND,
CONSTRAINT `PK_f0` PRIMARY KEY (`f0`) NOT ENFORCED
)
using the following query:
--
SELECT
table_1.f0,
table_1.f1 AS table_1_value,
table_2.f1 AS table_2_value,
FROM table_1
JOIN table_2 FOR SYSTEM_TIME AS OF table_1.rowtime ON table_1.f0 = table_2.f0
--
and it's raising the following exception
---
Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor0 in the plan
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
---
Has anybody come across this before? Any debugging tips?
Cheers,
Matt.
--
--
AU: +61 459 493 730
UK: +44 7927 618921
@mnbbrown