Posted to issues@flink.apache.org by "Rong Rong (JIRA)" <ji...@apache.org> on 2018/08/13 19:00:00 UTC

[jira] [Commented] (FLINK-10019) Fix Composite getResultType of UDF cannot be chained with other operators

    [ https://issues.apache.org/jira/browse/FLINK-10019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578793#comment-16578793 ] 

Rong Rong commented on FLINK-10019:
-----------------------------------

Dug a little deeper into Calcite and found the relevant logic in:
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68

It seems that if the return type is inferred as a record (i.e. {{isStruct()}} is true), "it must have the same number of fields as the number of operands." That is clearly not the case here: for the expression {{AS(func(a), "myRow")}}, only {{func(a)}} is considered for type inference, not the alias {{"myRow"}}.
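
To illustrate the mismatch, here is a minimal standalone sketch. It is not Calcite's actual code: {{OperandInferenceSketch}} and {{SimpleType}} are made-up names, and the loop only models the rule quoted above (for a struct return type, operand i takes the type of field i). It assumes {{AS(func(a), "myRow")}} is seen as a call with two operands while the row type has a single field, which matches the "index (1) must be less than size (1)" message in the stack trace.

```java
import java.util.Collections;
import java.util.List;

public class OperandInferenceSketch {

    /** Hypothetical stand-in for a RelDataType: a name plus, for structs, the field types. */
    static final class SimpleType {
        final String name;
        final List<SimpleType> fields; // empty for non-struct types

        SimpleType(String name, List<SimpleType> fields) {
            this.name = name;
            this.fields = fields;
        }

        boolean isStruct() {
            return !fields.isEmpty();
        }
    }

    /**
     * Fills operandTypes from the return type. For a struct return type,
     * operand i receives the type of field i, so the struct needs at least
     * as many fields as there are operands.
     */
    static void inferOperandTypes(SimpleType returnType, SimpleType[] operandTypes) {
        for (int i = 0; i < operandTypes.length; i++) {
            operandTypes[i] = returnType.isStruct()
                ? returnType.fields.get(i) // throws if i >= number of fields
                : returnType;
        }
    }

    public static void main(String[] args) {
        SimpleType intType = new SimpleType("INTEGER", Collections.emptyList());
        SimpleType rowType = new SimpleType("ROW(INTEGER)", Collections.singletonList(intType));

        // Assumed shape of AS(func(a), "myRow"): two operands, one-field row type.
        SimpleType[] operandTypes = new SimpleType[2];
        try {
            inferOperandTypes(rowType, operandTypes);
            System.out.println("inference succeeded");
        } catch (IndexOutOfBoundsException e) {
            System.out.println("inference failed: field index out of range");
        }
    }
}
```

With a non-struct return type the same loop simply copies the return type to every operand, which would explain why only composite result types are affected.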

> Fix Composite getResultType of UDF cannot be chained with other operators
> -------------------------------------------------------------------------
>
>                 Key: FLINK-10019
>                 URL: https://issues.apache.org/jira/browse/FLINK-10019
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>            Priority: Major
>
> If a CompositeType is explicitly returned from {{udf.getResultType}}, chained operators fail in some cases.
> For example, consider a simple UDF:
> {code:scala}
> object Func extends ScalarFunction {
>   def eval(row: Row): Row = {
>     row
>   }
>   override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
>     Array(Types.ROW(Types.INT))
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>     Types.ROW(Types.INT)
> }
> {code}
> This should work perfectly since it is just a simple pass-through. However, in the following test the aliased query fails:
> {code:scala}
>   @Test
>   def testRowType(): Unit = {
>     val data = List(
>       Row.of(Row.of(12.asInstanceOf[Integer]), "1")
>     )
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT), Types.STRING))
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val table = stream.toTable(tEnv, 'a, 'b)
>     tEnv.registerFunction("func", Func)
>     tEnv.registerTable("t", table)
>     // This works perfectly
>     val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row]
>     result1.addSink(new StreamITCase.StringSink[Row])
>     // This throws an exception
>     val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM t").toAppendStream[Row]
>     result2.addSink(new StreamITCase.StringSink[Row])
>     env.execute()
>   }
> {code}
> Exception stack trace:
> {code:java}
> java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
> 	at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
> 	at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
> 	at com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41)
> 	at org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349)
> ...
> {code}
> This is because Calcite's {{inferOperandTypes}} does not expect to infer a struct {{RelDataType}}.


