Posted to issues@flink.apache.org by "Kurt Young (Jira)" <ji...@apache.org> on 2021/01/19 06:57:00 UTC

[jira] [Assigned] (FLINK-20961) Flink throws NullPointerException for tables created from DataStream with no assigned timestamps and watermarks

     [ https://issues.apache.org/jira/browse/FLINK-20961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kurt Young reassigned FLINK-20961:
----------------------------------

    Assignee: Yuval Itzchakov

> Flink throws NullPointerException for tables created from DataStream with no assigned timestamps and watermarks
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20961
>                 URL: https://issues.apache.org/jira/browse/FLINK-20961
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.0
>            Reporter: Yuval Itzchakov
>            Assignee: Yuval Itzchakov
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.12.2
>
>
>  
> Given the following program:
> {code:java}
> //import org.apache.flink.api.common.eventtime.{ SerializableTimestampAssigner, WatermarkStrategy }
> import org.apache.flink.streaming.api.functions.source.SourceFunction
> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.api.{$, AnyWithOperations}
> import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction}
> import java.time.Instant
> import scala.util.Random
> object BugRepro {
>   def text: String =
>     s"""
>        |{
>        |  "s": "hello",
>        |  "i": ${Random.nextInt()}
>        |}
>        |""".stripMargin  
>   def main(args: Array[String]): Unit = {
>     val flink =
>       StreamExecutionEnvironment.createLocalEnvironment()
>     val tableEnv = StreamTableEnvironment.create(flink)
>     val dataStream = flink
>       .addSource {
>         new SourceFunction[(Long, String)] {
>           var isRunning = true          
>           override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit =
>             while (isRunning) {
>               val x = (Instant.now().toEpochMilli, text)
>               ctx.collect(x)
>               ctx.emitWatermark(new Watermark(x._1))
>               Thread.sleep(300)
>             }
> 
>           override def cancel(): Unit =
>             isRunning = false
>         }
>       }
> //      .assignTimestampsAndWatermarks(
> //        WatermarkStrategy
> //          .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(30))
> //          .withTimestampAssigner {
> //            new SerializableTimestampAssigner[(Long, String)] {
> //              override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long =
> //                element._1
> //            }
> //          }
> //      )
> //
>     tableEnv.createTemporaryView("testview", dataStream, $("event_time").rowtime(), $("json_text"))
>     val res = tableEnv.sqlQuery("""
>                                   |SELECT json_text
>                                   |FROM testview
>                                   |""".stripMargin)    
>     val sink = tableEnv.executeSql(
>       """
>         |CREATE TABLE SINK (
>         |  json_text STRING
>         |)
>         |WITH (
>         |  'connector' = 'print'
>         |)
>         |""".stripMargin
>     )
>     res.executeInsert("SINK").await()
>     ()
>   }
> }
> {code}
>  
> Flink will throw a NullPointerException at runtime:
> {code:java}
> Caused by: java.lang.NullPointerException
> 	at SourceConversion$3.processElement(Unknown Source)
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
> 	at ai.hunters.pipeline.BugRepro$$anon$1.run(BugRepro.scala:78)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
> {code}
> This happens because no timestamps were ever assigned to the records of the underlying DataStream. This is the generated code:
> {code:java}
> public class SourceConversion$3 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
>           implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> 
>         private final Object[] references;
>         private transient org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter converter$0;
>         org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2);
>         private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> 
>         public SourceConversion$3(
>             Object[] references,
>             org.apache.flink.streaming.runtime.tasks.StreamTask task,
>             org.apache.flink.streaming.api.graph.StreamConfig config,
>             org.apache.flink.streaming.api.operators.Output output,
>             org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
>           this.references = references;
>           converter$0 = (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) references[0]));
>           this.setup(task, config, output);
>           if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
>             ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
>               .setProcessingTimeService(processingTimeService);
>           }
>         }
> 
>         @Override
>         public void open() throws Exception {
>           super.open();
>           
>         }
> 
>         @Override
>         public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
>           org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) converter$0.toInternal((scala.Tuple2) element.getValue());
>           
>           org.apache.flink.table.data.TimestampData result$1;
>           boolean isNull$1;
>           org.apache.flink.table.data.binary.BinaryStringData field$2;
>           boolean isNull$2;
>           isNull$2 = in1.isNullAt(1);
>           field$2 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>           if (!isNull$2) {
>             field$2 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1));
>           }
>           
>           ctx.element = element;
>           
>           
>           
>           result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp());
>           if (result$1 == null) {
>             throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " +
>               "proper TimestampAssigner is defined and the stream environment uses the EventTime " +
>               "time characteristic.");
>           }
>           isNull$1 = false;
>           if (isNull$1) {
>             out.setField(0, null);
>           } else {
>             out.setField(0, result$1);
>           }
>                     
>           
>           
>           if (isNull$2) {
>             out.setField(1, null);
>           } else {
>             out.setField(1, field$2);
>           }
>                     
>                   
>           output.collect(outElement.replace(out));
>           ctx.element = null;
>           
>         }
> 
>         @Override
>         public void close() throws Exception {
>            super.close();
>           
>         }        
>       }
> {code}
> The important line is here:
> {code:java}
> result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp());
> if (result$1 == null) {
>   throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " +
>     "proper TimestampAssigner is defined and the stream environment uses the EventTime " +
>     "time characteristic.");
> }
> {code}
> `ctx.timestamp()` returns null when no timestamp assigner was set, and `TimestampData.fromEpochMillis` expects a primitive `long`, so unboxing the null `Long` fails with a NullPointerException before the generated null check is ever reached. The actual check should be:
> {code:java}
> if (!ctx.hasTimestamp) {
>   throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " +
>     "proper TimestampAssigner is defined and the stream environment uses the EventTime " +
>     "time characteristic.");
> }
> result$1 = TimestampData.fromEpochMillis(ctx.timestamp());
> {code}
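> To make the failure mode concrete, here is a minimal standalone Java sketch. The method names `fromEpochMillis` and `timestamp` below are stand-ins for the real Flink calls, not the Flink classes themselves; it only demonstrates that auto-unboxing a null `Long` into a primitive `long` parameter throws the NullPointerException at the call site, before any subsequent null check can run:
> {code:java}
> public class NullUnboxingDemo {
> 
>     // Stand-in for TimestampData.fromEpochMillis(long): the parameter is a primitive long.
>     static String fromEpochMillis(long millis) {
>         return "ts(" + millis + ")";
>     }
> 
>     // Stand-in for ctx.timestamp(): returns a boxed Long, which is null
>     // when no TimestampAssigner was configured on the stream.
>     static Long timestamp() {
>         return null;
>     }
> 
>     public static void main(String[] args) {
>         try {
>             // The null Long is unboxed at the call site, so the NPE is thrown
>             // here and the generated "result$1 == null" check is never reached.
>             fromEpochMillis(timestamp());
>         } catch (NullPointerException e) {
>             System.out.println("NullPointerException from unboxing");
>         }
>     }
> }
> {code}
> Checking for the presence of a timestamp before the call, as the proposed fix does, avoids the unboxing entirely.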
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)