Posted to user@flink.apache.org by Dylan Forciea <dy...@oseberg.io> on 2020/11/17 23:44:25 UTC
Lateral join not finding correlate variable
This may be due to not understanding lateral joins in Flink – perhaps you can only do so on temporal variables – but I figured I’d ask since the error message isn’t intuitive.
I am trying to do a combination of a lateral join and a top N query. Part of my ordering is based upon whether a value in the left side of the query matches up. I’m trying to do this in the general form of:
SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
    id,
    attr2
  FROM (
    SELECT
      id,
      attr2,
      ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY
          attr3 DESC,
          t1.attr4 = attr4 DESC
      ) AS row_num
    FROM table2)
  WHERE row_num = 1) t2
ON (t1.id = t2.id)
I am getting an error that looks like:
Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
at io.oseberg.flink.well.ok.Job.main(Job.scala)
The only other thing I can think of doing is creating a Table Aggregate function to pull this off. But I wanted to check first that I wasn’t doing something wrong in the above, or whether there is something I’m not thinking of.
Regards,
Dylan Forciea
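The intended result of the query above can be pinned down without Flink. The following is a minimal plain-Scala sketch, assuming non-null values; `LeftRow`, `RightRow`, `top1For` and `leftJoinLateral` are hypothetical names that only mirror the columns in the query. For each left row it keeps the right rows with the same `id`, picks the one that sorts first under `attr3 DESC, t1.attr4 = attr4 DESC` (SQL orders FALSE before TRUE, so `DESC` puts matching rows first), and keeps left rows that have no match, as `LEFT JOIN LATERAL` does.

```scala
object CorrelatedTop1Sketch {
  // Hypothetical row types named after the columns in the query;
  // attr4 is kept as an ISO date string purely for illustration.
  final case class LeftRow(id: String, attr1: String, attr4: String)
  final case class RightRow(id: String, attr2: String, attr3: BigDecimal, attr4: String)

  // For one left row: keep right rows with the same id (PARTITION BY id plus
  // the ON clause), then take the row ranked first by
  // ORDER BY attr3 DESC, (t1.attr4 = attr4) DESC.
  // maxBy on (attr3, matches) prefers the larger attr3 and, on a tie, true over false.
  def top1For(left: LeftRow, rights: Seq[RightRow]): Option[RightRow] = {
    val candidates = rights.filter(_.id == left.id)
    if (candidates.isEmpty) None
    else Some(candidates.maxBy(r => (r.attr3, r.attr4 == left.attr4)))
  }

  // LEFT JOIN LATERAL keeps every left row; None stands in for SQL NULL.
  // Projects t1.id, t1.attr1, t2.attr2 as in the SELECT list.
  def leftJoinLateral(lefts: Seq[LeftRow], rights: Seq[RightRow]): Seq[(String, String, Option[String])] =
    lefts.map(l => (l.id, l.attr1, top1For(l, rights).map(_.attr2)))
}
```

When two rows tie on both keys, `ROW_NUMBER()` may pick either of them; `maxBy` here simply returns the first.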
Re: Lateral join not finding correlate variable
Posted by Dylan Forciea <dy...@oseberg.io>.
Godfrey,
Glad I could help! I suspected that was what the problem was. I have made a view in my Postgres database to perform the inner lateral join, so that should let me work around this for the time being.
Thanks,
Dylan
From: godfrey he <go...@gmail.com>
Date: Friday, November 20, 2020 at 1:09 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Lateral join not finding correlate variable
Hi Dylan,
I have reproduced your issue based on your code.
Currently, Flink does not support this kind of nested correlate query pattern.
I have created an issue to track this [1].
Thanks for your report and help.
[1] https://issues.apache.org/jira/browse/FLINK-20255
Best,
Godfrey
On Thursday, November 19, 2020 at 12:10 PM, Dylan Forciea <dy...@oseberg.io> wrote:
Godfrey,
I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace running exactly this code:
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.TableFunction

@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}

object Job {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
    streamTableEnv.createTemporarySystemFunction(
      "SplitStringToRows",
      classOf[SplitStringToRows]
    ) // Class defined in previous email
    streamTableEnv.executeSql(
      """
      CREATE TABLE table1 (
        id_source BIGINT PRIMARY KEY,
        attr1_source STRING,
        attr2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
        'table-name' = '<table>',
        'username' = '<username>',
        'password' = '<password>',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false')
      """)
    streamTableEnv.executeSql(
      """
      CREATE TABLE table2 (
        attr1_source STRING,
        attr2 STRING,
        attr3 DECIMAL,
        attr4 DATE
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
        'table-name' = '<table>',
        'username' = '<username>',
        'password' = '<password>',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false')
      """)
    val q1 = streamTableEnv.sqlQuery("""
      SELECT
        id_source AS id,
        attr1_source AS attr1,
        attr2
      FROM table1
      """)
    streamTableEnv.createTemporaryView("view1", q1)
    val q2 = streamTableEnv.sqlQuery(
      """
      SELECT
        a.attr1 AS attr1,
        attr2,
        attr3,
        attr4
      FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1)
      """)
    streamTableEnv.createTemporaryView("view2", q2)
    val q3 = streamTableEnv.sqlQuery("""
      SELECT
        w.attr1,
        p.attr3
      FROM view1 w
      LEFT JOIN LATERAL (
        SELECT
          attr1,
          attr3
        FROM (
          SELECT
            attr1,
            attr3,
            ROW_NUMBER() OVER (
              PARTITION BY attr1
              ORDER BY
                attr4 DESC NULLS LAST,
                w.attr2 = attr2 DESC NULLS LAST
            ) AS row_num
          FROM view2)
        WHERE row_num = 1) p
      ON (w.attr1 = p.attr1)
      """)
    streamTableEnv.createTemporaryView("view3", q3)
    val view3 = streamTableEnv.from("view3")
    view3
      .toRetractStream[Row]
      .writeAsCsv("./view3.csv", WriteMode.OVERWRITE)
      .setParallelism(1)
    streamEnv.execute()
  }
}
Thanks,
Dylan Forciea
From: godfrey he <go...@gmail.com>
Date: Wednesday, November 18, 2020 at 8:29 PM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Lateral join not finding correlate variable
Dylan,
Thanks for your feedback. If the planner encounters the
"unexpected correlate variable $cor2 in the plan" exception,
there's a high probability that FlinkDecorrelateProgram has a bug
or that the query pattern is not supported yet. I tried using the JDBC connector for the input tables,
but I still can't reproduce the exception. Could you provide your full code, including the DDL, queries, etc.?
Thanks so much.
Best,
Godfrey
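Judging from the stack trace, the exception is raised by `FlinkDecorrelateProgram.checkCorrelVariableExists` after decorrelation has run, which suggests the planner requires that no correlation variable (the `$cor2` in the message) remain in the plan at that point. The toy model below is a hypothetical illustration of that kind of check; the `Expr`/`Plan` types and `checkFullyDecorrelated` are invented for this sketch and are not Calcite's or Flink's classes.

```scala
object CorrelCheckSketch {
  // Toy expression and plan model, invented for illustration only.
  sealed trait Expr
  final case class FieldRef(name: String) extends Expr
  final case class CorrelRef(correlId: String, field: String) extends Expr // e.g. $cor2.attr2
  final case class EqualTo(left: Expr, right: Expr) extends Expr

  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Project(exprs: Seq[Expr], input: Plan) extends Plan
  final case class Filter(condition: Expr, input: Plan) extends Plan
  final case class Join(condition: Expr, left: Plan, right: Plan) extends Plan

  // Correlation ids referenced by a single expression.
  private def correlIds(e: Expr): Set[String] = e match {
    case CorrelRef(id, _) => Set(id)
    case EqualTo(l, r)    => correlIds(l) ++ correlIds(r)
    case FieldRef(_)      => Set.empty
  }

  // Every correlation id still referenced anywhere in the plan tree.
  def residualCorrelIds(plan: Plan): Set[String] = plan match {
    case Scan(_)              => Set.empty
    case Project(es, input)   => es.flatMap(correlIds).toSet ++ residualCorrelIds(input)
    case Filter(c, input)     => correlIds(c) ++ residualCorrelIds(input)
    case Join(c, left, right) => correlIds(c) ++ residualCorrelIds(left) ++ residualCorrelIds(right)
  }

  // Meant to run after decorrelation: a surviving reference means the rewrite
  // did not remove the correlation, so fail with a message shaped like the planner's.
  def checkFullyDecorrelated(plan: Plan): Unit =
    residualCorrelIds(plan).toSeq.sorted.headOption.foreach { id =>
      throw new IllegalStateException(s"unexpected correlate variable $id in the plan")
    }
}
```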
On Wednesday, November 18, 2020 at 10:09 PM, Dylan Forciea <dy...@oseberg.io> wrote:
Godfrey,
I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still having the same issue. Note that I am using the JDBC Connector for the input tables, and table1 and table2 are actually created from queries on those connector tables and not directly.
Since you indicated what I did should work, I played around a bit more, and determined it’s something inside of the table2 query that is triggering the error. The id field there is generated by a table function. Removing that piece made the plan start working. Table 2 is formulated as follows:
SELECT
  T.id,
  attr2,
  attr3,
  attr4
FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)
Where SplitStringToRows is defined as:
@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}
Removing the lateral table bit in that first table made the original query plan work correctly.
I greatly appreciate your assistance!
Regards,
Dylan Forciea
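The `eval` body of `SplitStringToRows` can be exercised on its own. Below is a minimal plain-Scala sketch (the object and method names are hypothetical) that reproduces its splitting logic, plus a variant using `java.util.regex.Pattern.quote`. `String.split` interprets the separator as a regular expression, so `;` behaves as expected, but a separator such as `|` would not be matched literally.

```scala
import java.util.regex.Pattern

object SplitSketch {
  // Same logic as SplitStringToRows.eval, returning a list instead of
  // emitting rows: null input yields nothing, and each piece is trimmed.
  def splitAsInUdf(str: String, separator: String = ";"): List[String] =
    if (str == null) Nil
    else str.split(separator).map(_.trim).toList

  // Variant that quotes the separator so it is matched literally
  // rather than as a regular expression.
  def splitLiteral(str: String, separator: String = ";"): List[String] =
    if (str == null) Nil
    else str.split(Pattern.quote(separator)).map(_.trim).toList
}
```

As with `String.split` in general, trailing empty pieces are dropped, while empty pieces in the middle (for example from `a;;b`) are kept and would be emitted as empty strings.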
From: godfrey he <go...@gmail.com>
Date: Wednesday, November 18, 2020 at 7:33 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Lateral join not finding correlate variable
Hi Dylan,
Could you tell me which Flink version you found the problem with?
I tested the above query on master and got the plan; no errors occurred.
Here is my test case:
@Test
def testLateralJoin(): Unit = {
  util.addTableSource[(String, String, String, String, String)]("table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  util.addTableSource[(String, String, String, String, String)]("table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  val query =
    """
      |SELECT
      |  t1.id,
      |  t1.attr1,
      |  t2.attr2
      |FROM table1 t1
      |LEFT JOIN LATERAL (
      |  SELECT
      |    id,
      |    attr2
      |  FROM (
      |    SELECT
      |      id,
      |      attr2,
      |      ROW_NUMBER() OVER (
      |        PARTITION BY id
      |        ORDER BY
      |          attr3 DESC,
      |          t1.attr4 = attr4 DESC
      |      ) AS row_num
      |    FROM table2)
      |  WHERE row_num = 1) t2
      |ON t1.id = t2.id
      |""".stripMargin
  util.verifyPlan(query)
}
Best,
Godfrey
Re: Lateral join not finding correlate variable
Posted by godfrey he <go...@gmail.com>.
Hi Dylan,
I have reproduced your issue based on your code.
Currently, Flink does not support this kind of nested correlate query pattern.
I have created an issue to track this [1].
Thanks for your report and help.
[1] https://issues.apache.org/jira/browse/FLINK-20255
Best,
Godfrey
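In the reproduction, the nesting comes from the LATERAL subquery referencing `w.attr2` from the outer query while `view2` is itself built with another `LATERAL TABLE`. One possible way to avoid the outer reference, not verified against Flink's planner, is to join first and rank afterwards: compute `ROW_NUMBER() OVER (PARTITION BY w.id ORDER BY p.attr4 DESC NULLS LAST, w.attr2 = p.attr2 DESC NULLS LAST)` over an ordinary `LEFT JOIN` on `attr1` and keep `row_num = 1`. The plain-Scala model below only checks that the two formulations select the same rows on sample data, assuming `view1.id` is unique (it comes from a `PRIMARY KEY` column); `View1Row`, `View2Row`, `correlated` and `joinThenRank` are hypothetical names.

```scala
import java.time.LocalDate

object JoinThenRankSketch {
  // Hypothetical in-memory stand-ins for view1 and view2 from the reproduction.
  final case class View1Row(id: Long, attr1: String, attr2: Option[String])
  final case class View2Row(attr1: String, attr2: Option[String], attr3: BigDecimal, attr4: Option[LocalDate])

  // Sort key for "attr4 DESC NULLS LAST, w.attr2 = attr2 DESC NULLS LAST".
  // The smallest key corresponds to row_num = 1.
  private def rankKey(w: View1Row, p: View2Row): (Int, Long, Int) = {
    val (dateIsNull, negatedDay) = p.attr4 match {
      case Some(d) => (0, -d.toEpochDay) // later dates first
      case None    => (1, 0L)            // NULL dates last
    }
    val matchRank = (w.attr2, p.attr2) match {
      case (Some(a), Some(b)) => if (a == b) 0 else 1 // TRUE before FALSE
      case _                  => 2                    // NULL comparison result last
    }
    (dateIsNull, negatedDay, matchRank)
  }

  // Correlated form (what LEFT JOIN LATERAL expresses): rank per left row.
  def correlated(v1: Seq[View1Row], v2: Seq[View2Row]): Seq[(String, Option[BigDecimal])] =
    v1.map { w =>
      val candidates = v2.filter(_.attr1 == w.attr1)
      val best = if (candidates.isEmpty) None else Some(candidates.minBy(rankKey(w, _)))
      (w.attr1, best.map(_.attr3))
    }

  // Decorrelated form: a plain LEFT JOIN on attr1, then ROW_NUMBER() partitioned
  // by the left key w.id, keeping row_num = 1. Assumes view1.id is unique.
  def joinThenRank(v1: Seq[View1Row], v2: Seq[View2Row]): Seq[(String, Option[BigDecimal])] = {
    val joined: Seq[(View1Row, Option[View2Row])] = v1.flatMap { w =>
      val matches = v2.filter(_.attr1 == w.attr1)
      if (matches.isEmpty) Seq(w -> Option.empty[View2Row]) else matches.map(p => w -> Option(p))
    }
    val partitions = joined.groupBy(_._1.id)
    v1.map { w =>
      // The ranking key is computed from the joined row, not from an outer variable.
      val ranked = partitions(w.id)
        .collect { case (left, Some(p)) => (left, p) }
        .sortBy { case (left, p) => rankKey(left, p) }
      (w.attr1, ranked.headOption.map(_._2.attr3))
    }
  }
}
```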
Re: Lateral join not finding correlate variable
Posted by Dylan Forciea <dy...@oseberg.io>.
Godfrey,
I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace running exactly this code:
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.TableFunction
@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
def eval(str: String, separator: String = ";"): Unit = {
if (str != null) {
str.split(separator).foreach(s => collect(Row.of(s.trim())))
}
}
}
object Job {
def main(args: Array[String]): Unit = {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
streamTableEnv.createTemporarySystemFunction(
"SplitStringToRows",
classOf[SplitStringToRows]
) // Class defined in previous email
streamTableEnv.executeSql(
"""
CREATE TABLE table1 (
id_source BIGINT PRIMARY KEY,
attr1_source STRING,
attr2 STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
'table-name' = '<table>',
'username' = '<username>',
'password' = '<password>',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false')
""")
streamTableEnv.executeSql(
"""
CREATE TABLE table2 (
attr1_source STRING,
attr2 STRING,
attr3 DECIMAL,
attr4 DATE
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
'table-name' = '<table>',
'username' = '<username>',
'password' = '<password>',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false')
""")
val q1 = streamTableEnv.sqlQuery("""
SELECT
id_source AS id,
attr1_source AS attr1,
attr2
FROM table1
""")
streamTableEnv.createTemporaryView("view1", q1)
val q2 = streamTableEnv.sqlQuery(
"""
SELECT
a.attr1 AS attr1,
attr2,
attr3,
attr4
FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1)
""")
streamTableEnv.createTemporaryView("view2", q2)
val q3 = streamTableEnv.sqlQuery("""
SELECT
w.attr1,
p.attr3
FROM view1 w
LEFT JOIN LATERAL (
SELECT
attr1,
attr3
FROM (
SELECT
attr1,
attr3,
ROW_NUMBER() OVER (
PARTITION BY attr1
ORDER BY
attr4 DESC NULLS LAST,
w.attr2 = attr2 DESC NULLS LAST
) AS row_num
FROM view2)
WHERE row_num = 1) p
ON (w.attr1 = p.attr1)
""")
streamTableEnv.createTemporaryView("view3", q3)
val view3 = streamTableEnv.from("view3")
view3
.toRetractStream[Row]
.writeAsCsv("./view3.csv", WriteMode.OVERWRITE)
.setParallelism(1)
streamEnv.execute()
}
}
Thanks,
Dylan Forciea
From: godfrey he <go...@gmail.com>
Date: Wednesday, November 18, 2020 at 8:29 PM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Lateral join not finding correlate variable
Dylan,
Thanks for you feedback, if the planner encounters
"unexpected correlate variable $cor2 in the plan" exception,
There's a high probability that FlinkDecorrelateProgram has some bugs
or the query pattern is not supported now. I try to use JDBC Connector as the input tables,
but I still don't reproduce the exception. Could you provide your full code, including ddl, query, etc.
Thanks so much.
Best,
Godfrey
Dylan Forciea <dy...@oseberg.io>> 于2020年11月18日周三 下午10:09写道:
Godfrey,
I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still having the same issue. Note that I am using the JDBC Connector for the input tables, and table1 and table2 are actually created from queries on those connector tables and not directly.
Since you indicated what I did should work, I played around a bit more, and determined it’s something inside of the table2 query that is triggering the error. The id field there is generated by a table function. Removing that piece made the plan start working. Table 2 is formulated as follows:
SELECT
T.id,
attr2,
attr3,
attr4
FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id<http://t3.id>, ';')) AS T(id)
Where SplitStringToRows is defined as:
@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
def eval(str: String, separator: String = ";"): Unit = {
if (str != null) {
str.split(separator).foreach(s => collect(Row.of(s.trim())))
}
}
}
Removing the lateral table bit in that first table made the original query plan work correctly.
I greatly appreciate your assistance!
Regards,
Dylan Forciea
From: godfrey he <go...@gmail.com>>
Date: Wednesday, November 18, 2020 at 7:33 AM
To: Dylan Forciea <dy...@oseberg.io>>
Cc: "user@flink.apache.org<ma...@flink.apache.org>" <us...@flink.apache.org>>
Subject: Re: Lateral join not finding correlate variable
Hi Dylan,
Could you provide which Flink version you find out the problem with?
I test the above query on master, and I get the plan, no errors occur.
Here is my test case:
@Test
def testLateralJoin(): Unit = {
util.addTableSource[(String, String, String, String, String)]("table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
util.addTableSource[(String, String, String, String, String)]("table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
val query =
"""
|SELECT
| t1.id<http://t1.id>,
| t1.attr1,
| t2.attr2
|FROM table1 t1
|LEFT JOIN LATERAL (
| SELECT
| id,
| attr2
| FROM (
| SELECT
| id,
| attr2,
| ROW_NUMBER() OVER (
| PARTITION BY id
| ORDER BY
| attr3 DESC,
| t1.attr4 = attr4 DESC
| ) AS row_num
| FROM table2)
| WHERE row_num = 1) t2
|ON t1.id<http://t1.id> = t2.id<http://t2.id>
|""".stripMargin
util.verifyPlan(query)
}
Best,
Godfrey
Dylan Forciea <dy...@oseberg.io>> 于2020年11月18日周三 上午7:44写道:
This may be due to not understanding lateral joins in Flink – perhaps you can only do so on temporal variables – but I figured I’d ask since the error message isn’t intuitive.
I am trying to do a combination of a lateral join and a top N query. Part of my ordering is based upon whether the a value in the left side of the query matches up. I’m trying to do this in the general form of:
SELECT
t1.id<http://t1.id>,
t1.attr1,
t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
SELECT
id,
attr2
FROM (
SELECT
id,
attr2,
ROW_NUMBER() OVER (
PARTITION BY id
ORDER BY
attr3 DESC,
t1.attr4 = attr4 DESC
) AS row_num
FROM table2
WHERE row_num = 1) t2
ON (t1.id<http://t1.id> = t2.id<http://t2.id>)
I am getting an error that looks like:
Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
at io.oseberg.flink.well.ok.Job.main(Job.scala)
The only other thing I can think of doing is creating a Table Aggregate function to pull this off. But I wanted to check first that I wasn’t doing something wrong above, or whether there is something I’m not thinking of.
Regards,
Dylan Forciea
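The fallback idea above is a Table Aggregate function. Whatever Flink API it ends up using, its core would be an accumulator that keeps only the best candidate seen so far. Below is a plain-Scala sketch of that state only, assuming the same ranking as the query (attr3 DESC, then a match on the left-side attr4) and that the left value is supplied when the accumulator is created; `Candidate`, `Top1Acc` and `Top1` are hypothetical names, not Flink classes.

```scala
// Hypothetical candidate row from table2 (columns named as in the query).
final case class Candidate(attr2: String, attr3: String, attr4: String)

// Accumulator holding only the current winner for one left row.
final class Top1Acc(leftAttr4: String) {
  var best: Option[Candidate] = None

  // True if c ranks strictly ahead of cur: larger attr3 first;
  // on equal attr3, a candidate whose attr4 matches the left side wins.
  private def better(c: Candidate, cur: Candidate): Boolean = {
    val byAttr3 = c.attr3.compareTo(cur.attr3)
    if (byAttr3 != 0) byAttr3 > 0
    else c.attr4 == leftAttr4 && cur.attr4 != leftAttr4
  }

  // Insert-only update: ties keep the earlier candidate.
  def accumulate(c: Candidate): Unit =
    if (best.forall(cur => better(c, cur))) best = Some(c)
}

object Top1 {
  // Feeds every candidate through a fresh accumulator and returns the winner.
  def of(leftAttr4: String, rows: Seq[Candidate]): Option[Candidate] = {
    val acc = new Top1Acc(leftAttr4)
    rows.foreach(acc.accumulate)
    acc.best
  }
}
```

The trade-off is state size: one row per group instead of the whole partition, at the cost of being unable to undo a winner that is later retracted, so an updating input would need extra bookkeeping.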
Re: Lateral join not finding correlate variable
Posted by godfrey he <go...@gmail.com>.
Dylan,
Thanks for your feedback. If the planner throws the
"unexpected correlate variable $cor2 in the plan" exception,
there's a high probability that FlinkDecorrelateProgram has a bug
or that the query pattern is currently not supported. I tried using the
JDBC connector for the input tables, but I still can't reproduce the
exception. Could you provide your full code, including DDL, query, etc.?
Thanks so much.
Best,
Godfrey
Dylan Forciea <dy...@oseberg.io> wrote on Wed, Nov 18, 2020 at 10:09 PM:
> Godfrey,
>
> I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and
> am still having the same issue. Note that I am using the JDBC Connector for
> the input tables, and table1 and table2 are actually created from queries
> on those connector tables and not directly.
>
> Since you indicated what I did should work, I played around a bit more,
> and determined it’s something inside of the table2 query that is triggering
> the error. The id field there is generated by a table function. Removing
> that piece made the plan start working. Table 2 is formulated as follows:
>
> SELECT
> T.id,
> attr2,
> attr3,
> attr4
> FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)
>
> Where SplitStringToRows is defined as:
>
> @FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
> class SplitStringToRows extends TableFunction[Row] {
>   def eval(str: String, separator: String = ";"): Unit = {
>     if (str != null) {
>       str.split(separator).foreach(s => collect(Row.of(s.trim())))
>     }
>   }
> }
>
> Removing the lateral table bit in that first table made the original query
> plan work correctly.
>
> I greatly appreciate your assistance!
>
> Regards,
> Dylan Forciea
>
> From: godfrey he <go...@gmail.com>
> Date: Wednesday, November 18, 2020 at 7:33 AM
> To: Dylan Forciea <dy...@oseberg.io>
> Cc: "user@flink.apache.org" <us...@flink.apache.org>
> Subject: Re: Lateral join not finding correlate variable
>
> Hi Dylan,
>
> Which Flink version did you find the problem with?
>
> I tested the above query on master and got the plan; no errors occurred.
>
> Here is my test case:
>
> @Test
> def testLateralJoin(): Unit = {
> util.addTableSource[(String, String, String, String, String)]("table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
> util.addTableSource[(String, String, String, String, String)]("table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
> val query =
> """
> |SELECT
> | t1.id,
> | t1.attr1,
> | t2.attr2
> |FROM table1 t1
> |LEFT JOIN LATERAL (
> | SELECT
> | id,
> | attr2
> | FROM (
> | SELECT
> | id,
> | attr2,
> | ROW_NUMBER() OVER (
> | PARTITION BY id
> | ORDER BY
> | attr3 DESC,
> | t1.attr4 = attr4 DESC
> | ) AS row_num
> | FROM table2)
> | WHERE row_num = 1) t2
> |ON t1.id = t2.id
> |""".stripMargin
> util.verifyPlan(query)
> }
>
> Best,
>
> Godfrey
Re: Lateral join not finding correlate variable
Posted by Dylan Forciea <dy...@oseberg.io>.
Godfrey,
I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still having the same issue. Note that I am using the JDBC Connector for the input tables, and table1 and table2 are actually created from queries on those connector tables and not directly.
Since you indicated what I did should work, I played around a bit more, and determined it’s something inside of the table2 query that is triggering the error. The id field there is generated by a table function. Removing that piece made the plan start working. Table 2 is formulated as follows:
SELECT
T.id,
attr2,
attr3,
attr4
FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)
Where SplitStringToRows is defined as:
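import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  // Emits one single-column row per separator-delimited piece; null input emits nothing.
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}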
Removing the lateral table bit in that first table made the original query plan work correctly.
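The fan-out that `LATERAL TABLE(SplitStringToRows(t3.id, ';'))` performs can be modelled without Flink as a `flatMap`. This is a sketch assuming a hypothetical `T3` row type and helper object `SplitFanOut`; it mirrors `eval` except for one deliberate change, `java.util.regex.Pattern.quote`, because `String.split` interprets its argument as a regular expression, so a separator such as `|` would not split literally.

```scala
import java.util.regex.Pattern

// Hypothetical row shape for table3; only the column names come from the thread.
final case class T3(id: String, attr2: String, attr3: String, attr4: String)

object SplitFanOut {
  // Same steps as eval(): null input yields nothing, otherwise split and trim.
  // Pattern.quote makes the separator literal instead of a regex.
  def splitToRows(str: String, separator: String = ";"): Seq[String] =
    if (str == null) Seq.empty
    else str.split(Pattern.quote(separator)).toSeq.map(_.trim)

  // FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id):
  // the comma join is an inner join, so each table3 row is repeated once per
  // piece (with the piece as the new id) and rows yielding no pieces drop out.
  def table2(rows: Seq[T3]): Seq[T3] =
    rows.flatMap(r => splitToRows(r.id, ";").map(piece => r.copy(id = piece)))
}
```

For example, a table3 row whose id is "a; b" becomes two table2 rows with ids "a" and "b", so a single source row can feed several partitions of the later ROW_NUMBER().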
I greatly appreciate your assistance!
Regards,
Dylan Forciea
Re: Lateral join not finding correlate variable
Posted by godfrey he <go...@gmail.com>.
Hi Dylan,
Which Flink version did you find the problem with?
I tested the above query on master and got the plan; no errors occurred.
Here is my test case:
@Test
def testLateralJoin(): Unit = {
util.addTableSource[(String, String, String, String, String)]("table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
util.addTableSource[(String, String, String, String, String)]("table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
val query =
"""
|SELECT
| t1.id,
| t1.attr1,
| t2.attr2
|FROM table1 t1
|LEFT JOIN LATERAL (
| SELECT
| id,
| attr2
| FROM (
| SELECT
| id,
| attr2,
| ROW_NUMBER() OVER (
| PARTITION BY id
| ORDER BY
| attr3 DESC,
| t1.attr4 = attr4 DESC
| ) AS row_num
| FROM table2)
| WHERE row_num = 1) t2
|ON t1.id = t2.id
|""".stripMargin
util.verifyPlan(query)
}
Best,
Godfrey