Posted to user@flink.apache.org by bat man <ti...@gmail.com> on 2022/06/14 12:51:32 UTC

context.timestamp null in keyedprocess function

Hi,

We are using Flink 12.1 on AWS EMR. The job reads the event stream and
enriches it with a stream from another topic.
We extend AssignerWithPeriodicWatermarks to assign watermarks, extract the
timestamp from each event, and handle idle source partitions.
AutoWatermarkInterval is set to 5000L.
The timestamp extractor looks like this:

        @Override
        public long extractTimestamp(Raw event, long previousElementTimestamp) {
            lastRecordProcessingTime = System.currentTimeMillis();
            // The event timestamp is assumed to be epoch seconds; any
            // fractional part is dropped.
            long eventTimeSeconds =
                Double.valueOf(event.getTimestamp().toString()).longValue();
            // Convert seconds to epoch milliseconds.
            long timestamp = eventTimeSeconds * 1_000L;
            if (timestamp > currentMaxTimestamp) {
                currentMaxTimestamp = timestamp;
            }
            return timestamp;
        }

In the second step the rules are joined to the events; this is done in a
KeyedProcessFunction.
We have observed that, at times when the job starts consuming from the
beginning of the event source stream, the timestamp accessed in the
KeyedProcessFunction via context.timestamp() is null and the code throws
an NPE.
This happens only intermittently, for some records. When we process the
same event in another environment it works fine, which means the event
itself is parsed correctly.

What could be the issue? Does anyone have an idea? As far as I can tell, the
timestamp could only be null if the timestamp extractor returned null.

Thanks.
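
For context on the NPE described above: Flink's KeyedProcessFunction.Context#timestamp() returns a boxed Long, and its Javadoc notes that the value may be null. Assigning it to a primitive long unboxes it, which is where a NullPointerException would surface. The sketch below reproduces just that mechanism in plain Java, without Flink; the class and method names are hypothetical and are not taken from the job in question.

```java
// Plain-Java illustration with no Flink dependency. All names are hypothetical.
final class NullTimestampDemo {

    // Mimics code that stores a boxed Long in a primitive long.
    // Auto-unboxing a null Long throws NullPointerException.
    static long unbox(Long boxedTimestamp) {
        return boxedTimestamp; // implicit boxedTimestamp.longValue()
    }

    // Returns true if unboxing the given value throws NullPointerException.
    static boolean unboxingThrows(Long boxedTimestamp) {
        try {
            unbox(boxedTimestamp);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(unboxingThrows(1655211092000L)); // prints false
        System.out.println(unboxingThrows(null));           // prints true
    }
}
```

If the job does something like `long ts = context.timestamp();`, this is one way the reported exception could arise; the stack trace would confirm whether that is the failing line.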

Re: context.timestamp null in keyedprocess function

Posted by Shengkai Fang <fs...@gmail.com>.
Hi,

Could you share more information, e.g. the exception stack trace? Did you set
the assigner for all the sources? I think you can modify the
KeyedProcessFunction to print the messages whose timestamp is null.

Best,
Shengkai
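
The diagnostic suggested in this reply could look roughly like the sketch below. It is a plain-Java stand-in rather than real Flink code: TimestampContext is a hypothetical interface that mirrors only the `Long timestamp()` method of KeyedProcessFunction.Context, and records are represented as strings. The idea is to read the timestamp into a boxed Long, check it for null before any unboxing, and log the offending record.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for the diagnostic. All names are hypothetical.
final class NullTimestampGuard {

    // Stand-in for the part of Flink's KeyedProcessFunction.Context used here.
    interface TimestampContext {
        Long timestamp(); // may be null when the record carries no timestamp
    }

    // Records seen without a timestamp, kept so they can be inspected later.
    final List<String> missing = new ArrayList<>();

    // Returns the timestamp, or -1 after logging the record if none is attached.
    long timestampOrLog(String record, TimestampContext ctx) {
        Long ts = ctx.timestamp(); // keep it boxed until the null check
        if (ts == null) {
            missing.add(record);
            System.err.println("Record without timestamp: " + record);
            return -1L;
        }
        return ts;
    }

    public static void main(String[] args) {
        NullTimestampGuard guard = new NullTimestampGuard();
        System.out.println(guard.timestampOrLog("event-a", () -> 1655211092000L));
        System.out.println(guard.timestampOrLog("event-b", () -> null));
        System.out.println(guard.missing); // prints [event-b]
    }
}
```

In an actual job the same check would sit at the top of processElement, and the logged records could then be traced back to the source they came from.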

Re: context.timestamp null in keyedprocess function

Posted by bat man <ti...@gmail.com>.
Has anyone experienced this or has any clue?
