Posted to issues@flink.apache.org by "Mayer Crystal (JIRA)" <ji...@apache.org> on 2018/09/20 23:32:00 UTC

[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

    [ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622869#comment-16622869 ] 

Mayer Crystal commented on FLINK-7942:
--------------------------------------

I know that this is marked as resolved, but I'm wondering whether there was a regression or whether there are other code paths that may still exhibit this issue. I am running Flink 1.6.1 and am seeing the same stack trace as above (included below). I don't have a standalone test yet (the code is a bit involved, in that the actual joins and projections are controlled by external configuration), but the sequence of events is basically as follows:

1. A table, T1, is registered with the table env using a CsvTableSource.
2. A second table, T2, is registered with the env using a CsvTableSource.
3. The columns in T2 are aliased (using the .as(String) method) to ensure that there are no name clashes during the join.
4. T2 is joined to T1 using a single equality join condition and then the desired columns are selected.
5. The columns of the result of step 4, J1, are renamed back to the 'expected' names using the .as(String) method.
6. The data in J1 is filtered.
7. The filtered table is then joined to another table, T3, using the same process as in steps 3 - 5.
8. Another table, T4, is also joined using the same process as in steps 3 - 5.
9. The resultant table is then grouped/aggregated.
10. The resultant table is then filtered.
11. The final table is sent to a sink for writing to disk.

Also, as a sanity check, I applied the hack mentioned above (modifying the Calcite class to add map.put(SqlKind.AS, Policy.AS_IS);) and the code then worked as expected.
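
For anyone following along, here is a minimal, self-contained sketch of why that one-line change would plausibly make the NPE disappear. It assumes the failure comes from a policy lookup that returns null for a kind with no map entry, with the null then being used in a switch. Kind, Policy, createPolicyMap(boolean) and describe(...) are hypothetical stand-ins written for this illustration; they are not Calcite's actual code.

```java
import java.util.EnumMap;
import java.util.Map;

public class PolicyMapSketch {
    // Hypothetical stand-ins for Calcite's SqlKind and Strong.Policy.
    enum Kind { INPUT_REF, EQUALS, AS }
    enum Policy { ANY, AS_IS }

    // Builds the lookup table; includeAs toggles the workaround entry.
    static Map<Kind, Policy> createPolicyMap(boolean includeAs) {
        Map<Kind, Policy> map = new EnumMap<>(Kind.class);
        map.put(Kind.INPUT_REF, Policy.AS_IS);
        map.put(Kind.EQUALS, Policy.ANY);
        if (includeAs) {
            // Analogue of map.put(SqlKind.AS, Policy.AS_IS) from the issue.
            map.put(Kind.AS, Policy.AS_IS);
        }
        return map;
    }

    // Looks up the policy and switches on it without a null check.
    static String describe(Map<Kind, Policy> map, Kind kind) {
        Policy policy = map.get(kind); // null when the kind has no entry
        switch (policy) {              // switching on null throws NullPointerException
            case ANY:
                return "null if any operand is null";
            case AS_IS:
            default:
                return "no inference";
        }
    }

    public static void main(String[] args) {
        // With the extra entry the lookup succeeds.
        System.out.println(describe(createPolicyMap(true), Kind.AS));
        // Without it, the unguarded switch fails.
        try {
            describe(createPolicyMap(false), Kind.AS);
        } catch (NullPointerException e) {
            System.out.println("NPE without the AS entry");
        }
    }
}
```

If this matches what Strong does, a null-safe default at the lookup site would cover any other kinds missing from the map as well, rather than just AS.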


java.lang.RuntimeException: Error while applying rule FilterJoinRule:FilterJoinRule:filter, args [rel#771:LogicalFilter.NONE(input=rel#468:Subset#41.NONE,condition==(AS($5, _UTF-16LE'level'), _UTF-16LE'Level 1')), rel#467:LogicalJoin.NONE(left=rel#465:Subset#39.NONE,right=rel#466:Subset#40.NONE,condition==($1, $3),joinType=left)]
                at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
                at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
                at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
                at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:271)
                at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:478)
                at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:511)
                at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:311)
                at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
                at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
...
Caused by: java.lang.NullPointerException: null
                at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
                at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
                at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
                at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
                at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
                at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2353)
                at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
                at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
                at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
                ... 21 common frames omitted

> NPE when apply FilterJoinRule
> -----------------------------
>
>                 Key: FLINK-7942
>                 URL: https://issues.apache.org/jira/browse/FLINK-7942
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: Timo Walther
>            Priority: Major
>             Fix For: 1.4.0, 1.5.0
>
>
> Test case *testFilterRule1* fails due to an NPE:
> {code}
> java.lang.RuntimeException: Error while applying rule FilterJoinRule:FilterJoinRule:filter, args [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, $3), 'c0')), 'c1'), 0)), rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0, $2),joinType=left)]
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
> 	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> 	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
> 	at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
> 	at org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
> 	testFilterRule1(FilterRuleTest.scala:63)	
> Caused by: java.lang.NullPointerException
> 	at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
> 	at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
> 	at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
> 	at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
> 	at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
> 	at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
> 	at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
> 	at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> {code}
> but *testFilterRule2*, which has the same query written in SQL, works.
> {code}
> class FilterRuleTest extends TableTestBase {
>   @Test
>   def testFilterRule1(): Unit = {
>     val util = batchTestUtil()
>     val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
>     val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
>     val results = t1
>       .leftOuterJoin(t2, 'b === 'e)
>       .select('c, Merger('c, 'f) as 'c0)
>       .select(Merger('c, 'c0) as 'c1)
>       .where('c1 >= 0)
>     val expected = unaryNode(
>       "DataSetCalc",
>       binaryNode(
>         "DataSetJoin",
>         unaryNode(
>           "DataSetCalc",
>           batchTableNode(0),
>           term("select", "b", "c")
>         ),
>         unaryNode(
>           "DataSetCalc",
>           batchTableNode(1),
>           term("select", "e", "f")
>         ),
>         term("where", "=(b, e)"),
>         term("join", "b", "c", "e", "f"),
>         term("joinType", "LeftOuterJoin")
>       ),
>       term("select", "Merger$(c, Merger$(c, f)) AS c1"),
>       term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
>     )
>     util.verifyTable(results, expected)
>   }
>   @Test
>   def testFilterRule2(): Unit = {
>     val util = batchTestUtil()
>     util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
>     util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
>     util.tableEnv.registerFunction("udf_test", Merger)
>     val sql =
>       s"""
>          |select c1
>          |from (
>          |  select udf_test(c, c0) as c1
>          |  from (
>          |    select c, udf_test(b, c) as c0
>          |      from
>          |      (select a, b, c
>          |        from T1
>          |        left outer join T2
>          |        on T1.b = T2.e
>          |      ) tmp
>          |  ) tmp1
>          |) tmp2
>          |where c1 >= 0
>        """.stripMargin
>     val results = util.tableEnv.sqlQuery(sql)
>     val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" +
>       "DataSetJoin(where=[=(b, e)], join=[b, c, e], joinType=[LeftOuterJoin])\n" +
>       "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 0)])\n" +
>       "DataSetScan(table=[[_DataSetTable_0]])\n" +
>       "DataSetCalc(select=[e])\n" +
>       "DataSetScan(table=[[_DataSetTable_1]])"
>     util.verifyTable(results, expected)
>   }
> }
> object Merger extends ScalarFunction {
>   def eval(f0: Int, f1: Int): Int = {
>     f0 + f1
>   }
> }
> {code}
> A simple way to fix this is to change the Calcite class {code}org.apache.calcite.plan.Strong{code}
> and add an additional entry to the EnumMap in its createPolicyMap method:
> {code}map.put(SqlKind.AS, Policy.AS_IS);{code}
> Either copy the class into a Flink package and modify it, or use reflection somewhere.
> I'm not sure whether other issues like this one exist, since not all the kinds in SqlKind are included in Strong.MAP.
> @[~fhueske]  @[~twalthr] any ideas?


