Posted to user-zh@flink.apache.org by 723849736 <72...@qq.com.INVALID> on 2023/02/15 08:29:19 UTC

Flink SQL: a UDF for transforming array elements

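Hi all,

When using Flink SQL I have a scenario where I need to apply a transformation to the elements of an array, similar to the transform function in Spark SQL:

https://spark.apache.org/docs/latest/api/sql/index.html#transform

Flink SQL does not seem to support anything like this at the moment. Can it be implemented with a UDF?

The function has to take another function as input, but a function-typed parameter is not a Flink data type, so an exception is thrown during the validate phase. Is there any way around this?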


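import org.apache.flink.table.functions.ScalarFunction

class ArrayTransformFunction extends ScalarFunction {

  // The function-typed second parameter is what fails validation.
  def eval(a: Array[Long], function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}

The exception is as follows: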


Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'transform'.
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
	at SQLTest$.main(SQLTest.scala:44)
	at SQLTest.main(SQLTest.scala)
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'transform'.
	at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
	at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
	at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
	at java.util.Optional.flatMap(Optional.java:241)
	at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
	at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
	... 6 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'udf.ArrayTransformFunction'. Please check for implementation mistakes and/or provide a corresponding hint.
	at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
	at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
	at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
	at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
	at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
	... 17 more
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.
	at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
	at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
	at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
	... 20 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:
public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
	at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
	... 22 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'scala.Function1<java.lang.Object, java.lang.Object>' in parameter 1 of method 'eval' in class 'udf.ArrayTransformFunction'. Please pass the required data type manually or allow RAW types.
	at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
	at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:220)
	at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:198)
	at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:174)
	at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:128)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:409)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:385)
	at java.util.Optional.orElseGet(Optional.java:267)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:383)
	at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:387)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:364)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:324)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
	at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
	... 23 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'scala.Function1<java.lang.Object, java.lang.Object>'. Interpreting it as a structured type was also not successful.
	at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
	at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:270)
	at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:212)
	... 43 more
Caused by: org.apache.flink.table.api.ValidationException: Class 'scala.Function1' must not be abstract.
	at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
	at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
	at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:162)
	at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:453)
	at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:268)
	... 44 more

Re: Flink SQL: a UDF for transforming array elements

Posted by Shammon FY <zj...@gmail.com>.
Hi

You could consider packaging this function into the UDF jar and calling it directly inside your custom UDF?
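A minimal sketch of what this could look like, assuming the set of transformations is known ahead of time. The transformations are compiled into the UDF jar and selected by a string name, so the second argument of eval is a STRING rather than a Scala function. The names LongTransforms, NamedArrayTransformFunction, "plus_one" and "double" are hypothetical and only serve the illustration; the simplest variant would hard-code a single function and drop the name argument altogether.

```scala
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical registry of transformations shipped inside the UDF jar.
// The names "plus_one" and "double" are illustrative only.
object LongTransforms {
  private val byName: Map[String, Long => Long] = Map(
    "plus_one" -> ((x: Long) => x + 1L),
    "double"   -> ((x: Long) => x * 2L)
  )

  // Returns the registered function, or fails loudly for an unknown name.
  def lookup(name: String): Long => Long =
    byName.getOrElse(
      name,
      throw new IllegalArgumentException(s"Unknown transformation: $name"))
}

// The second parameter is a String, which Flink can map to a data type,
// instead of a Scala function, which it cannot.
class NamedArrayTransformFunction extends ScalarFunction {
  def eval(a: Array[Long], name: String): Array[Long] = {
    val f = LongTransforms.lookup(name)
    a.map(f)
  }
}
```

Assuming a TableEnvironment named tableEnv and a table t with an array column arr (both hypothetical), this could be registered with tableEnv.createTemporarySystemFunction("transform_named", classOf[NamedArrayTransformFunction]) and called as SELECT transform_named(arr, 'double') FROM t.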



Re: Flink SQL: a UDF for transforming array elements

Posted by "casel.chen" <ca...@126.com>.
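This is probably not supported at the moment. One alternative is to use the concat function to turn the array into a string as the input, then split it back into an array inside your UDF and process it there.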
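A rough sketch of the UDF side of this idea, under stated assumptions: the input arrives as a comma-separated string such as "1,2,3", and the transformation (here simply adding 1) is fixed inside the UDF. The class name DelimitedArrayTransformFunction, the delimiter and the transformation are all hypothetical. How the array is turned into a string on the SQL side is not shown here.

```scala
import org.apache.flink.table.functions.ScalarFunction

// Assumes the input is a comma-separated string such as "1,2,3";
// the delimiter and the fixed "+ 1" transformation are illustrative only.
class DelimitedArrayTransformFunction extends ScalarFunction {
  def eval(joined: String): Array[Long] = {
    if (joined == null || joined.trim.isEmpty) {
      // Treat a missing or blank input as an empty array.
      Array.empty[Long]
    } else {
      // Split back into elements, parse each one, then transform it.
      joined.split(",").map(s => s.trim.toLong + 1L)
    }
  }
}
```

Both parameter and return types here (String and Array[Long]) are ones Flink can map to data types by reflection, which is what the function-typed parameter in the original eval lacked.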
