You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Caizhi Weng (Jira)" <ji...@apache.org> on 2022/06/22 07:49:00 UTC
[jira] [Closed] (FLINK-28190) NullPointerException is thrown if the intermediate result of nesting UDFs is used
[ https://issues.apache.org/jira/browse/FLINK-28190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Caizhi Weng closed FLINK-28190.
-------------------------------
Resolution: Not A Problem
> NullPointerException is thrown if the intermediate result of nesting UDFs is used
> ---------------------------------------------------------------------------------
>
> Key: FLINK-28190
> URL: https://issues.apache.org/jira/browse/FLINK-28190
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.15.0, 1.14.5
> Reporter: Caizhi Weng
> Priority: Major
>
> Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
> tEnv.executeSql("create temporary function myfun1 as 'MyFun1'")
> tEnv.executeSql("create temporary function myfun2 as 'MyFun2'")
> val data: Seq[Row] = Seq(
> Row.of("Hi", "Hello")
> )
> tEnv.executeSql(
> s"""
> |create table T (
> | a string,
> | b string
> |) with (
> | 'connector' = 'values',
> | 'data-id' = '${TestValuesTableFactory.registerData(data)}',
> | 'bounded' = 'true'
> |)
> |""".stripMargin)
> tEnv.executeSql("create temporary view my_view as select myfun1(a, b) as mp from T")
> tEnv.executeSql("select myfun2(mp), mp['Hi'] from my_view").print()
> }
> {code}
> UDF classes are
> {code:java}
> import org.apache.flink.table.functions.ScalarFunction;
> import java.util.HashMap;
> import java.util.Map;
> public class MyFun1 extends ScalarFunction {
> public Map<String, String> eval(String k, String v) {
> Map<String, String> returnMap = new HashMap<>();
> returnMap.put(k, v);
> return returnMap;
> }
> }
> {code}
> {code:java}
> import org.apache.flink.table.functions.ScalarFunction;
> import java.util.Map;
> public class MyFun2 extends ScalarFunction {
> public String eval(Map<String, String> input) {
> return String.valueOf(input);
> }
> }
> {code}
> The exception stack is
> {code}
> Caused by: java.lang.NullPointerException
> at StreamExecCalc$25.processElement_split1(Unknown Source)
> at StreamExecCalc$25.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
> at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
> at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
> at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$FromElementSourceFunction.run(TestValuesRuntimeFunctions.java:530)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
> {code}
> The generated code is
> {code}
> public class ToBinary$0 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
> org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
> public ToBinary$0(Object[] references) throws Exception {
>
> }
> @Override
> public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
> return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
> innerApply(in1);
> return out;
> }
> /* Fit into JavaCodeSplitter's void function limitation. */
> private void innerApply(org.apache.flink.table.data.RowData in1) {
>
>
> outWriter.reset();
> if (in1.isNullAt(0)) {
> outWriter.setNullAt(0);
> } else {
> outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>
> if (in1.isNullAt(1)) {
> outWriter.setNullAt(1);
> } else {
> outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>
> outWriter.complete();
> out.setRowKind(in1.getRowKind());
> }
> }
>
> public class ToBinary$1 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
> org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
> public ToBinary$1(Object[] references) throws Exception {
>
> }
> @Override
> public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
> return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
> innerApply(in1);
> return out;
> }
> /* Fit into JavaCodeSplitter's void function limitation. */
> private void innerApply(org.apache.flink.table.data.RowData in1) {
>
>
> outWriter.reset();
> if (in1.isNullAt(0)) {
> outWriter.setNullAt(0);
> } else {
> outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>
> if (in1.isNullAt(1)) {
> outWriter.setNullAt(1);
> } else {
> outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>
> outWriter.complete();
> out.setRowKind(in1.getRowKind());
> }
> }
>
> public class ToBinary$2 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
> org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
> public ToBinary$2(Object[] references) throws Exception {
>
> }
> @Override
> public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
> return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
> innerApply(in1);
> return out;
> }
> /* Fit into JavaCodeSplitter's void function limitation. */
> private void innerApply(org.apache.flink.table.data.RowData in1) {
>
>
> outWriter.reset();
> if (in1.isNullAt(0)) {
> outWriter.setNullAt(0);
> } else {
> outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>
> if (in1.isNullAt(1)) {
> outWriter.setNullAt(1);
> } else {
> outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>
> outWriter.complete();
> out.setRowKind(in1.getRowKind());
> }
> }
>
> public class ToBinary$3 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
> org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
> public ToBinary$3(Object[] references) throws Exception {
>
> }
> @Override
> public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
> return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
> innerApply(in1);
> return out;
> }
> /* Fit into JavaCodeSplitter's void function limitation. */
> private void innerApply(org.apache.flink.table.data.RowData in1) {
>
>
> outWriter.reset();
> if (in1.isNullAt(0)) {
> outWriter.setNullAt(0);
> } else {
> outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>
> if (in1.isNullAt(1)) {
> outWriter.setNullAt(1);
> } else {
> outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>
> outWriter.complete();
> out.setRowKind(in1.getRowKind());
> }
> }
>
> public class StreamExecCalc$25 extends org.apache.flink.table.runtime.operators.TableStreamOperator
> implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> private final Object[] references;
> private transient MyFun1 function_MyFun1;
> private transient org.apache.flink.table.data.conversion.StringStringConverter converter$6;
> private transient org.apache.flink.table.data.conversion.MapMapConverter converter$8;
> private transient MyFun2 function_MyFun2;
>
> private final org.apache.flink.table.data.binary.BinaryStringData str$12 = org.apache.flink.table.data.binary.BinaryStringData.fromString("Hi");
>
> org.apache.flink.table.data.BoxedWrapperRowData out = new org.apache.flink.table.data.BoxedWrapperRowData(2);
> private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> public StreamExecCalc$25(
> Object[] references,
> org.apache.flink.streaming.runtime.tasks.StreamTask task,
> org.apache.flink.streaming.api.graph.StreamConfig config,
> org.apache.flink.streaming.api.operators.Output output,
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
> this.references = references;
> function_MyFun1 = (((MyFun1) references[0]));
> converter$6 = (((org.apache.flink.table.data.conversion.StringStringConverter) references[1]));
> converter$8 = (((org.apache.flink.table.data.conversion.MapMapConverter) references[2]));
> function_MyFun2 = (((MyFun2) references[3]));
> this.setup(task, config, output);
> if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> .setProcessingTimeService(processingTimeService);
> }
> }
> @Override
> public void open() throws Exception {
> super.open();
>
> function_MyFun1.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
>
>
> converter$6.open(getRuntimeContext().getUserCodeClassLoader());
>
>
> converter$8.open(getRuntimeContext().getUserCodeClassLoader());
>
>
> function_MyFun2.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
>
> }
> @Override
> public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
>
> org.apache.flink.table.data.binary.BinaryStringData field$4;
> boolean isNull$4;
> org.apache.flink.table.data.binary.BinaryStringData field$5;
> boolean isNull$5;
> java.util.Map externalResult$7;
> org.apache.flink.table.data.MapData result$9;
> boolean isNull$9;
> java.lang.String externalResult$10;
> org.apache.flink.table.data.binary.BinaryStringData result$11;
> boolean isNull$11;
> boolean isNull$23 = false;
> boolean result$24 = false;
>
>
> isNull$4 = in1.isNullAt(0);
> field$4 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> if (!isNull$4) {
> field$4 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
> }
> isNull$5 = in1.isNullAt(1);
> field$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> if (!isNull$5) {
> field$5 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1));
> }
>
> out.setRowKind(in1.getRowKind());
>
>
>
>
>
>
>
>
> externalResult$7 = (java.util.Map) function_MyFun1
> .eval(isNull$4 ? null : ((java.lang.String) converter$6.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$4)), isNull$5 ? null : ((java.lang.String) converter$6.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$5)));
>
> externalResult$10 = (java.lang.String) function_MyFun2
> .eval(externalResult$7);
>
> isNull$11 = externalResult$10 == null;
> result$11 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> if (!isNull$11) {
> result$11 = (org.apache.flink.table.data.binary.BinaryStringData) converter$6.toInternalOrNull((java.lang.String) externalResult$10);
> }
>
> if (isNull$11) {
> out.setNullAt(0);
> } else {
> out.setNonPrimitiveValue(0, result$11);
> }
>
>
>
>
>
> boolean isNull$13 = (isNull$9 || false);
> org.apache.flink.table.data.binary.BinaryStringData result$13 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> if (!isNull$13) {
>
> if (result$9 instanceof org.apache.flink.table.data.binary.BinaryMapData) {
> org.apache.flink.table.data.binary.BinaryMapData binaryMap$21 = (org.apache.flink.table.data.binary.BinaryMapData) result$9;
> final int length$15 = binaryMap$21.size();
> final org.apache.flink.table.data.binary.BinaryArrayData keys$16 = binaryMap$21.keyArray();
> final org.apache.flink.table.data.binary.BinaryArrayData values$17 = binaryMap$21.valueArray();
>
> int index$18 = 0;
> boolean found$19 = false;
> if (false) {
> while (index$18 < length$15 && !found$19) {
> if (keys$16.isNullAt(index$18)) {
> found$19 = true;
> } else {
> index$18++;
> }
> }
> } else {
> while (index$18 < length$15 && !found$19) {
> final org.apache.flink.table.data.binary.BinaryStringData key$14 = ((org.apache.flink.table.data.binary.BinaryStringData) keys$16.getString(index$18));
>
>
>
> isNull$23 = false || false;
> result$24 = false;
> if (!isNull$23) {
>
>
> result$24 = ((org.apache.flink.table.data.binary.BinaryStringData) str$12).equals(key$14);
>
>
> }
>
> if (result$24) {
> found$19 = true;
> } else {
> index$18++;
> }
> }
> }
>
> if (!found$19 || values$17.isNullAt(index$18)) {
> isNull$13 = true;
> } else {
> result$13 = ((org.apache.flink.table.data.binary.BinaryStringData) values$17.getString(index$18));
> }
> } else {
> org.apache.flink.table.data.GenericMapData genericMap$22 = (org.apache.flink.table.data.GenericMapData) result$9;
> org.apache.flink.table.data.binary.BinaryStringData value$20 =
> (org.apache.flink.table.data.binary.BinaryStringData) genericMap$22.get((org.apache.flink.table.data.binary.BinaryStringData) ((org.apache.flink.table.data.binary.BinaryStringData) str$12));
> if (value$20 == null) {
> isNull$13 = true;
> } else {
> result$13 = value$20;
> }
> }
>
> }
>
> if (isNull$13) {
> out.setNullAt(1);
> } else {
> out.setNonPrimitiveValue(1, result$13);
> }
>
>
> output.collect(outElement.replace(out));
>
>
> }
>
> @Override
> public void close() throws Exception {
> super.close();
>
> function_MyFun1.close();
>
>
> function_MyFun2.close();
>
> }
>
> }
>
> public class ToBinary$26 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
> org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
> public ToBinary$26(Object[] references) throws Exception {
>
> }
> @Override
> public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
> return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
> innerApply(in1);
> return out;
> }
> /* Fit into JavaCodeSplitter's void function limitation. */
> private void innerApply(org.apache.flink.table.data.RowData in1) {
>
>
> outWriter.reset();
> if (in1.isNullAt(0)) {
> outWriter.setNullAt(0);
> } else {
> outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>
> if (in1.isNullAt(1)) {
> outWriter.setNullAt(1);
> } else {
> outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>
> outWriter.complete();
> out.setRowKind(in1.getRowKind());
> }
> }
>
> public class ToBinary$27 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
> org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
> public ToBinary$27(Object[] references) throws Exception {
>
> }
> @Override
> public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
> return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
> innerApply(in1);
> return out;
> }
> /* Fit into JavaCodeSplitter's void function limitation. */
> private void innerApply(org.apache.flink.table.data.RowData in1) {
>
>
> outWriter.reset();
> if (in1.isNullAt(0)) {
> outWriter.setNullAt(0);
> } else {
> outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>
> if (in1.isNullAt(1)) {
> outWriter.setNullAt(1);
> } else {
> outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>
> outWriter.complete();
> out.setRowKind(in1.getRowKind());
> }
> }
>
> public class ToBinary$28 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
> org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
> public ToBinary$28(Object[] references) throws Exception {
>
> }
> @Override
> public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
> return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
> innerApply(in1);
> return out;
> }
> /* Fit into JavaCodeSplitter's void function limitation. */
> private void innerApply(org.apache.flink.table.data.RowData in1) {
>
>
> outWriter.reset();
> if (in1.isNullAt(0)) {
> outWriter.setNullAt(0);
> } else {
> outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>
> if (in1.isNullAt(1)) {
> outWriter.setNullAt(1);
> } else {
> outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>
> outWriter.complete();
> out.setRowKind(in1.getRowKind());
> }
> }
> {code}
> You can see that {{result$9}} (and its companion flag {{isNull$9}}) is declared but never assigned before it is read in the map-access code, so dereferencing it throws the NullPointerException shown above.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)