Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2021/10/09 06:25:00 UTC

[jira] [Assigned] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

     [ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he reassigned FLINK-21345:
----------------------------------

    Assignee: Lyn Zhang

> NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-21345
>                 URL: https://issues.apache.org/jira/browse/FLINK-21345
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.1
>         Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: macOS
>            Reporter: Lyn Zhang
>            Assignee: Lyn Zhang
>            Priority: Minor
>              Labels: auto-unassigned, pull-request-available
>             Fix For: 1.15.0, 1.14.1
>
>         Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create two source tables as below:
> {code:sql}
> CREATE TABLE test_streaming(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming2',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> {code}
> Second Step: Create a temporal table function (function name: dim, key: vid, time attribute: proctime()).
> Third Step: Join the UNION ALL of test_streaming and test_streaming2 with dim, as below:
> {code:sql}
> SELECT r.vid, d.name, timestamp_from_long(r.ts)
> FROM (
>     SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
>     LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only test_streaming or only test_streaming2 is joined with the temporal table function, the program runs fine)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
> 	at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
> 	at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
> 	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> 	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> 	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> 	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> 	at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> 	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> 	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
> 	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> 	at scala.collection.immutable.Range.foreach(Range.scala:166)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
> 	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
> 	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> {code}


