Posted to user@flink.apache.org by neha goyal <ne...@gmail.com> on 2023/05/02 08:32:55 UTC

Flink Sql erroring at runtime

Hello,

I am using Flink 1.16.1 and am seeing different behavior from Flink
1.13.6.

SELECT if(some_string_field is null, 'default', 'some_string_field') from
my_stream

This Flink SQL job, running in the streaming environment, fails at runtime
with the exception below. No null values are sent, and it fails for non-null
values as well.

It runs fine in Flink 1.13.6. It also runs fine if I use an Integer field
instead.
Was there any change around this in Flink 1.14/1.15/1.16?

io.IOException: Failed to deserialize consumer record due to
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    ... 22 more
Caused by: java.lang.NullPointerException
    at StreamExecCalc$53548.processElement_trueFilter10044_split10048(Unknown Source)
    at StreamExecCalc$53548.processElement_trueFilter10044(Unknown Source)
    at StreamExecCalc$53548.processElement_split10047(Unknown Source)
    at StreamExecCalc$53548.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    ... 28 more

Re: Flink Sql erroring at runtime

Posted by neha goyal <ne...@gmail.com>.
Hi Hang and community,
I need to correct my earlier email. The issue occurs when I use the UPPER or
TRIM function inside IF.
It looks like there is a bug in Flink 1.16's IF function: if I use UPPER or
TRIM (there may be more such functions), I get the exception. These
functions worked fine with Flink 1.13.
select
  if(
    address_id = 'a',
    'default',
    upper(address_id)
  ) as address_id
from
  feature_test

Sample input sent to my Kafka source: {"address_id":"mydata"}
It should be reproducible with this record. Please try it.
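
For reference, the result this query is expected to produce for the sample
record can be modeled outside Flink. The block below is only a plain-Python
sketch of the intended semantics, not Flink code. The function name
expected_address_id is made up for illustration, and the handling of NULL
input (the comparison is not true, so the else branch is taken, and UPPER of
NULL yields NULL) is an assumption based on usual SQL semantics.

```python
import json
from typing import Optional


def expected_address_id(address_id: Optional[str]) -> Optional[str]:
    """Model of: IF(address_id = 'a', 'default', UPPER(address_id))."""
    # Assumption: for NULL input the comparison is not true,
    # so the else branch is evaluated.
    if address_id is not None and address_id == "a":
        return "default"
    # Assumption: UPPER(NULL) yields NULL (None here).
    if address_id is None:
        return None
    return address_id.upper()


# The sample record from the report above.
record = json.loads('{"address_id":"mydata"}')
print(expected_address_id(record["address_id"]))  # prints MYDATA
```

So for the sample record the job should emit MYDATA rather than fail with a
NullPointerException.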

2023-05-05 23:30:24
java.io.IOException: Failed to deserialize consumer record due to
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    ... 22 more
Caused by: java.lang.NullPointerException
    at StreamExecCalc$14237.processElement_split1961(Unknown Source)
    at StreamExecCalc$14237.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    ... 28 more


On Mon, May 8, 2023 at 11:48 AM Hang Ruan <ru...@gmail.com> wrote:

> Hi, neha,
>
> I think the error is caused by deserialization. Do you have some
> example data and runnable SQL to reproduce the problem?
>
> Best,
> Hang

Re: Flink Sql erroring at runtime

Posted by Hang Ruan <ru...@gmail.com>.
Hi, neha,

I think the error is caused by deserialization. Do you have some
example data and runnable SQL to reproduce the problem?

Best,
Hang