Posted to user@beam.apache.org by Pavel Solomin <p....@gmail.com> on 2023/10/01 21:24:20 UTC

Re: EFO KinesisIO watermarking doubt

Hello, sorry for the late reply.

EFOKinesisReader implements the same timestamp logic that the non-EFO
KinesisReader had. At the time of the EFO implementation, a more careful
evaluation of the records' timestamps was out of scope.

Can you please create an issue at https://github.com/apache/beam/issues ?
With an issue we can track this investigation, which may result in a new PR
or some clarifications in the IO documentation.

> We wanted the current timestamp based on some custom time embedded within
> the record and not approximate arrival time and not sure how we can achieve
> that.

KinesisIO outputs only the byte[] of a message payload, without any decoding.
If your timestamps sit in the message payload, I think this approach should
work:
https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
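
The part of that approach that depends on your data is decoding the
timestamp out of the raw payload. Below is a minimal sketch of that step in
plain Java. The payload layout ("<ISO-8601 instant>,<rest of message>" in
UTF-8) and the names PayloadTimestamps and extractEpochMillis are
hypothetical and only for illustration; your records will likely need a
different parser.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

final class PayloadTimestamps {

    // Assumption (hypothetical layout): the payload is UTF-8 text whose first
    // comma-separated field is an ISO-8601 instant, e.g.
    // "2023-07-21T07:19:00Z,rest-of-message".
    static long extractEpochMillis(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        int comma = text.indexOf(',');
        // If there is no comma, treat the whole payload as the timestamp field.
        String field = comma >= 0 ? text.substring(0, comma) : text;
        // Instant.parse throws DateTimeParseException on malformed input.
        return Instant.parse(field.trim()).toEpochMilli();
    }

    public static void main(String[] args) {
        byte[] payload =
            "2023-07-21T07:19:00Z,some-body".getBytes(StandardCharsets.UTF_8);
        // Prints the event time as milliseconds since the epoch.
        System.out.println(extractEpochMillis(payload));
    }
}
```

In a pipeline, a DoFn applied after the read would call a helper like this
on each record's bytes and emit the element with
outputWithTimestamp(element, new org.joda.time.Instant(millis)), as the
linked guide describes. One thing to check: as far as I know, Beam rejects an
output timestamp that is earlier than the input element's timestamp unless an
allowed skew is configured, and an embedded event time is usually earlier
than the approximate arrival time.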

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>

On Fri, 21 Jul 2023 at 07:19, Sachin Mittal <sj...@gmail.com> wrote:

> Hi,
> We are implementing the EFO Kinesis IO reader provided by Apache Beam.
> I see in the code that the implementation of getCurrentTimestamp always
> returns getApproximateArrivalTimestamp and not the event time which
> we may have set for that record using withCustomWatermarkPolicy.
>
> Please refer:
>
> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91
>
> However for KafkaIO we do something different:
> We always get the getCurrentTimestamp based on `timestampPolicy` set for
> Kafka where user can emit a custom timestamp associated with each record.
>
> Please refer:
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210
>
> So why is there a difference in these two implementations?
>
> We wanted the current timestamp based on some custom time embedded within
> the record and not approximate arrival time and not sure how we can achieve
> that.
>
> Please let us know if there is a way out to achieve this for Kinesis.
>
> Thanks
> Sachin
>
>

Re: EFO KinesisIO watermarking doubt

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
I have filed an issue: https://github.com/apache/beam/issues/28760
I have also created a PR (based on our local fix for this):
https://github.com/apache/beam/pull/28763
This can serve as a starting point.

Thanks
Sachin

