Posted to user@flink.apache.org by Tom Thornton <th...@yelp.com> on 2022/05/26 22:47:04 UTC

Exception when running Java UDF with Blink table planner

We are migrating from the legacy table planner to the Blink table planner.
Previously we had a UDF defined like this that worked without issue:

public class ListToString extends DPScalarFunction {
    public String eval(List list) {
        return "foo";
    }
}

Since moving to the Blink table planner, we have been receiving this error:

Caused by: org.apache.flink.table.api.ValidationException: Given
parameters of function 'ListToString' do not match any signature.
Actual: (java.lang.String[])
Expected: (java.util.List)


We refactored the UDF to take an Object[] as input, to match what is
received from Blink:

public class ListToString extends DPScalarFunction {
    public String eval(Object[] arr) {
        return "foo";
    }
}

Now the UDF always fails (including for the simplified example above, where
we return a constant string regardless of input). For example, running a
query like this one:

SELECT ListToString(`col1`) as col1_string FROM `table`

produces an IndexOutOfBoundsException:

Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for length 0
	at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
	at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
	at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
	at java.base/java.util.Objects.checkIndex(Objects.java:372)
	at java.base/java.util.ArrayList.get(ArrayList.java:459)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
	at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
	at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
	at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
	at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
	at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
	at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
	at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
	at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
	at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
	at StreamExecCalc$337.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at SourceConversion$328.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:114)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:187)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:833)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:825)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:266)

Any ideas what may be causing this?

Re: [External] Re: Exception when running Java UDF with Blink table planner

Posted by Tom Thornton <th...@yelp.com>.
Hi,

Thank you for your help. Here's the requested info:

> Could you also tell us which Flink version you are using, the schema of the
> source table, and some test data? With this info, we can debug in our local
> environment.

Flink version: 1.11.6.
Schema of the source table:
name | type | null | key | computed column | watermark |
col1 | ARRAY<STRING> | true | | | |
Example input data that causes the exception (i.e., anything that is not null):
{
  "col1": ["FooBar"]
}
If the input is instead null, then no exception is thrown:
{
  "col1": null
}
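
In case it helps with reproduction: below is a minimal sketch of how a table
with this schema can be declared from Java. The connector options are
placeholders rather than our production config; the job consumes JSON from
Kafka, as the FlinkKafkaConsumer frames in the stack trace show.

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Repro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Blink planner, as in the failing job.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance()
                        .useBlinkPlanner()
                        .inStreamingMode()
                        .build());
        // One ARRAY<STRING> column, matching the schema above.
        // Connector options are elided placeholders, not real values.
        tableEnv.executeSql(
                "CREATE TABLE `table` (`col1` ARRAY<STRING>) WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = '...',"
                        + " 'properties.bootstrap.servers' = '...',"
                        + " 'format' = 'json')");
    }
}
```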

Thanks,
Tom

On Thu, May 26, 2022 at 7:47 PM Shengkai Fang <fs...@gmail.com> wrote:

> [...]

Re: Exception when running Java UDF with Blink table planner

Posted by Shengkai Fang <fs...@gmail.com>.
Hi.

Could you also tell us which Flink version you are using, the schema of the
source table, and some test data? With this info, we can debug in our local
environment.

Best,
Shengkai

Tom Thornton <th...@yelp.com> wrote on Fri, May 27, 2022 at 06:47:

> [...]

Re: [External] Re: Exception when running Java UDF with Blink table planner

Posted by Tom Thornton <th...@yelp.com>.
Hi Shengkai,

To reproduce the issue, the input argument type must be `Object[]`. Also,
DPScalarFunction is a typo; it should be ScalarFunction. Are you able to
observe the error if you try with the changed input type:

public static class ListToString extends ScalarFunction {
    public String eval(Object[] arr) {
        return "foo";
    }
}

The UDF should be able to take any type of array as input (e.g., an array of
strings, ints, or longs) and still return a string regardless of the input.
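
If the new type inference applies here (i.e., the function is registered via
createFunction / createTemporarySystemFunction), one variant we may try is an
explicit any-type hint. This is only a sketch, not something we have verified
on our 1.11 job:

```
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// Nested inside the job class, as in the snippet above.
public static class ListToString extends ScalarFunction {
    // InputGroup.ANY asks the planner to pass the argument through as-is,
    // so arrays of strings, ints, or longs should all be accepted.
    public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object arr) {
        return "foo";
    }
}
```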

We are blocked on upgrading due to some unrelated issues, so we are hoping to
determine whether or not this is a legitimate issue with 1.11.

Thanks,
Tom

On Tue, May 31, 2022 at 8:12 PM Shengkai Fang <fs...@gmail.com> wrote:

> [...]

Re: [External] Re: Exception when running Java UDF with Blink table planner

Posted by Shengkai Fang <fs...@gmail.com>.
Hi, Tom.

I can't reproduce the exception on master. I am not sure whether the
problem has been fixed or I am missing something.

The only differences are that my test UDF extends ScalarFunction rather than
DPScalarFunction, and that I use String[] as the input type.

```

public static class ListToString extends ScalarFunction {
    public String eval(String[] arr) {
        return "foo";
    }
}

```
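
If the implicit type extraction still misbehaves on 1.11, another option is to
pin the argument type with an explicit hint. This is just a sketch (I have not
tried it against your DPScalarFunction base class), and it assumes the function
is registered with the new type inference, e.g. via createFunction:

```
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public static class ListToString extends ScalarFunction {
    // The hint pins the argument to ARRAY<STRING>, so the planner should not
    // fall back to a RAW type that is serialized with Kryo (see the
    // KryoSerializer frames in your stack trace).
    public String eval(@DataTypeHint("ARRAY<STRING>") String[] arr) {
        return "foo";
    }
}
```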

I think you can also debug in this way:
0. Open the Flink repo and check out the release-1.11 branch
1. Create the UDF in JavaUserDefinedScalarFunctions
2. Find a test among the table IT cases, e.g. TableSinkITCase.scala
3. Add a new test to verify the results. I just added the following code:
```
  @Test
  def test(): Unit = {
    val dataId = TestValuesTableFactory.registerRowData(
      Seq(GenericRowData.of(
        new GenericArrayData(Array(StringData.fromString("3")).toArray[Object]))))

    tEnv.executeSql(
      s"""
         |CREATE TABLE test2 (person ARRAY<STRING>) WITH (
         |  'connector' = 'values',
         |  'data-id' = '$dataId',
         |  'register-internal-data' = 'true'
         |)
         |""".stripMargin
    )
    tEnv.createFunction("ListToString", classOf[ListToString])
    tEnv.executeSql(
      "SELECT ListToString(`person`) as col1_string FROM `test2`").print()
  }
```
4. Then you can debug the case in your IDE.

Considering that Flink 1.11 is no longer maintained by the community, would
you mind upgrading to a more recent version (1.13/1.14/1.15)?



Best,
Shengkai

Tom Thornton <th...@yelp.com> wrote on Wed, Jun 1, 2022 at 02:06:

> [...]

Re: [External] Re: Exception when running Java UDF with Blink table planner

Posted by Tom Thornton <th...@yelp.com>.
Hi all,

Thank you for the help.

> It seems an exception is thrown when Flink tries to deserialize the object
> output by your UDF. So is the object produced by your UDF serializable?
> Does it contain any lambda function in the object/class?


The output object of the UDF is the string "foo", which should be
serializable. The exception only occurs when the input to the UDF is not
null. When the input is null, the output object (still the string "foo")
causes no error or exception, i.e. it is serialized without issue. There are
no lambda functions in the output object (it is just a string object). Note
also that the stack trace points at the input conversion
(DataFormatConverters$ObjectArrayConverter deserializing the array elements
via Kryo) rather than at the UDF's output.

Thanks,
Tom

On Thu, May 26, 2022 at 9:36 PM yuxia <lu...@alumni.sjtu.edu.cn> wrote:

> [...]

Re: Exception when running Java UDF with Blink table planner

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
It seems an exception is thrown when Flink tries to deserialize the object output by your UDF. So is the object produced by your UDF serializable? Does it contain any lambda function in the object/class?

Best regards, 
Yuxia 


From: "Tom Thornton" <th...@yelp.com>
To: "User" <us...@flink.apache.org>
Sent: Friday, May 27, 2022, 6:47:04 AM
Subject: Exception when running Java UDF with Blink table planner

[...]