You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Rong Rong (JIRA)" <ji...@apache.org> on 2018/08/13 19:01:00 UTC
[jira] [Comment Edited] (FLINK-10019) Fix Composite getResultType
of UDF cannot be chained with other operators
[ https://issues.apache.org/jira/browse/FLINK-10019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578793#comment-16578793 ]
Rong Rong edited comment on FLINK-10019 at 8/13/18 7:00 PM:
------------------------------------------------------------
Dug a little deeper in Calcite and found that in:
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68
Seems like if the return type is inferred as a record, or {{isStruct()}}, "it must have the same number of fields as the number of operands." which clearly is not the case here since the following expression: **{{AS(func(a), "myRow")}}** only passes over the **{{func(a)}}** for type inference, but not the alias **{{"myRow"}}**
was (Author: walterddr):
Dug a little deeper in Calcite and found that in:
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68
Seems like if the return type is inferred as a record, or {{isStruct()}}, "it must have the same number of fields as the number of operands." which clearly is not the case here since the following expression: {{AS(func(a), "myRow")}} only passes over the {{func(a)}} for type inference, but not the alias {{"myRow"}}
> Fix Composite getResultType of UDF cannot be chained with other operators
> -------------------------------------------------------------------------
>
> Key: FLINK-10019
> URL: https://issues.apache.org/jira/browse/FLINK-10019
> Project: Flink
> Issue Type: Bug
> Reporter: Rong Rong
> Assignee: Rong Rong
> Priority: Major
>
> If explicitly return a CompositeType in {{udf.getResultType}}, will result in some failures in chained operators.
> For example: consider a simple UDF,
> {code:scala}
> object Func extends ScalarFunction {
> def eval(row: Row): Row = {
> row
> }
> override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
> Array(Types.ROW(Types.INT))
> override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
> Types.ROW(Types.INT)
> }
> {code}
> This should work perfectly since it's just a simple pass through, however
> {code:scala}
> @Test
> def testRowType(): Unit = {
> val data = List(
> Row.of(Row.of(12.asInstanceOf[Integer]), "1")
> )
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT), Types.STRING))
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val table = stream.toTable(tEnv, 'a, 'b)
> tEnv.registerFunction("func", Func)
> tEnv.registerTable("t", table)
> // This works perfectly
> val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row]
> result1.addSink(new StreamITCase.StringSink[Row])
> // This throws exception
> val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM t").toAppendStream[Row]
> result2.addSink(new StreamITCase.StringSink[Row])
> env.execute()
> }
> {code}
> Exception code:
> {code:java}
> java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
> at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
> at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
> at com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41)
> at org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349)
> ...
> {code}
> This is due to the fact that Calcite inferOperandTypes does not expect to infer a struct RelDataType.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)