Posted to user@flink.apache.org by Tom Thornton <th...@yelp.com> on 2022/05/31 18:05:31 UTC

Re: [External] Re: Exception when running Java UDF with Blink table planner

Hi,

Thank you for your help. Here's the requested info:

> Could you also tell us which Flink version you are using, the schema of the
> source table and some test data? With this info, we can debug in our local
> environment.
>

Flink version: 1.11.6.
Schema of the source table:
name | type          | null | key | computed column | watermark
col1 | ARRAY<STRING> | true |     |                 |
Example input data that causes the exception (i.e., any non-null value):
{
  "col1": ["FooBar"]
}
If the input is instead null, no exception is thrown:
{
  "col1": null
}
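
In case it helps with reproducing this locally, here is a minimal sketch of
the setup (assumptions: the plain ScalarFunction base class stands in for our
internal DPScalarFunction, and an inline VALUES table stands in for our
Kafka/JSON source; Flink 1.11 with the Blink planner):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import static org.apache.flink.table.api.Expressions.row;

public class ListToStringRepro {

    // Stand-in for our internal DPScalarFunction-based UDF (current Object[] version).
    public static class ListToString extends ScalarFunction {
        public String eval(Object[] arr) {
            return "foo";
        }
    }

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.createTemporarySystemFunction("ListToString", ListToString.class);

        // col1 is ARRAY<STRING>, matching the schema above: one non-null row
        // (which triggers the exception for us) and one null row (which does not).
        Table source = tEnv.fromValues(
                DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.ARRAY(DataTypes.STRING()))),
                row((Object) new String[] {"FooBar"}),
                row((Object) null));
        tEnv.createTemporaryView("t", source);

        tEnv.executeSql("SELECT ListToString(col1) AS col1_string FROM t").print();
    }
}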

Thanks,
Tom

On Thu, May 26, 2022 at 7:47 PM Shengkai Fang <fs...@gmail.com> wrote:

> Hi.
>
> Could you also tell us which Flink version you are using, the schema of
> the source table and some test data? With this info, we can debug in our
> local environment.
>
> Best,
> Shengkai
>
> Tom Thornton <th...@yelp.com> wrote on Fri, May 27, 2022 at 06:47:
>
>> We are migrating from the legacy table planner to the Blink table
>> planner. Previously we had a UDF defined like this that worked without
>> issue:
>>
>> public class ListToString extends DPScalarFunction {
>>     public String eval(List list) {
>>         return "foo";
>>     }
>> }
>>
>> Since moving to the Blink table planner, we receive this error:
>>
>> Caused by: org.apache.flink.table.api.ValidationException: Given parameters of function 'ListToString' do not match any signature.
>> Actual: (java.lang.String[])
>> Expected: (java.util.List)
>>
>>
>> We refactored the UDF to take an Object[] as input, to match what is
>> received from the Blink planner:
>>
>> public class ListToString extends DPScalarFunction {
>>     public String eval(Object[] arr) {
>>         return "foo";
>>     }
>> }
>>
>> Now the UDF always fails, including for the simplified example above where
>> we return a constant string regardless of input. For example, running a
>> query like this one:
>>
>> SELECT ListToString(`col1`) as col1_string FROM `table`
>>
>> produces an IndexOutOfBoundsException:
>>
>> Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for length 0
>> 	at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>> 	at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>> 	at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>> 	at java.base/java.util.Objects.checkIndex(Objects.java:372)
>> 	at java.base/java.util.ArrayList.get(ArrayList.java:459)
>> 	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> 	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>> 	at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
>> 	at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>> 	at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
>> 	at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
>> 	at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
>> 	at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
>> 	at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
>> 	at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
>> 	at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
>> 	at StreamExecCalc$337.processElement(Unknown Source)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> 	at SourceConversion$328.processElement(Unknown Source)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
>> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:114)
>> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>> 	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:187)
>> 	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146)
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:833)
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:825)
>> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:266)
>>
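>> Reading the trace, the DataFormatConverters$GenericConverter and
>> KryoSerializer frames suggest that, with the parameter declared as Object[],
>> the array elements are converted as a generic type and Kryo-deserialized,
>> which is where it fails. For reference, below is a sketch of the signature
>> matching the types the planner reported in the earlier ValidationException
>> ("Actual: (java.lang.String[])"); we have not yet verified that it avoids
>> the exception:
>>
>> public class ListToString extends DPScalarFunction {
>>     // Parameter declared as String[], the conversion class the Blink planner
>>     // reported for the ARRAY<STRING> column in the ValidationException above.
>>     public String eval(String[] arr) {
>>         return "foo";
>>     }
>> }
>>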
>> Any ideas what may be causing this?
>>
>