Posted to user@flink.apache.org by Dominik Wosiński <wo...@gmail.com> on 2020/03/18 18:38:01 UTC

Timestamp Erasure

Hey,
I wanted to ask about timestamps. Currently, if I have a
KeyedBroadcastProcessFunction followed by a temporal table join, it
works like a charm. But say I want to delay emitting some of the results
for some reason. If I call *registerProcessingTimeTimer* and any elements
are emitted in the *onTimer* call, their timestamps are erased, and I
get:
Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
    at DataStreamSourceConversion$10.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    ... 23 more

Is that the expected behavior? I haven't seen it described anywhere before
and I wasn't able to find any docs specifying this.

Thanks in advance,
Best Regards,
Dom.

Re: Timestamp Erasure

Posted by Jark Wu <im...@gmail.com>.
Hi Dom,

Elements emitted from a processing-time timer in a BroadcastProcessFunction
or KeyedCoProcessFunction have their timestamps erased.
So you have to call `assignTimestampsAndWatermarks` again after that,
or use an event-time timer.
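
As a rough illustration of the rule above, here is a toy model in plain Java with no Flink dependency. It is not Flink's actual implementation. The sketch assumes that output emitted while handling an element inherits that element's timestamp, that output from an event-time timer is stamped with the timer's timestamp, and that output from a processing-time timer gets no timestamp at all. `ToyCollector`, `Emitted`, and the three `on...` methods are hypothetical names made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectorTimestampSketch {

    /** An emitted value plus the timestamp attached to its record (null = none). */
    public static final class Emitted {
        public final String value;
        public final Long timestamp;

        public Emitted(String value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical collector: stamps each emitted value with the current timestamp. */
    public static final class ToyCollector {
        private Long current; // null means "emit without a timestamp"
        public final List<Emitted> output = new ArrayList<>();

        public void setTimestamp(long ts) { current = ts; }
        public void eraseTimestamp() { current = null; }
        public void collect(String value) { output.add(new Emitted(value, current)); }
    }

    /** Emission while handling an input element: inherit the element's timestamp. */
    public static void onElement(ToyCollector out, String value, long elementTimestamp) {
        out.setTimestamp(elementTimestamp);
        out.collect(value);
    }

    /** Emission from an event-time timer: use the timer's own timestamp. */
    public static void onEventTimeTimer(ToyCollector out, String value, long timerTimestamp) {
        out.setTimestamp(timerTimestamp);
        out.collect(value);
    }

    /** Emission from a processing-time timer: no event time available, so erase it. */
    public static void onProcessingTimeTimer(ToyCollector out, String value) {
        out.eraseTimestamp();
        out.collect(value);
    }

    public static void main(String[] args) {
        ToyCollector out = new ToyCollector();
        onElement(out, "from processElement", 1_000L);
        onEventTimeTimer(out, "from event-time timer", 2_000L);
        onProcessingTimeTimer(out, "from processing-time timer");
        for (Emitted e : out.output) {
            System.out.println(e.value + " -> timestamp=" + e.timestamp);
        }
    }
}
```

Only the third record ends up with a null timestamp, which is the case a downstream rowtime conversion would reject.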

Best,
Jark

On Thu, 19 Mar 2020 at 16:40, Dominik Wosiński <wo...@gmail.com> wrote:

> Yes, I understand this completely, but my question is a little
> different.
>
> The issue is that if I have something like:
>     val firstStream = dataStreamFromKafka
>       .assignTimestampsAndWatermarks(...)
>     val secondStream = otherStreamFromKafka
>       .assignTimestampsAndWatermarks(...)
>       .broadcast(...)
>
> and then do something like:
>     firstStream.keyBy(...).connect(secondStream)
>       .process(someBroadcastProcessFunction)
>
> In the process function, when creating a new record, I select only one
> field from the second stream, and it is *not the timestamp field*; from
> the first stream I select all fields, *including the timestamp*.
>
> Then everything works like a charm. But if I register a processing-time
> timer in this *someBroadcastProcessFunction* and any element is emitted
> from the *onTimer* function, I get the issue described above.
>
> Best Regards,
> Dom.

Re: Timestamp Erasure

Posted by Dominik Wosiński <wo...@gmail.com>.
Yes, I understand this completely, but my question is a little
different.

The issue is that if I have something like:
    val firstStream = dataStreamFromKafka
      .assignTimestampsAndWatermarks(...)
    val secondStream = otherStreamFromKafka
      .assignTimestampsAndWatermarks(...)
      .broadcast(...)

and then do something like:
    firstStream.keyBy(...).connect(secondStream)
      .process(someBroadcastProcessFunction)

In the process function, when creating a new record, I select only one
field from the second stream, and it is *not the timestamp field*; from
the first stream I select all fields, *including the timestamp*.

Then everything works like a charm. But if I register a processing-time
timer in this *someBroadcastProcessFunction* and any element is emitted
from the *onTimer* function, I get the issue described above.

Best Regards,
Dom.

On Thu, 19 Mar 2020 at 02:41, Jark Wu <im...@gmail.com> wrote:

> Hi Dom,
>
> If you are converting a DataStream to a Table with a rowtime attribute,
> the DataStream should carry event-time timestamps.
> For example, call `assignTimestampsAndWatermarks` before converting it to
> a table. You can find more details in the docs [1].
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1

Re: Timestamp Erasure

Posted by Jark Wu <im...@gmail.com>.
Hi Dom,

If you are converting a DataStream to a Table with a rowtime attribute,
the DataStream should carry event-time timestamps.
For example, call `assignTimestampsAndWatermarks` before converting it to
a table. You can find more details in the docs [1].
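
To make the requirement above concrete, here is a small stand-alone Java sketch. It is a toy model, not Flink code. It assumes that the rowtime conversion reads the timestamp attached to the stream record rather than any field inside the payload, which would explain how a record can contain a timestamp field and still fail. `Event`, `ToyRecord`, `assignTimestamp`, and `toRowtime` are hypothetical stand-ins invented for this illustration.

```java
import java.util.function.ToLongFunction;

public class RowtimeConversionSketch {

    /** Payload that carries its own event-time field. */
    public static final class Event {
        public final String id;
        public final long eventTimeMillis;

        public Event(String id, long eventTimeMillis) {
            this.id = id;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    /** A record on the stream: the payload plus an attached timestamp (null = none). */
    public static final class ToyRecord {
        public final Event value;
        public final Long timestamp;

        public ToyRecord(Event value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Stand-in for a timestamp assigner: attaches a timestamp taken from the payload. */
    public static ToyRecord assignTimestamp(ToyRecord in, ToLongFunction<Event> extractor) {
        return new ToyRecord(in.value, extractor.applyAsLong(in.value));
    }

    /** Stand-in for the rowtime conversion: uses the attached timestamp only. */
    public static long toRowtime(ToyRecord record) {
        if (record.timestamp == null) {
            throw new RuntimeException("Rowtime timestamp is null.");
        }
        return record.timestamp;
    }

    public static void main(String[] args) {
        // The payload has an event-time field, but nothing is attached to the record.
        ToyRecord bare = new ToyRecord(new Event("a", 1_000L), null);
        try {
            toRowtime(bare);
        } catch (RuntimeException e) {
            System.out.println("without assigner: " + e.getMessage());
        }

        // After the assigner runs, the conversion can read a rowtime.
        ToyRecord stamped = assignTimestamp(bare, ev -> ev.eventTimeMillis);
        System.out.println("with assigner: rowtime=" + toRowtime(stamped));
    }
}
```

The point of the model is that selecting the timestamp field into the output record is not enough on its own; the timestamp has to be attached to the record before the conversion step.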

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1

On Thu, 19 Mar 2020 at 02:38, Dominik Wosiński <wo...@gmail.com> wrote:

> Hey,
> I wanted to ask about timestamps. Currently, if I have a
> KeyedBroadcastProcessFunction followed by a temporal table join, it
> works like a charm. But say I want to delay emitting some of the results
> for some reason. If I call *registerProcessingTimeTimer* and any
> elements are emitted in the *onTimer* call, their timestamps are erased,
> and I get:
> Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
>     at DataStreamSourceConversion$10.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     ... 23 more
>
> Is that the expected behavior? I haven't seen it described anywhere before
> and I wasn't able to find any docs specifying this.
>
> Thanks in advance,
> Best Regards,
> Dom.
>