You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Kurt Young (Jira)" <ji...@apache.org> on 2021/01/19 06:57:00 UTC
[jira] [Assigned] (FLINK-20961) Flink throws NullPointerException
for tables created from DataStream with no assigned timestamps and
watermarks
[ https://issues.apache.org/jira/browse/FLINK-20961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kurt Young reassigned FLINK-20961:
----------------------------------
Assignee: Yuval Itzchakov
> Flink throws NullPointerException for tables created from DataStream with no assigned timestamps and watermarks
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-20961
> URL: https://issues.apache.org/jira/browse/FLINK-20961
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.12.0
> Reporter: Yuval Itzchakov
> Assignee: Yuval Itzchakov
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.13.0, 1.12.2
>
>
>
> Given the following program:
> {code:java}
> //import org.apache.flink.api.common.eventtime.{ SerializableTimestampAssigner, WatermarkStrategy }
> import org.apache.flink.streaming.api.functions.source.SourceFunction
> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.api.{$, AnyWithOperations}
> import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction}
> import java.time.Instant
> object BugRepro {
> def text: String =
> s"""
> |{
> | "s": "hello",
> | "i": ${Random.nextInt()}
> |}
> |""".stripMargin
> def main(args: Array[String]): Unit = {
> val flink =
> StreamExecutionEnvironment.createLocalEnvironment()
> val tableEnv = StreamTableEnvironment.create(flink)
> val dataStream = flink
> .addSource {
> new SourceFunction[(Long, String)] {
> var isRunning = true
> override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit =
> while (isRunning) {
> val x = (Instant.now().toEpochMilli, text)
> ctx.collect(x)
> ctx.emitWatermark(new Watermark(x._1))
> Thread.sleep(300)
> }
> override def cancel(): Unit =
> isRunning = false
> }
> }
> // .assignTimestampsAndWatermarks(
> // WatermarkStrategy
> // .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(30))
> // .withTimestampAssigner {
> // new SerializableTimestampAssigner[(Long, String)] {
> // override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long =
> // element._1
> // }
> // }
> // )
> //
> tableEnv.createTemporaryView("testview", dataStream, $("event_time").rowtime(), $("json_text"))
> val res = tableEnv.sqlQuery("""
> |SELECT json_text
> |FROM testview
> |""".stripMargin)
> val sink = tableEnv.executeSql(
> """
> |CREATE TABLE SINK (
> | json_text STRING
> |)
> |WITH (
> | 'connector' = 'print'
> |)
> |""".stripMargin
> ) res.executeInsert("SINK").await()
> ()
> }
> res.executeInsert("SINK").await()
> {code}
>
> Flink will throw a NullPointerException at runtime:
> {code:java}
> Caused by: java.lang.NullPointerException
>     at SourceConversion$3.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>     at ai.hunters.pipeline.BugRepro$$anon$1.run(BugRepro.scala:78)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
> {code}
> This happens because no timestamps were assigned to the records of the underlying DataStream. This is the generated code:
> {code:java}
> public class SourceConversion$3 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> implements org.apache.flink.streaming.api.operators.OneInputStreamOperator { private final Object[] references;
> private transient org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter converter$0;
> org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2);
> private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); public SourceConversion$3(
> Object[] references,
> org.apache.flink.streaming.runtime.tasks.StreamTask task,
> org.apache.flink.streaming.api.graph.StreamConfig config,
> org.apache.flink.streaming.api.operators.Output output,
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
> this.references = references;
> converter$0 = (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) references[0]));
> this.setup(task, config, output);
> if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> .setProcessingTimeService(processingTimeService);
> }
> } @Override
> public void open() throws Exception {
> super.open();
>
> } @Override
> public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) converter$0.toInternal((scala.Tuple2) element.getValue());
>
> org.apache.flink.table.data.TimestampData result$1;
> boolean isNull$1;
> org.apache.flink.table.data.binary.BinaryStringData field$2;
> boolean isNull$2;
> isNull$2 = in1.isNullAt(1);
> field$2 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> if (!isNull$2) {
> field$2 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1));
> }
>
> ctx.element = element;
>
>
>
> result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp());
> if (result$1 == null) {
> throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " +
> "proper TimestampAssigner is defined and the stream environment uses the EventTime " +
> "time characteristic.");
> }
> isNull$1 = false;
> if (isNull$1) {
> out.setField(0, null);
> } else {
> out.setField(0, result$1);
> }
>
>
>
> if (isNull$2) {
> out.setField(1, null);
> } else {
> out.setField(1, field$2);
> }
>
>
> output.collect(outElement.replace(out));
> ctx.element = null;
>
> } @Override
> public void close() throws Exception {
> super.close();
>
> }
> }
> {code}
> The important line is here:
> {code:java}
> result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp());
> if (result$1 == null) { throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic.");
> {code}
> `ctx.timestamp()` returns null when no timestamp assigner was defined, and `TimestampData.fromEpochMillis` expects a primitive `long`, so unboxing the null value throws the NullPointerException before the `result$1 == null` check is ever reached. The actual check should be:
> {code:java}
> if (!ctx.hasTimestamp) {
> throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic.");
> }
> result$1 = TimestampData.fromEpochMillis(ctx.timestamp());{code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)