You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Kurt Young (JIRA)" <ji...@apache.org> on 2019/01/30 10:37:01 UTC
[jira] [Closed] (FLINK-7942) NPE when apply FilterJoinRule
[ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kurt Young closed FLINK-7942.
-----------------------------
Resolution: Unresolved
> NPE when apply FilterJoinRule
> -----------------------------
>
> Key: FLINK-7942
> URL: https://issues.apache.org/jira/browse/FLINK-7942
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: lincoln.lee
> Priority: Major
>
> Test case *testFilterRule1* fails due to a NPE
> {code}
> java.lang.RuntimeException: Error while applying rule FilterJoinRule:FilterJoinRule:filter, args [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, $3), 'c0')), 'c1'), 0)), rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0, $2),joinType=left)]
> at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
> at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
> at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
> at org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
> testFilterRule1(FilterRuleTest.scala:63)
> Caused by: java.lang.NullPointerException
> at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
> at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
> at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
> at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
> at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
> at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
> at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
> at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
> at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> {code}
> but *testFilterRule2* works which has the same query written in SQL.
> {code}
> class FilterRuleTest extends TableTestBase {
> @Test
> def testFilterRule1(): Unit = {
> val util = batchTestUtil()
> val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
> val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
> val results = t1
> .leftOuterJoin(t2, 'b === 'e)
> .select('c, Merger('c, 'f) as 'c0)
> .select(Merger('c, 'c0) as 'c1)
> .where('c1 >= 0)
> val expected = unaryNode(
> "DataSetCalc",
> binaryNode(
> "DataSetJoin",
> unaryNode(
> "DataSetCalc",
> batchTableNode(0),
> term("select", "b", "c")
> ),
> unaryNode(
> "DataSetCalc",
> batchTableNode(1),
> term("select", "e", "f")
> ),
> term("where", "=(b, e)"),
> term("join", "b", "c", "e", "f"),
> term("joinType", "LeftOuterJoin")
> ),
> term("select", "Merger$(c, Merger$(c, f)) AS c1"),
> term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
> )
> util.verifyTable(results, expected)
> }
> @Test
> def testFilterRule2(): Unit = {
> val util = batchTestUtil()
> util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
> util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
> util.tableEnv.registerFunction("udf_test", Merger)
> val sql =
> s"""
> |select c1
> |from (
> | select udf_test(c, c0) as c1
> | from (
> | select c, udf_test(b, c) as c0
> | from
> | (select a, b, c
> | from T1
> | left outer join T2
> | on T1.b = T2.e
> | ) tmp
> | ) tmp1
> |) tmp2
> |where c1 >= 0
> """.stripMargin
> val results = util.tableEnv.sqlQuery(sql)
> val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" +
> "DataSetJoin(where=[=(b, e)], join=[b, c, e], joinType=[LeftOuterJoin])\n" +
> "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 0)])\n" +
> "DataSetScan(table=[[_DataSetTable_0]])\n" +
> "DataSetCalc(select=[e])\n" +
> "DataSetScan(table=[[_DataSetTable_1]])"
> util.verifyTable(results, expected)
> }
> }
> object Merger extends ScalarFunction {
> def eval(f0: Int, f1: Int): Int = {
> f0 + f1
> }
> }
> {code}
> A simple way to fix this is to change the calcite class {code} org.apache.calcite.plan.Strong{code}
> add an additional entry to the EnumMap in createPolicyMap method:
> {code}map.put(SqlKind.AS, Policy.AS_IS);{code}
> Either copy to Flink package and modify it or using reflection somewhere.
> I'm not sure if there exists other issues like this one since not all the types in SQLKind included in the Strong.MAP.
> @[~fhueske] @[~twalthr] any ideas?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)