You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by RS <ti...@163.com> on 2022/07/13 06:47:37 UTC

IF IS NULL 函数运行报错:NullPointerException

Hi,


版本:Flink-1.15.1 
模式:单机


s1为kafka的source表,已定义,
g_name为s1表的字段,string类型


select `g_name` from s1;
这个运行正常,有数据输出


然后加上 IF 语句,则会报错:
Flink SQL> select if(`g_name` is null, 'no', `g_name`) from s1;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
运行报错


堆栈里面的Unknown Source,感觉有问题。
at StreamExecCalc$1247.processElement_split196(Unknown Source) ~[?:?]


报错堆栈:
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-files-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.1.jar:1.15.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_302]
Caused by: java.lang.NullPointerException
at StreamExecCalc$1247.processElement_split196(Unknown Source) ~[?:?]
at StreamExecCalc$1247.processElement(Unknown Source) ~[?:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-files-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.1.jar:1.15.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_302]


Thanks