Posted to issues@flink.apache.org by "Jing Zhang (JIRA)" <ji...@apache.org> on 2019/07/17 06:38:00 UTC

[jira] [Updated] (FLINK-13301) Some PlannerExpression resultType is not consistent with Calcite Type inference

     [ https://issues.apache.org/jira/browse/FLINK-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jing Zhang updated FLINK-13301:
-------------------------------
    Description: 
The resultType of some PlannerExpressions is inconsistent with Calcite's type inference. The problem can be reproduced by running the following example:

{code:java}
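    // Assumes `env` (StreamExecutionEnvironment) and `tEnv` (StreamTableEnvironment)
    // are already set up, with the Flink Scala API and Table API implicits imported.

    // prepare source data
    val testData = new mutable.MutableList[Int]
    testData += 3
    val t = env.fromCollection(testData).toTable(tEnv).as('a)

    // register a TableSink
    val fieldNames = Array("f0")
    // INT fails sink validation in the compile phase; LONG (below) fails at runtime
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT())
//    val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG())
    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
    tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))

    // floor on an INT column triggers the type mismatch
    t.select('a.floor()).insertInto("targetTable")

    env.execute()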
{code}

The cause is that the resultType of `floor` is LONG_TYPE_INFO, while Calcite's `SqlFloorFunction` infers the return type to be the type of the first argument (INT in the above case).
If I set `fieldTypes` to Array(Types.INT()), the following error is thrown in the compile phase:

{code:java}
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [targetTable] do not match.
Query result schema: [_c0: Long]
TableSink schema:    [f0: Integer]

	at org.apache.flink.table.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:59)
	at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:158)
	at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:157)
	at scala.Option.map(Option.scala:146)
	at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:157)
	at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:129)
	at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:129)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
{code}

If I instead set `fieldTypes` to Array(Types.LONG()), a different error is thrown at runtime:

{code:java}
org.apache.flink.table.api.TableException: Result field does not match requested type. Requested: Long; Actual: Integer

	at org.apache.flink.table.planner.Conversions$$anonfun$generateRowConverterFunction$2.apply(Conversions.scala:103)
	at org.apache.flink.table.planner.Conversions$$anonfun$generateRowConverterFunction$2.apply(Conversions.scala:98)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at org.apache.flink.table.planner.Conversions$.generateRowConverterFunction(Conversions.scala:98)
	at org.apache.flink.table.planner.DataStreamConversions$.getConversionMapper(DataStreamConversions.scala:135)
	at org.apache.flink.table.planner.DataStreamConversions$.convert(DataStreamConversions.scala:91)
{code}

{color:red}The same inconsistency affects expressions such as `Floor`, `Ceil`, `Mod`, and so on.{color}
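
To make the mismatch easier to see, here is a minimal Scala sketch that models the two inference paths described above. It is not Flink or Calcite code: `FloorTypeMismatch`, `FieldType`, `plannerResultType`, `calciteReturnType` and the check functions are hypothetical names introduced only for illustration, and the two checks are simplified stand-ins for the sink validation and the row conversion seen in the stack traces.

```scala
// Hypothetical, simplified model of the behaviour described in this issue.
// None of these names are Flink or Calcite APIs.
object FloorTypeMismatch {
  sealed trait FieldType
  case object IntType extends FieldType
  case object LongType extends FieldType

  // Table API side, as reported: the result type of floor is always LONG.
  def plannerResultType(arg: FieldType): FieldType = LongType

  // Calcite side, as reported: floor returns the type of its first argument.
  def calciteReturnType(arg: FieldType): FieldType = arg

  // Models the compile-phase check: declared query schema vs. sink schema.
  def passesSinkValidation(arg: FieldType, sink: FieldType): Boolean =
    plannerResultType(arg) == sink

  // Models the runtime check: type requested by the sink vs. type actually produced.
  def passesRuntimeConversion(arg: FieldType, sink: FieldType): Boolean =
    calciteReturnType(arg) == sink

  // A sink type works only if it passes both checks.
  def sinkTypeWorks(arg: FieldType, sink: FieldType): Boolean =
    passesSinkValidation(arg, sink) && passesRuntimeConversion(arg, sink)
}
```

Under these assumptions, an INT input with an INT sink fails the validation check and an INT input with a LONG sink fails the conversion check, which matches the two errors above. Only a LONG input is consistent on both paths.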


> Some PlannerExpression resultType is not consistent with Calcite Type inference
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-13301
>                 URL: https://issues.apache.org/jira/browse/FLINK-13301
>             Project: Flink
>          Issue Type: Task
>          Components: Table SQL / API
>            Reporter: Jing Zhang
>            Priority: Major


