You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Caizhi Weng (Jira)" <ji...@apache.org> on 2022/06/22 07:49:00 UTC

[jira] [Closed] (FLINK-28190) NullPointerException is thrown if the intermediate result of nesting UDFs is used

     [ https://issues.apache.org/jira/browse/FLINK-28190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Caizhi Weng closed FLINK-28190.
-------------------------------
    Resolution: Not A Problem

> NullPointerException is thrown if the intermediate result of nesting UDFs is used
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-28190
>                 URL: https://issues.apache.org/jira/browse/FLINK-28190
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.15.0, 1.14.5
>            Reporter: Caizhi Weng
>            Priority: Major
>
> Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
>   tEnv.executeSql("create temporary function myfun1 as 'MyFun1'")
>   tEnv.executeSql("create temporary function myfun2 as 'MyFun2'")
>   val data: Seq[Row] = Seq(
>     Row.of("Hi", "Hello")
>   )
>   tEnv.executeSql(
>     s"""
>       |create table T (
>       |  a string,
>       |  b string
>       |) with (
>       |  'connector' = 'values',
>       |  'data-id' = '${TestValuesTableFactory.registerData(data)}',
>       |  'bounded' = 'true'
>       |)
>       |""".stripMargin)
>   tEnv.executeSql("create temporary view my_view as select myfun1(a, b) as mp from T")
>   tEnv.executeSql("select myfun2(mp), mp['Hi'] from my_view").print()
> }
> {code}
> UDF classes are
> {code:java}
> import org.apache.flink.table.functions.ScalarFunction;
> import java.util.HashMap;
> import java.util.Map;
> public class MyFun1 extends ScalarFunction {
>     public Map<String, String> eval(String k, String v) {
>         Map<String, String> returnMap = new HashMap<>();
>         returnMap.put(k, v);
>         return returnMap;
>     }
> }
> {code}
> {code:java}
> import org.apache.flink.table.functions.ScalarFunction;
> import java.util.Map;
> public class MyFun2 extends ScalarFunction {
>     public String eval(Map<String, String> input) {
>         return String.valueOf(input);
>     }
> }
> {code}
> The exception stack trace is
> {code}
> Caused by: java.lang.NullPointerException
> 	at StreamExecCalc$25.processElement_split1(Unknown Source)
> 	at StreamExecCalc$25.processElement(Unknown Source)
> 	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
> 	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
> 	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
> 	at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$FromElementSourceFunction.run(TestValuesRuntimeFunctions.java:530)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
> {code}
> The generated code is
> {code}
> public class ToBinary$0 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
>   org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>   public ToBinary$0(Object[] references) throws Exception {
>     
>   }
>   @Override
>   public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>     
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
>   return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
>     innerApply(in1);
>     return out;
>   }
>   /* Fit into JavaCodeSplitter's void function limitation. */
>   private void innerApply(org.apache.flink.table.data.RowData in1) {
>     
>     
> outWriter.reset();
> if (in1.isNullAt(0)) {
>   outWriter.setNullAt(0);
> } else {
>   outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>              
> if (in1.isNullAt(1)) {
>   outWriter.setNullAt(1);
> } else {
>   outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>              
> outWriter.complete();
>         out.setRowKind(in1.getRowKind());
>   }
> }
>         
> public class ToBinary$1 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
>   org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>   public ToBinary$1(Object[] references) throws Exception {
>     
>   }
>   @Override
>   public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>     
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
>   return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
>     innerApply(in1);
>     return out;
>   }
>   /* Fit into JavaCodeSplitter's void function limitation. */
>   private void innerApply(org.apache.flink.table.data.RowData in1) {
>     
>     
> outWriter.reset();
> if (in1.isNullAt(0)) {
>   outWriter.setNullAt(0);
> } else {
>   outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>              
> if (in1.isNullAt(1)) {
>   outWriter.setNullAt(1);
> } else {
>   outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>              
> outWriter.complete();
>         out.setRowKind(in1.getRowKind());
>   }
> }
>         
> public class ToBinary$2 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
>   org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>   public ToBinary$2(Object[] references) throws Exception {
>     
>   }
>   @Override
>   public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>     
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
>   return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
>     innerApply(in1);
>     return out;
>   }
>   /* Fit into JavaCodeSplitter's void function limitation. */
>   private void innerApply(org.apache.flink.table.data.RowData in1) {
>     
>     
> outWriter.reset();
> if (in1.isNullAt(0)) {
>   outWriter.setNullAt(0);
> } else {
>   outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>              
> if (in1.isNullAt(1)) {
>   outWriter.setNullAt(1);
> } else {
>   outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>              
> outWriter.complete();
>         out.setRowKind(in1.getRowKind());
>   }
> }
>         
> public class ToBinary$3 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
>   org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>   public ToBinary$3(Object[] references) throws Exception {
>     
>   }
>   @Override
>   public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>     
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
>   return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
>     innerApply(in1);
>     return out;
>   }
>   /* Fit into JavaCodeSplitter's void function limitation. */
>   private void innerApply(org.apache.flink.table.data.RowData in1) {
>     
>     
> outWriter.reset();
> if (in1.isNullAt(0)) {
>   outWriter.setNullAt(0);
> } else {
>   outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>              
> if (in1.isNullAt(1)) {
>   outWriter.setNullAt(1);
> } else {
>   outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>              
> outWriter.complete();
>         out.setRowKind(in1.getRowKind());
>   }
> }
>         
>       public class StreamExecCalc$25 extends org.apache.flink.table.runtime.operators.TableStreamOperator
>           implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
>         private final Object[] references;
>         private transient MyFun1 function_MyFun1;
>         private transient org.apache.flink.table.data.conversion.StringStringConverter converter$6;
>         private transient org.apache.flink.table.data.conversion.MapMapConverter converter$8;
>         private transient MyFun2 function_MyFun2;
>         
>         private final org.apache.flink.table.data.binary.BinaryStringData str$12 = org.apache.flink.table.data.binary.BinaryStringData.fromString("Hi");
>                    
>         org.apache.flink.table.data.BoxedWrapperRowData out = new org.apache.flink.table.data.BoxedWrapperRowData(2);
>         private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>         public StreamExecCalc$25(
>             Object[] references,
>             org.apache.flink.streaming.runtime.tasks.StreamTask task,
>             org.apache.flink.streaming.api.graph.StreamConfig config,
>             org.apache.flink.streaming.api.operators.Output output,
>             org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
>           this.references = references;
>           function_MyFun1 = (((MyFun1) references[0]));
>           converter$6 = (((org.apache.flink.table.data.conversion.StringStringConverter) references[1]));
>           converter$8 = (((org.apache.flink.table.data.conversion.MapMapConverter) references[2]));
>           function_MyFun2 = (((MyFun2) references[3]));
>           this.setup(task, config, output);
>           if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
>             ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
>               .setProcessingTimeService(processingTimeService);
>           }
>         }
>         @Override
>         public void open() throws Exception {
>           super.open();
>           
>           function_MyFun1.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
>                  
>           
>           converter$6.open(getRuntimeContext().getUserCodeClassLoader());
>                      
>           
>           converter$8.open(getRuntimeContext().getUserCodeClassLoader());
>                      
>           
>           function_MyFun2.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
>                  
>         }
>         @Override
>         public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
>           org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
>           
>           org.apache.flink.table.data.binary.BinaryStringData field$4;
>           boolean isNull$4;
>           org.apache.flink.table.data.binary.BinaryStringData field$5;
>           boolean isNull$5;
>           java.util.Map externalResult$7;
>           org.apache.flink.table.data.MapData result$9;
>           boolean isNull$9;
>           java.lang.String externalResult$10;
>           org.apache.flink.table.data.binary.BinaryStringData result$11;
>           boolean isNull$11;
>           boolean isNull$23 = false;
>           boolean result$24 = false;
>           
>           
>           isNull$4 = in1.isNullAt(0);
>           field$4 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>           if (!isNull$4) {
>             field$4 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
>           }
>           isNull$5 = in1.isNullAt(1);
>           field$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>           if (!isNull$5) {
>             field$5 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1));
>           }
>           
>           out.setRowKind(in1.getRowKind());
>           
>           
>           
>           
>           
>           
>           
>           
>           externalResult$7 = (java.util.Map) function_MyFun1
>             .eval(isNull$4 ? null : ((java.lang.String) converter$6.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$4)), isNull$5 ? null : ((java.lang.String) converter$6.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$5)));
>           
>           externalResult$10 = (java.lang.String) function_MyFun2
>             .eval(externalResult$7);
>           
>           isNull$11 = externalResult$10 == null;
>           result$11 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>           if (!isNull$11) {
>             result$11 = (org.apache.flink.table.data.binary.BinaryStringData) converter$6.toInternalOrNull((java.lang.String) externalResult$10);
>           }
>           
>           if (isNull$11) {
>             out.setNullAt(0);
>           } else {
>             out.setNonPrimitiveValue(0, result$11);
>           }
>                     
>           
>           
>           
>           
>           boolean isNull$13 = (isNull$9 || false);
>           org.apache.flink.table.data.binary.BinaryStringData result$13 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>           if (!isNull$13) {
>            
>           if (result$9 instanceof org.apache.flink.table.data.binary.BinaryMapData) {
>             org.apache.flink.table.data.binary.BinaryMapData binaryMap$21 = (org.apache.flink.table.data.binary.BinaryMapData) result$9;
>             final int length$15 = binaryMap$21.size();
>             final org.apache.flink.table.data.binary.BinaryArrayData keys$16 = binaryMap$21.keyArray();
>             final org.apache.flink.table.data.binary.BinaryArrayData values$17 = binaryMap$21.valueArray();
>           
>             int index$18 = 0;
>             boolean found$19 = false;
>             if (false) {
>               while (index$18 < length$15 && !found$19) {
>                 if (keys$16.isNullAt(index$18)) {
>                   found$19 = true;
>                 } else {
>                   index$18++;
>                 }
>               }
>             } else {
>               while (index$18 < length$15 && !found$19) {
>                 final org.apache.flink.table.data.binary.BinaryStringData key$14 = ((org.apache.flink.table.data.binary.BinaryStringData) keys$16.getString(index$18));
>                 
>           
>           
>           isNull$23 = false || false;
>           result$24 = false;
>           if (!isNull$23) {
>             
>           
>           result$24 = ((org.apache.flink.table.data.binary.BinaryStringData) str$12).equals(key$14);
>           
>             
>           }
>           
>                 if (result$24) {
>                   found$19 = true;
>                 } else {
>                   index$18++;
>                 }
>               }
>             }
>           
>             if (!found$19 || values$17.isNullAt(index$18)) {
>               isNull$13 = true;
>             } else {
>               result$13 = ((org.apache.flink.table.data.binary.BinaryStringData) values$17.getString(index$18));
>             }
>           } else {
>             org.apache.flink.table.data.GenericMapData genericMap$22 = (org.apache.flink.table.data.GenericMapData) result$9;
>             org.apache.flink.table.data.binary.BinaryStringData value$20 =
>               (org.apache.flink.table.data.binary.BinaryStringData) genericMap$22.get((org.apache.flink.table.data.binary.BinaryStringData) ((org.apache.flink.table.data.binary.BinaryStringData) str$12));
>             if (value$20 == null) {
>               isNull$13 = true;
>             } else {
>               result$13 = value$20;
>             }
>           }
>                   
>           }
>                   
>           if (isNull$13) {
>             out.setNullAt(1);
>           } else {
>             out.setNonPrimitiveValue(1, result$13);
>           }
>                     
>                   
>           output.collect(outElement.replace(out));
>           
>           
>         }
>         
>         @Override
>         public void close() throws Exception {
>            super.close();
>           
>           function_MyFun1.close();
>                  
>           
>           function_MyFun2.close();
>                  
>         }
>         
>       }
>     
> public class ToBinary$26 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
>   org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>   public ToBinary$26(Object[] references) throws Exception {
>     
>   }
>   @Override
>   public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>     
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
>   return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
>     innerApply(in1);
>     return out;
>   }
>   /* Fit into JavaCodeSplitter's void function limitation. */
>   private void innerApply(org.apache.flink.table.data.RowData in1) {
>     
>     
> outWriter.reset();
> if (in1.isNullAt(0)) {
>   outWriter.setNullAt(0);
> } else {
>   outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>              
> if (in1.isNullAt(1)) {
>   outWriter.setNullAt(1);
> } else {
>   outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>              
> outWriter.complete();
>         out.setRowKind(in1.getRowKind());
>   }
> }
>         
> public class ToBinary$27 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
>   org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>   public ToBinary$27(Object[] references) throws Exception {
>     
>   }
>   @Override
>   public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>     
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
>   return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
>     innerApply(in1);
>     return out;
>   }
>   /* Fit into JavaCodeSplitter's void function limitation. */
>   private void innerApply(org.apache.flink.table.data.RowData in1) {
>     
>     
> outWriter.reset();
> if (in1.isNullAt(0)) {
>   outWriter.setNullAt(0);
> } else {
>   outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>              
> if (in1.isNullAt(1)) {
>   outWriter.setNullAt(1);
> } else {
>   outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>              
> outWriter.complete();
>         out.setRowKind(in1.getRowKind());
>   }
> }
>         
> public class ToBinary$28 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {
>   org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2);
> org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>   public ToBinary$28(Object[] references) throws Exception {
>     
>   }
>   @Override
>   public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>     
> if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
>   return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
> }
>     innerApply(in1);
>     return out;
>   }
>   /* Fit into JavaCodeSplitter's void function limitation. */
>   private void innerApply(org.apache.flink.table.data.RowData in1) {
>     
>     
> outWriter.reset();
> if (in1.isNullAt(0)) {
>   outWriter.setNullAt(0);
> } else {
>   outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
> }
>              
> if (in1.isNullAt(1)) {
>   outWriter.setNullAt(1);
> } else {
>   outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
> }
>              
> outWriter.complete();
>         out.setRowKind(in1.getRowKind());
>   }
> }
> {code}
> You can see that {{result$9}} is read (e.g. in the {{isNull$9}}-guarded map-access branch) but is never assigned a value beforehand, which causes this bug.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)