You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2021/10/09 08:46:00 UTC
[jira] [Closed] (FLINK-21345) NullPointerException
LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
[ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he closed FLINK-21345.
------------------------------
Resolution: Fixed
Fixed in 1.15.0: 24e6121d5f882e55dfc0616b1da81dc0b46f2d34
Fixed in 1.14.1: 3598a16f4d2d46b75f15a4eb01610ecfe2640f1e
> NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> ----------------------------------------------------------------------------------
>
> Key: FLINK-21345
> URL: https://issues.apache.org/jira/browse/FLINK-21345
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.1
> Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: mac os
> Reporter: Lyn Zhang
> Assignee: Lyn Zhang
> Priority: Minor
> Labels: auto-unassigned, pull-request-available
> Fix For: 1.15.0, 1.14.1
>
> Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create 2 Source Tables as below:
> {code:java}
> CREATE TABLE test_streaming(
> vid BIGINT,
> ts BIGINT,
> proc AS proctime()
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'test-streaming',
> 'properties.bootstrap.servers' = '127.0.0.1:9092',
> 'scan.startup.mode' = 'latest-offset',
> 'format' = 'json'
> );
> CREATE TABLE test_streaming2(
> vid BIGINT,
> ts BIGINT,
> proc AS proctime()
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'test-streaming2',
> 'properties.bootstrap.servers' = '127.0.0.1:9092',
> 'scan.startup.mode' = 'latest-offset',
> 'format' = 'json'
> );
> {code}
> Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, timestamp:proctime()
> Third Step: test_streaming union all test_streaming2 join dim like below:
> {code:java}
> SELECT r.vid,d.name,timestamp_from_long(r.ts)
> FROM (
> SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
> LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only use test-streaming or test-streaming2 join temporary table function, the program run ok)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
> at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
> at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
> at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> at scala.collection.immutable.Range.foreach(Range.scala:166)
> at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57){code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)