You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "lincoln lee (Jira)" <ji...@apache.org> on 2023/04/18 01:37:00 UTC

[jira] [Created] (FLINK-31829) Conversion to relational algebra failed to preserve datatypes' nullabilities

lincoln lee created FLINK-31829:
-----------------------------------

             Summary:  Conversion to relational algebra failed to preserve datatypes' nullabilities
                 Key: FLINK-31829
                 URL: https://issues.apache.org/jira/browse/FLINK-31829
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.17.0
            Reporter: lincoln lee
             Fix For: 1.18.0


AssertionError when running a case such as the following:
{code}
  @Test
  def testCoalesceOnNestedColumns(): Unit = {
    val tEnv = util.tableEnv
    val tableDescriptor = TableDescriptor
      .forConnector("datagen")
      .schema(
        Schema.newBuilder
          .column("id", DataTypes.INT.notNull)
          .column("a", DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT)).nullable)
          .build)
      .build
    tEnv.createTemporaryTable("t1", tableDescriptor)
    tEnv.createTemporaryTable("t2", tableDescriptor)
    val res = tEnv.executeSql(
      "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
    res.print()
  }
{code}

Stack trace:
{code}
java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(INTEGER B1, INTEGER NOT NULL B2, INTEGER BenchmarkId1, INTEGER NOT NULL BenchmarkIdWithIfNull, INTEGER NOT NULL BenchmarkId2) NOT NULL
converted type:
RecordType(INTEGER NOT NULL B1, INTEGER NOT NULL B2, INTEGER BenchmarkId1, INTEGER NOT NULL BenchmarkIdWithIfNull, INTEGER NOT NULL BenchmarkId2) NOT NULL
rel:
LogicalProject(B1=[$4.BenchmarkId], B2=[$2.BenchmarkId], BenchmarkId1=[IF(IS NOT NULL($4), $4.BenchmarkId, IF(true, $2.BenchmarkId, null:INTEGER))], BenchmarkIdWithIfNull=[IFNULL($4.BenchmarkId, $2.BenchmarkId)], BenchmarkId2=[COALESCE($4.BenchmarkId, $2.BenchmarkId)])
  LogicalFilter(condition=[OR(IS NULL($4), IS NULL($4.BenchmarkId))])
    LogicalJoin(condition=[=($3, $0)], joinType=[left])
      LogicalJoin(condition=[=($1, $0)], joinType=[inner])
        LogicalTableScan(table=[[default_catalog, default_database, dbo_book]])
        LogicalTableScan(table=[[default_catalog, default_database, static_book]])
      LogicalTableScan(table=[[default_catalog, default_database, onebook_book_benchmark]])


	at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:500)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:611)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192)
	at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:56)
	at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
	at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:65)
	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:281)
	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:271)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:665)
{code}

However, the equivalent test that uses SQL DDL to create the tables works fine:
{code}
  @Test
  def testCoalesceOnNestedColumns2(): Unit = {
    val tEnv = util.tableEnv
    tEnv.executeSql(s"""
                       |create temporary table t1 (
                       |  id int not null,
                       |  a row<np int>
                       |) with (
                       | 'connector' = 'datagen'
                       |)
                       |""".stripMargin)
    tEnv.executeSql(s"""
                       |create temporary table t2 (
                       |  id int not null,
                       |  a row<np int>
                       |) with (
                       | 'connector' = 'datagen'
                       |)
                       |""".stripMargin)
    val res = tEnv.executeSql(
      "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
    res.print()
  }


== Abstract Syntax Tree ==
LogicalProject(id=[$0], c1=[COALESCE($1.np, $3.np)], c2=[IFNULL($1.np, $3.np)])
+- LogicalFilter(condition=[OR(IS NULL($1), IS NULL($1.np))])
   +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
      :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
      +- LogicalTableScan(table=[[default_catalog, default_database, t2]])

== Optimized Physical Plan ==
Calc(select=[id, COALESCE(a.np, a0.np) AS c1, IFNULL(a.np, a0.np) AS c2])
+- Join(joinType=[LeftOuterJoin], where=[=(id, id0)], select=[id, a, id0, a0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, a], where=[OR(IS NULL(a), IS NULL(a.np))])
   :     +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[id, a])
   +- Exchange(distribution=[hash[id]])
      +- TableSourceScan(table=[[default_catalog, default_database, t2]], fields=[id, a])

== Optimized Execution Plan ==
Calc(select=[id, COALESCE(a.np, a0.np) AS c1, IFNULL(a.np, a0.np) AS c2])
+- Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, a, id0, a0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, a], where=[(a IS NULL OR a.np IS NULL)])
   :     +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[id, a])
   +- Exchange(distribution=[hash[id]])
      +- TableSourceScan(table=[[default_catalog, default_database, t2]], fields=[id, a])
{code}






--
This message was sent by Atlassian Jira
(v8.20.10#820010)