You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "luoyuxia (Jira)" <ji...@apache.org> on 2022/11/15 06:52:00 UTC

[jira] [Commented] (FLINK-30018) NPE error in generated StreamExecCalc

    [ https://issues.apache.org/jira/browse/FLINK-30018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634188#comment-17634188 ] 

luoyuxia commented on FLINK-30018:
----------------------------------

[~jackwangcs] Thanks for reporting. Could you please share us the sql so that we can try to reproduce it?

> NPE error in generated StreamExecCalc
> -------------------------------------
>
>                 Key: FLINK-30018
>                 URL: https://issues.apache.org/jira/browse/FLINK-30018
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.0
>            Reporter: jackwangcs
>            Priority: Major
>
> Hi, I met a NPE exception running Flink SQL. The exception is
> {code:java}
> rg.apache.flink.runtime.taskmanager.Task                    [] - Join[292] -> Calc[293] -> ConstraintEnforcer[294] (10/48)#0 (e628391c0b38d4d22ae62a181a2d7f22_c9cd1581189658451a8850505c8a0007_9_0) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
>     at StreamExecCalc$20690.processElement_split881(Unknown Source)
>     at StreamExecCalc$20690.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:334)
>     at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:219)
>     at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:124)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:750) {code}
> `StreamExecCalc$20690.processElement_split881()` function is below:
> {code:java}
> private final org.apache.flink.table.data.binary.BinaryStringData str$19838 = org.apache.flink.table.data.binary.BinaryStringData.fromString("N");
> void processElement_split881(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> if (isNull$19828) {
>             out.setNullAt(4);
>           } else {
>             out.setNonPrimitiveValue(4, field$19829);
>           }
> writer$19903.reset();
> writer$19903.writeBoolean(0, ((boolean) false));
> if (isNull$19830) {
>             writer$19903.setNullAt(1);
>           } else {
>             writer$19903.writeLong(1, field$19830);
>           }
> isNull$19831 = isNull$19821 || false;
> result$19832 = false;
> if (!isNull$19831) {
>             
>           
>           result$19832 = field$19821 == ((long) 44571L);
>           
>             
>           }
> result$19834 = -1L;
> if (result$19832) {
>             
>             if (!isNull$19833) {
>               result$19834 = field$19833;
>             }
>             isNull$19834 = isNull$19833;
>           } else {
>             
>             if (!false) {
>               result$19834 = ((long) 0L);
>             }
>             isNull$19834 = false;
>           }
> if (isNull$19834) {
>             writer$19903.setNullAt(2);
>           } else {
>             writer$19903.writeLong(2, result$19834);
>           }
> isNull$19851 = false;
> if (!isNull$19851) {
>           if (((org.apache.flink.table.data.binary.BinaryStringData) str$19838).numChars() > 1) {
>           result$19852 = ((org.apache.flink.table.data.binary.BinaryStringData) str$19838).substring(0, 1);
>           } else {
>           if (((org.apache.flink.table.data.binary.BinaryStringData) str$19838).numChars() < 1) {
>           
>           padLength$19853 = 1 - ((org.apache.flink.table.data.binary.BinaryStringData) str$19838).numChars();
>           
>           padString$19854 = org.apache.flink.table.data.binary.BinaryStringData.blankString(padLength$19853);
>           result$19852 = org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(((org.apache.flink.table.data.binary.BinaryStringData) str$19838), padString$19854);
>           } else {
>           result$19852 = ((org.apache.flink.table.data.binary.BinaryStringData) str$19838);
>           }
>           }
>           isNull$19851 = result$19852 == null;
>           } else {
>           result$19852 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>           }
> isNull$19855 = isNull$19850;
> if (!isNull$19855) {
>           if (result$19850.numChars() > 1) {
>           result$19856 = result$19850.substring(0, 1);
>           } else {
>           if (result$19850.numChars() < 1) {
>           
>           padLength$19857 = 1 - result$19850.numChars();
>           
>           padString$19858 = org.apache.flink.table.data.binary.BinaryStringData.blankString(padLength$19857);
>           result$19856 = org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(result$19850, padString$19858);
>           } else {
>           result$19856 = result$19850;
>           }
>           }
>           isNull$19855 = result$19856 == null;
>           } else {
>           result$19856 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>           }
> result$19859 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> } {code}
>  
> Could you take a look at this issue? I could not find why NPE is thrown.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)