You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Yuval Itzchakov (Jira)" <ji...@apache.org> on 2021/01/13 11:13:00 UTC
[jira] [Created] (FLINK-20961) Flink throws NullPointerException
for tables created from DataStream with no assigned timestamps and
watermarks
Yuval Itzchakov created FLINK-20961:
---------------------------------------
Summary: Flink throws NullPointerException for tables created from DataStream with no assigned timestamps and watermarks
Key: FLINK-20961
URL: https://issues.apache.org/jira/browse/FLINK-20961
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.12.0
Reporter: Yuval Itzchakov
Given the following program:
{code:scala}
//import org.apache.flink.api.common.eventtime.{ SerializableTimestampAssigner, WatermarkStrategy }
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{$, AnyWithOperations}
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction}
import java.time.Instant
/** Minimal reproduction for FLINK-20961.
  *
  * The source emits records with `ctx.collect`, so no event-time timestamp is attached. The
  * table view nevertheless declares `event_time` as a rowtime attribute. The generated
  * SourceConversion operator then fails with a NullPointerException instead of its intended
  * "Rowtime timestamp is null" error.
  */
object BugRepro {
  import scala.util.Random

  /** A small JSON document whose "i" field is a fresh random integer on every call. */
  def text: String =
    s"""
       |{
       | "s": "hello",
       | "i": ${Random.nextInt()}
       |}
       |""".stripMargin

  def main(args: Array[String]): Unit = {
    val flink = StreamExecutionEnvironment.createLocalEnvironment()
    val tableEnv = StreamTableEnvironment.create(flink)

    val dataStream = flink
      .addSource {
        new SourceFunction[(Long, String)] {
          // cancel() flips this flag from a different thread than the one running run(), so it
          // must be volatile for the loop to reliably observe cancellation.
          @volatile private var isRunning = true

          override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit =
            while (isRunning) {
              val x = (Instant.now().toEpochMilli, text)
              // Deliberately collect() rather than collectWithTimestamp(): the record carries no
              // timestamp, which is exactly what triggers the bug.
              ctx.collect(x)
              ctx.emitWatermark(new Watermark(x._1))
              Thread.sleep(300)
            }

          override def cancel(): Unit =
            isRunning = false
        }
      }
    // Assigning timestamps avoids the exception (also needs java.time.Duration and the
    // commented-out eventtime import to be enabled):
    // .assignTimestampsAndWatermarks(
    //   WatermarkStrategy
    //     .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(30))
    //     .withTimestampAssigner {
    //       new SerializableTimestampAssigner[(Long, String)] {
    //         override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long =
    //           element._1
    //       }
    //     }
    // )

    tableEnv.createTemporaryView("testview", dataStream, $("event_time").rowtime(), $("json_text"))

    val res = tableEnv.sqlQuery(
      """
        |SELECT json_text
        |FROM testview
        |""".stripMargin)

    // Registers the print sink; the TableResult of this DDL statement is not needed.
    tableEnv.executeSql(
      """
        |CREATE TABLE SINK (
        | json_text STRING
        |)
        |WITH (
        | 'connector' = 'print'
        |)
        |""".stripMargin
    )

    res.executeInsert("SINK").await()
  }
}
{code}
Flink will throw a NullPointerException at runtime:
{code:java}
Caused by: java.lang.NullPointerException
    at SourceConversion$3.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
    at ai.hunters.pipeline.BugRepro$$anon$1.run(BugRepro.scala:78)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
{code}
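This happens because the source emits its records with `ctx.collect` (no timestamp attached) and no timestamp assigner is applied to the DataStream, so the records reaching the table conversion carry no timestamp. This is the generated code: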
{code:java}
// Operator generated by the Flink table planner, quoted verbatim. It converts each scala.Tuple2
// record of the DataStream into a two-field GenericRowData:
//   field 0 = the rowtime, built from the record timestamp via ctx.timestamp()
//   field 1 = the string read from position 1 of the converted input row
public class SourceConversion$3 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
implements org.apache.flink.streaming.api.operators.OneInputStreamOperator { private final Object[] references;
// Converts the incoming scala.Tuple2 value into internal RowData; taken from references[0] in the constructor.
private transient org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter converter$0;
// Output row and output record wrapper are reused: both are overwritten for every element.
org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2);
private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); public SourceConversion$3(
Object[] references,
org.apache.flink.streaming.runtime.tasks.StreamTask task,
org.apache.flink.streaming.api.graph.StreamConfig config,
org.apache.flink.streaming.api.operators.Output output,
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
this.references = references;
converter$0 = (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) references[0]));
this.setup(task, config, output);
if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
.setProcessingTimeService(processingTimeService);
}
} @Override
public void open() throws Exception {
super.open();
} @Override
// Converts a single incoming record and forwards the resulting row downstream.
public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) converter$0.toInternal((scala.Tuple2) element.getValue());
org.apache.flink.table.data.TimestampData result$1;
boolean isNull$1;
org.apache.flink.table.data.binary.BinaryStringData field$2;
boolean isNull$2;
// Field 1: copy the string unless it is null (EMPTY_UTF8 is only a placeholder default).
isNull$2 = in1.isNullAt(1);
field$2 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$2) {
field$2 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1));
}
// Field 0 (rowtime). `ctx` is not declared in this class; presumably it is inherited from
// AbstractProcessStreamOperator and reports the timestamp of ctx.element -- verify.
// BUG (FLINK-20961): per this report, ctx.timestamp() returns null when the record has no
// timestamp. fromEpochMillis takes a primitive long, so that null is unboxed and a
// NullPointerException is thrown on the next statement, before the null check below can run.
ctx.element = element;
result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp());
// Intended guard for a missing timestamp; it cannot fire in that case because of the NPE above.
if (result$1 == null) {
throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " +
"proper TimestampAssigner is defined and the stream environment uses the EventTime " +
"time characteristic.");
}
// The rowtime is treated as never null, so the isNull$1 branch below is dead code.
isNull$1 = false;
if (isNull$1) {
out.setField(0, null);
} else {
out.setField(0, result$1);
}
if (isNull$2) {
out.setField(1, null);
} else {
out.setField(1, field$2);
}
output.collect(outElement.replace(out));
ctx.element = null;
} @Override
public void close() throws Exception {
super.close();
}
}
{code}
The important line is here:
{code:java}
// Excerpt from the generated processElement: the NullPointerException is thrown inside the first
// statement (unboxing the null returned by ctx.timestamp()), so the null check after it is never reached.
result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp()); if (result$1 == null) { throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic.");
{code}
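`ctx.timestamp()` returns `null` when no timestamp was assigned to the record. However, `TimestampData.fromEpochMillis` takes a primitive `long`, so unboxing that `null` throws a NullPointerException (a failed dereference) before the generated null check on `result$1` is ever reached. The check should instead be performed on the timestamp itself, before the conversion: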
{code:java}
if (!ctx.hasTimestamp) {
throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic.");
}
result$1 = TimestampData.fromEpochMillis(ctx.timestamp());{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)