Posted to user@flink.apache.org by Randal Pitt <ra...@foresite.com> on 2020/12/07 13:43:17 UTC

Event time issues when Kinesis consumer receives batched messages

Hi there,

We're using Flink to read from a Kinesis stream. The stream contains
messages that themselves contain lists of events and we want our Flink jobs
(using the event time characteristic) to process those events individually.
We have this working using flatMap in the DataStream but we're having
trouble correctly assigning timestamps to the events.

We have been using FlinkKinesisConsumer.setPeriodicWatermarkAssigner() as
that should mean the watermarks are generated correctly, but it results in
all events in one message sharing a timestamp, resulting in some events
being assigned to the wrong window.
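
To make the effect concrete, here is a minimal plain-Java sketch (no Flink
dependency) of how a single batch-level timestamp can put events in the wrong
tumbling window. The window size, the event times, and the class and method
names are all made up for illustration, and the window arithmetic assumes
windows with no offset.

```java
import java.util.List;

public class BatchTimestampSketch {
    // Start of the tumbling window (no offset) that contains the timestamp.
    // Both arguments are in milliseconds.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    public static void main(String[] args) {
        long windowSizeMs = 60_000L; // hypothetical 1-minute windows

        // Hypothetical event times inside ONE batched message.
        // They straddle the window boundary at 60_000 ms.
        List<Long> eventTimes = List.of(59_000L, 60_500L, 61_000L);

        // Timestamp taken from the first event of the batch and then
        // shared by every event flat-mapped out of that batch.
        long batchTimestamp = eventTimes.get(0);

        for (long eventTime : eventTimes) {
            long windowFromBatch = windowStart(batchTimestamp, windowSizeMs);
            long windowFromEvent = windowStart(eventTime, windowSizeMs);
            System.out.println("event " + eventTime
                    + " -> window by batch timestamp: " + windowFromBatch
                    + ", window by own timestamp: " + windowFromEvent);
        }
    }
}
```

In this made-up batch the second and third events belong to the window
starting at 60_000 ms, but with the shared timestamp they land in the window
starting at 0.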

Using DataStream.assignTimestampsAndWatermarks() after the flatMap means we
can assign the correct timestamps, but the watermarks may not necessarily be
correct with respect to the Kinesis shards.
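
The concern can be sketched in plain Java (no Flink dependency). The Kinesis
consumer documentation describes the subtask watermark as the minimum of the
watermarks of its shards; a generator placed after the flatMap sees the
events of all those shards interleaved and cannot tell them apart. The sketch
below assumes a simple "max timestamp minus allowed lateness" rule, which is
not the histogram-based watermarker used in this thread, and all names and
numbers are hypothetical.

```java
import java.util.Map;

public class ShardWatermarkSketch {
    // Shard-aware: compute one watermark per shard, then take the minimum.
    static long minAcrossShards(Map<String, Long> maxTimestampPerShard,
                                long maxOutOfOrdernessMs) {
        if (maxTimestampPerShard.isEmpty()) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxTimestampPerShard.values().stream()
                .mapToLong(ts -> ts - maxOutOfOrdernessMs)
                .min()
                .getAsLong();
    }

    // Shard-unaware: one generator sees all events, so the largest
    // timestamp from ANY shard drives the watermark.
    static long shardUnaware(Map<String, Long> maxTimestampPerShard,
                             long maxOutOfOrdernessMs) {
        if (maxTimestampPerShard.isEmpty()) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxTimestampPerShard.values().stream()
                .mapToLong(Long::longValue)
                .max()
                .getAsLong() - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        // Hypothetical state: shard-1 is lagging well behind shard-0.
        Map<String, Long> maxSeen = Map.of("shard-0", 100_000L, "shard-1", 40_000L);
        long lateness = 5_000L;

        System.out.println("shard-aware watermark:   " + minAcrossShards(maxSeen, lateness));
        System.out.println("shard-unaware watermark: " + shardUnaware(maxSeen, lateness));
    }
}
```

With these numbers the shard-unaware watermark is already far ahead of what
the lagging shard has delivered, so events still to come from that shard
would be treated as late.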

Is there a strategy we can use that gets us both watermarks from the
Kinesis consumer and correct timestamps for individual events?

Best regards,

Randal.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Event time issues when Kinesis consumer receives batched messages

Posted by Randal Pitt <ra...@foresite.com>.
Thanks Roman, I'll look into how I go about doing that.




Re: Event time issues when Kinesis consumer receives batched messages

Posted by Khachatryan Roman <kh...@gmail.com>.
Thanks, Randal,

Yes, I think the only way is to partition the stream the same way as
Kinesis does (as I wrote before).

Regards,
Roman


On Tue, Dec 8, 2020 at 1:38 PM Randal Pitt <ra...@foresite.com> wrote:

> Hi Roman,
>
> We're using a custom watermarker that uses a histogram to calculate a "best
> fit" event time as the data we receive can be very unordered.
>
> As you can see we're using the timestamp from the first event in the batch,
> so we're essentially sampling the timestamps rather than using them all.
>
> FlinkKinesisConsumer<Batch<EventType>> consumer =
>     new FlinkKinesisConsumer<>(...);
>
> consumer.setPeriodicWatermarkAssigner(
>     new HistogramWatermarker<>(Time.minutes(30), 100) {
>         @Override
>         public long extractTimestamp(final Batch<EventType> element) {
>             return element.getBatch().get(0).getDate().getTime();
>         }
>     }
> );
>
> Cheers,
> Randal.

Re: Event time issues when Kinesis consumer receives batched messages

Posted by Randal Pitt <ra...@foresite.com>.
Hi Roman,

We're using a custom watermarker that uses a histogram to calculate a "best
fit" event time as the data we receive can be very unordered.

As you can see we're using the timestamp from the first event in the batch,
so we're essentially sampling the timestamps rather than using them all.

FlinkKinesisConsumer<Batch<EventType>> consumer =
    new FlinkKinesisConsumer<>(...);

consumer.setPeriodicWatermarkAssigner(
    new HistogramWatermarker<>(Time.minutes(30), 100) {
        @Override
        public long extractTimestamp(final Batch<EventType> element) {
            return element.getBatch().get(0).getDate().getTime();
        }
    }
);

Cheers,
Randal.




Re: Event time issues when Kinesis consumer receives batched messages

Posted by Khachatryan Roman <kh...@gmail.com>.
Hi Randal,

Can you share the code for the 1st approach
(FlinkKinesisConsumer.setPeriodicWatermarkAssigner())?
I think the 2nd approach (flatMap) can be improved by partitioning the
stream the same way Kinesis does (i.e. by the same partition key).

Regards,
Roman
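
For background on "the same way Kinesis does": Kinesis hashes each partition
key with MD5 into a 128-bit integer and routes the record to the shard whose
hash key range contains that value. The plain-Java sketch below mirrors that
mapping under one simplifying assumption, namely that the key space is split
evenly across shards; real shards carry explicit hash key ranges that can be
uneven after splits and merges. The class and method names are hypothetical.
In a Flink job, one reading of the suggestion would be a keyBy on the same
partition key before assigning timestamps, but that is an interpretation,
not something spelled out in this thread.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class PartitionKeySketch {
    // MD5 of the partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 = non-negative
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // ASSUMPTION: the 2^128 key space is divided evenly into shardCount
    // contiguous ranges. Real ranges come from each shard's hash key range.
    static int shardIndex(String partitionKey, int shardCount) {
        BigInteger keySpace = BigInteger.ONE.shiftLeft(128);
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(keySpace)
                .intValue();
    }

    public static void main(String[] args) {
        // Hypothetical partition keys and shard count.
        for (String key : new String[] {"device-1", "device-2", "device-3"}) {
            System.out.println(key + " -> shard index " + shardIndex(key, 4));
        }
    }
}
```

Records with the same partition key always map to the same shard, so keying
the Flink stream by that key keeps each key's events together in one
parallel subtask.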


On Mon, Dec 7, 2020 at 2:44 PM Randal Pitt <ra...@foresite.com> wrote:

> Hi there,
>
> We're using Flink to read from a Kinesis stream. The stream contains
> messages that themselves contain lists of events and we want our Flink jobs
> (using the event time characteristic) to process those events individually.
> We have this working using flatMap in the DataStream but we're having
> trouble correctly assigning timestamps to the events.
>
> We have been using FlinkKinesisConsumer.setPeriodicWatermarkAssigner() as
> that should mean the watermarks are generated correctly, but it results in
> all events in one message sharing a timestamp, resulting in some events
> being assigned to the wrong window.
>
> Using DataStream.assignTimestampsAndWatermarks() after the flatMap means we
> can assign the correct timestamps, but the watermarks may not necessarily be
> correct with respect to the Kinesis shards.
>
> Is there a strategy we can use that gets us both watermarks from the
> Kinesis consumer and correct timestamps for individual events?
>
> Best regards,
>
> Randal.