Posted to user@beam.apache.org by Sachin Mittal <sj...@gmail.com> on 2023/07/21 06:19:23 UTC

EFO KinesisIO watermarking doubt

Hi,
We are using the EFO (enhanced fan-out) KinesisIO reader provided by Apache Beam.
I see in the code that getCurrentTimestamp always returns
getApproximateArrivalTimestamp, not the event time that we may have set for
that record using withCustomWatermarkPolicy.

Please refer:
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91

KafkaIO, however, does something different: getCurrentTimestamp is always
based on the `timestampPolicy` configured for Kafka, through which the user
can emit a custom timestamp for each record.

Please refer:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210
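To make the contrast concrete, here is a deliberately simplified, hypothetical model in plain Java (no Beam dependency). `Rec`, `WatermarkPolicy`, `EventTimeWatermarkPolicy`, `ArrivalTimeReader` and `PolicyTimestampReader` are illustrative names, not Beam classes, and the payload format (a bare epoch-millis string) is an assumption. The first reader mirrors the EFO behaviour as described in this thread: a custom policy moves the watermark, but the element timestamp is always the arrival time. The second mirrors the KafkaIO approach, where a user-supplied function decides each record's timestamp.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.ToLongFunction;

public class TimestampModel {
    // Hypothetical record: raw payload plus the arrival time assigned by the stream (epoch millis).
    static final class Rec {
        final byte[] payload;
        final long arrivalMillis;

        Rec(byte[] payload, long arrivalMillis) {
            this.payload = payload;
            this.arrivalMillis = arrivalMillis;
        }
    }

    // Hypothetical watermark policy: observes each record and reports a watermark.
    interface WatermarkPolicy {
        void update(Rec r);

        long watermarkMillis();
    }

    // Watermark policy that tracks the largest event time seen so far.
    static final class EventTimeWatermarkPolicy implements WatermarkPolicy {
        private final ToLongFunction<Rec> eventTime;
        private long watermark = Long.MIN_VALUE;

        EventTimeWatermarkPolicy(ToLongFunction<Rec> eventTime) { this.eventTime = eventTime; }

        @Override
        public void update(Rec r) { watermark = Math.max(watermark, eventTime.applyAsLong(r)); }

        @Override
        public long watermarkMillis() { return watermark; }
    }

    // Models the EFO behaviour described in the thread: the policy moves the watermark,
    // but the element timestamp is always the arrival time.
    static final class ArrivalTimeReader {
        private final WatermarkPolicy policy;
        private Rec current;

        ArrivalTimeReader(WatermarkPolicy policy) { this.policy = policy; }

        void advance(Rec r) {
            current = r;
            policy.update(r);
        }

        long currentTimestamp() { return current.arrivalMillis; }

        long watermark() { return policy.watermarkMillis(); }
    }

    // Models the KafkaIO approach: a user-supplied function decides the element timestamp.
    static final class PolicyTimestampReader {
        private final ToLongFunction<Rec> timestampFn;
        private Rec current;

        PolicyTimestampReader(ToLongFunction<Rec> timestampFn) { this.timestampFn = timestampFn; }

        void advance(Rec r) { current = r; }

        long currentTimestamp() { return timestampFn.applyAsLong(current); }
    }

    public static void main(String[] args) {
        // Hypothetical event-time extractor: the payload is just a decimal epoch-millis string.
        ToLongFunction<Rec> eventTime =
            r -> Long.parseLong(new String(r.payload, StandardCharsets.UTF_8).trim());
        Rec rec = new Rec("1000".getBytes(StandardCharsets.UTF_8), 6_000L);

        ArrivalTimeReader efoLike = new ArrivalTimeReader(new EventTimeWatermarkPolicy(eventTime));
        efoLike.advance(rec);
        PolicyTimestampReader kafkaLike = new PolicyTimestampReader(eventTime);
        kafkaLike.advance(rec);

        System.out.println(efoLike.currentTimestamp());   // 6000 (arrival time)
        System.out.println(efoLike.watermark());          // 1000 (event time, via the policy)
        System.out.println(kafkaLike.currentTimestamp()); // 1000 (event time)
    }
}
```

A real Kafka `TimestampPolicy` also reports a watermark; the model leaves that out to keep the focus on which value ends up as the element timestamp.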

So why is there a difference in these two implementations?

We want the current timestamp to be based on a custom time embedded within
the record, not on the approximate arrival time, and we are not sure how to
achieve that.

Please let us know if there is a way to achieve this for Kinesis.

Thanks
Sachin

Re: EFO KinesisIO watermarking doubt

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
I have filed an issue: https://github.com/apache/beam/issues/28760
I have also created a PR (based on our local fix for this):
https://github.com/apache/beam/pull/28763
It can serve as a starting point.

Thanks
Sachin


(Replying to Pavel Solomin's message of Mon, Oct 2, 2023 at 2:54 AM.)

Re: EFO KinesisIO watermarking doubt

Posted by Pavel Solomin <p....@gmail.com>.
Hello, sorry for the late reply.

EFOKinesisReader implemented the same timestamp logic that the non-EFO
KinesisReader had. At the time of the EFO implementation, a more careful
evaluation of the records' timestamps was out of scope.

Can you please create an issue at https://github.com/apache/beam/issues ?
With an issue we can track this investigation, which may lead to a new PR or
to some clarifications in the IO documentation.

> We wanted the current timestamp based on some custom time embedded within
> the record and not approximate arrival time and not sure how we can achieve
> that.

KinesisIO outputs only the message payload as byte[], without any decoding.
If your timestamps are in the message payload, I think this approach should
work:
https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
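A minimal sketch of that approach, in plain Java so it runs without Beam. It assumes, purely for illustration, that each payload is UTF-8 text of the form `<epochMillis>|<body>`; `PayloadTimestamps`, `extractEventTimeMillis` and `isWithinAllowedSkew` are hypothetical names. One caveat worth checking: a Beam `DoFn` may not output an element with a timestamp earlier than its input element's timestamp minus the DoFn's allowed timestamp skew (zero by default), and an event time embedded in the payload will usually be earlier than the approximate arrival time the reader assigns. `isWithinAllowedSkew` models that check.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class PayloadTimestamps {
    // Hypothetical payload format: "<epochMillis>|<body>", UTF-8 encoded.
    static long extractEventTimeMillis(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        int sep = text.indexOf('|');
        if (sep <= 0) {
            throw new IllegalArgumentException("no event-time prefix in payload: " + text);
        }
        return Long.parseLong(text.substring(0, sep).trim());
    }

    // Models the re-timestamping rule inside a Beam DoFn: the new timestamp must not be
    // earlier than the input element's timestamp minus the allowed skew.
    static boolean isWithinAllowedSkew(long newTimestampMillis, long inputTimestampMillis,
                                       Duration allowedSkew) {
        return newTimestampMillis >= inputTimestampMillis - allowedSkew.toMillis();
    }

    public static void main(String[] args) {
        byte[] payload = "1000|{\"temp\":21}".getBytes(StandardCharsets.UTF_8);
        long arrivalMillis = 6_000L; // timestamp the source assigned (approximate arrival time)
        long eventMillis = extractEventTimeMillis(payload);

        System.out.println(eventMillis); // 1000
        System.out.println(isWithinAllowedSkew(eventMillis, arrivalMillis, Duration.ZERO));          // false
        System.out.println(isWithinAllowedSkew(eventMillis, arrivalMillis, Duration.ofSeconds(10))); // true
    }
}
```

In a pipeline, an extraction function like this could be passed to `WithTimestamps.of(...)` (wrapping the result in a Joda `Instant`) or called from a `DoFn` that emits via `OutputReceiver.outputWithTimestamp`; either way the allowed skew has to cover the gap between arrival time and event time. Reassigning timestamps downstream does not change the watermark reported by the source, so records whose event time falls behind it can be treated as late data.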

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>
