Posted to user@beam.apache.org by Sachin Mittal <sj...@gmail.com> on 2023/07/21 06:19:23 UTC
EFO KinesisIO watermarking doubt
Hi,
We are working with the EFO KinesisIO reader provided by Apache Beam.
I see in the code that the implementation of getCurrentTimestamp always
returns getApproximateArrivalTimestamp, not the event time that we may
have set for that record using withCustomWatermarkPolicy.
Please see:
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91
However, KafkaIO does something different:
getCurrentTimestamp is always derived from the `timestampPolicy` set for
Kafka, which lets the user emit a custom timestamp for each record.
Please see:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210
So why is there a difference between these two implementations?
We want the current timestamp to be based on a custom time embedded within
the record rather than on the approximate arrival time, and we are not sure
how to achieve that.
Please let us know if there is a way to achieve this for Kinesis.
Thanks
Sachin
Re: EFO KinesisIO watermarking doubt
Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
I have filed an issue: https://github.com/apache/beam/issues/28760
I have also created a PR (based on our local fix for this):
https://github.com/apache/beam/pull/28763
This can serve as a start.
Thanks
Sachin
On Mon, Oct 2, 2023 at 2:54 AM Pavel Solomin <p....@gmail.com> wrote:
> Hello, sorry for the late reply.
>
> EFOKinesisReader implemented the same logic of timestamps non-EFO
> KinesisReader had. At the time of EFO implementation more careful
> evaluation of the records' timestamps was out of context.
>
> Can you please create an issue at https://github.com/apache/beam/issues ?
> With an issue we can track this investigation which may become a new PR or
> some clarifications in the IO documentation.
>
> > We wanted the current timestamp based on some custom time embedded
> within the record and not approximate arrival time and not sure how we can
> achieve that.
>
> KinesisIO outputs only byte[] of a message payload without any decoding.
> If your timestamps sit in the messages' payload, I think, this approach
> should work:
> https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
Re: EFO KinesisIO watermarking doubt
Posted by Pavel Solomin <p....@gmail.com>.
Hello, sorry for the late reply.
EFOKinesisReader implements the same timestamp logic that the non-EFO
KinesisReader had. At the time of the EFO implementation, a more careful
evaluation of the records' timestamps was out of scope.
Can you please create an issue at https://github.com/apache/beam/issues ?
With an issue we can track this investigation which may become a new PR or
some clarifications in the IO documentation.
> We wanted the current timestamp based on some custom time embedded within
> the record and not approximate arrival time and not sure how we can achieve
> that.
KinesisIO outputs only the byte[] of a message payload, without any decoding.
If your timestamps sit in the messages' payload, I think this approach should
work:
https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
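As a rough illustration of the first step of that approach, here is a minimal sketch of pulling an event time out of the raw payload bytes. The payload layout ("<epochMillis>,<body>" as UTF-8 text) and the names PayloadTimestamps and extractEventTime are assumptions made up for this example; they are not part of Beam or KinesisIO, and a real payload would need its own decoding.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

/** Hypothetical helper for illustration; not part of Beam or KinesisIO. */
final class PayloadTimestamps {
  private PayloadTimestamps() {}

  /**
   * Extracts the event time from a payload.
   * Assumed layout (illustrative only): UTF-8 text of the form
   * "<epochMillis>,<body>", for example "1689920363000,hello".
   */
  static Instant extractEventTime(byte[] payload) {
    String text = new String(payload, StandardCharsets.UTF_8);
    int comma = text.indexOf(',');
    if (comma <= 0) {
      throw new IllegalArgumentException("payload has no leading timestamp field");
    }
    try {
      // Everything before the first comma is treated as epoch milliseconds.
      return Instant.ofEpochMilli(Long.parseLong(text.substring(0, comma).trim()));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("timestamp field is not epoch millis", e);
    }
  }

  public static void main(String[] args) {
    byte[] payload = "1689920363000,hello".getBytes(StandardCharsets.UTF_8);
    System.out.println(extractEventTime(payload));
  }
}
```

In a pipeline, a function like this could be converted to return org.joda.time.Instant (the type the Beam Java SDK uses for timestamps) and then passed to WithTimestamps.of(...) or called from a DoFn that emits with outputWithTimestamp. One caveat worth checking: by default Beam rejects an output timestamp that is earlier than the input element's timestamp, and an embedded event time is usually earlier than the arrival time, so this approach may also need an allowed timestamp skew.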
Best Regards,
Pavel Solomin
Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>