You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Nico Kruber (Jira)" <ji...@apache.org> on 2022/04/19 11:34:00 UTC
[jira] [Commented] (FLINK-20255) Nested decorrelate failed
[ https://issues.apache.org/jira/browse/FLINK-20255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524261#comment-17524261 ]
Nico Kruber commented on FLINK-20255:
-------------------------------------
The following example (simplified from a more complex join that makes more sense than this reduced version) also appears to be an incarnation of the described problem (tested with Flink 1.14.3):
{code}
CREATE TEMPORARY TABLE Messages (
`id` CHAR(1),
`userId` TINYINT,
`relatedUserIds` ARRAY<TINYINT>
)
WITH (
'connector' = 'datagen',
'fields.id.length' = '10',
'fields.userId.kind' = 'random',
'fields.userId.min' = '1',
'fields.userId.max' = '10',
'fields.relatedUserIds.kind' = 'random',
'fields.relatedUserIds.element.min' = '1',
'fields.relatedUserIds.element.max' = '10',
'rows-per-second' = '1000'
);
-- the non-working version:
SELECT *
FROM Messages outer_message
WHERE
outer_message.userId IN
(
SELECT relatedUserId
FROM Messages inner_message
CROSS JOIN UNNEST(inner_message.relatedUserIds) AS t (relatedUserId)
WHERE inner_message.id = outer_message.id
)
-- this one is working:
/*
SELECT *
FROM Messages
CROSS JOIN UNNEST(relatedUserIds) AS t (relatedUserId)
WHERE
userId = t.relatedUserId
*/
{code}
It produces the following exception:
{code}
org.apache.flink.table.api.TableException: unexpected correlate variable $cor1 in the plan
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.immutable.Range.foreach(Range.scala:160) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:161) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:75) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) ~[flink-table_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:209) ~[flink-sql-client_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:209) ~[flink-sql-client_2.11-1.14.3.jar:1.14.3]
... 12 more
{code}
> Nested decorrelate failed
> -------------------------
>
> Key: FLINK-20255
> URL: https://issues.apache.org/jira/browse/FLINK-20255
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.0, 1.12.0
> Reporter: godfrey he
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> This issue originates from the user mailing list: https://www.mail-archive.com/user@flink.apache.org/msg37746.html
> We can reproduce the issue through the following code
> {code:java}
> @FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
> class SplitStringToRows extends TableFunction[Row] {
> def eval(str: String, separator: String = ";"): Unit = {
> if (str != null) {
> str.split(separator).foreach(s => collect(Row.of(s.trim())))
> }
> }
> }
> object Job {
> def main(args: Array[String]): Unit = {
> val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
> streamTableEnv.createTemporarySystemFunction(
> "SplitStringToRows",
> classOf[SplitStringToRows]
> ) // Class defined in previous email
> streamTableEnv.executeSql(
> """
> CREATE TABLE table2 (
> attr1 STRING,
> attr2 STRING,
> attr3 DECIMAL,
> attr4 DATE
> ) WITH (
> 'connector' = 'datagen'
> )""")
> val q2 = streamTableEnv.sqlQuery(
> """
> SELECT
> a.attr1 AS attr1,
> attr2,
> attr3,
> attr4
> FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1, ';')) AS a(attr1)
> """)
> streamTableEnv.createTemporaryView("view2", q2)
> val q3 =
> """
> SELECT
> w.attr1,
> p.attr3
> FROM table2 w
> LEFT JOIN LATERAL (
> SELECT
> attr1,
> attr3
> FROM (
> SELECT
> attr1,
> attr3,
> ROW_NUMBER() OVER (
> PARTITION BY attr1
> ORDER BY
> attr4 DESC NULLS LAST,
> w.attr2 = attr2 DESC NULLS LAST
> ) AS row_num
> FROM view2)
> WHERE row_num = 1) p
> ON (w.attr1 = p.attr1)
> """
> println(streamTableEnv.explainSql(q3))
> }
> }
> {code}
> The reason is that {{RelDecorrelator}} in Calcite cannot handle such nested decorrelation patterns yet
--
This message was sent by Atlassian Jira
(v8.20.1#820001)