Posted to user@beam.apache.org by Marcin Kuthan <ma...@gmail.com> on 2022/08/05 15:15:37 UTC

Dataflow SQL streaming extensions

Hi



I'm experimenting with the Dataflow SQL streaming extensions and I have
observed that the event_timestamp field in the payload is ignored.



I would like to calculate the average value of the values reported by the
sensor every 5 seconds.



SELECT CURRENT_TIMESTAMP() AS created_at, * FROM
    (SELECT
        s1.window_start AS window_start,
        s1.window_end AS window_end,
        MIN(event_timestamp) AS event_timestamp_min,
        MAX(event_timestamp) AS event_timestamp_max,
        AVG(s1.sensor_value) AS sensor_value_avg
    FROM TUMBLE(
        (SELECT * FROM pubsub.topic.`sc-9366-nga-dev`.`marcin-atm22-signal-1`),
        DESCRIPTOR(event_timestamp),
        "INTERVAL 5 SECOND"
        ) AS s1
    GROUP BY window_start, window_end)



For testing purposes the sensor data is artificially generated, and
event_timestamp is always 30 seconds behind the current time.



current timestamp: 2022-08-05T15:00:24+00:00
{'event_timestamp': '2022-08-05T14:59:54+00:00', 'sensor_value': 0.4083962116009032}



But I get the following result at 15:00:28 (the latest row stored in BigQuery):

[{
  "created_at": "2022-08-05T15:00:20.170Z",
  "window_start": "2022-08-05T15:00:05Z",
  "window_end": "2022-08-05T15:00:10Z",
  "event_timestamp_min": "2022-08-05T15:00:05.019Z",
  "event_timestamp_max": "2022-08-05T15:00:09.035Z",
  "sensor_value_avg": "0.1612730883"
}]



Why is there a record created at 15:00:20 with a window 15:00:05-15:00:10
if the input event_timestamp is always delayed by 30 seconds? At 15:00:20
the latest emitted sensor event_timestamp is ~14:59:50.



Moreover, the watermark lag reported by Dataflow is always 10-20 seconds,
even when the event_timestamp reported by the sensor is far behind the
wall clock.

Any ideas?

Regards,

Marcin

Re: Dataflow SQL streaming extensions

Posted by Marcin Kuthan <ma...@gmail.com>.
Hi Andrew

Right now I'm much further along with my Beam SQL experimentation. Instead of
Dataflow SQL (
https://cloud.google.com/dataflow/docs/guides/sql/dataflow-sql-intro) I use
the regular Beam SQL shell, the Calcite dialect, and the Beam SQL extension
for external table registration.

It looks much more complete: I'm able to register Pub/Sub topics as tables,
specify a timestamp attribute for event-time tracking, and save the
processing results into BigQuery. Everything works as expected, and the
correct results are calculated in the event-time domain.

Because everything is a little magical and undocumented, I'm going to publish
a blog post about it. If you have a spare 10 minutes I would appreciate any
review of and comments on the early version :)
https://github.com/mkuthan/mkuthan.github.io/pull/11/files?short_path=60a54d1#diff-60a54d1fc16bb898873e0583526c01b7a535764358fe24105c0b2678ed91c3c5
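
For reference, the external table registration I use looks roughly like this
(a sketch: the table, project, topic, and attribute names below are
placeholders, and the exact TBLPROPERTIES keys may vary by Beam version):

```sql
-- Register a Pub/Sub topic as a Beam SQL external table.
-- The timestampAttributeKey property tells Beam which Pub/Sub message
-- attribute carries the event time used for watermark tracking.
CREATE EXTERNAL TABLE sensor_readings (
    event_timestamp TIMESTAMP,
    sensor_value DOUBLE
)
TYPE pubsub
LOCATION 'projects/my-project/topics/my-topic'
TBLPROPERTIES '{"format": "json", "timestampAttributeKey": "event_ts"}'
```

Once registered, queries can window on event_timestamp directly and the
results land in the event-time domain rather than processing time.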

Best regards,
Marcin




On Tue, 9 Aug 2022 at 21:31, Andrew Pilloud via user <us...@beam.apache.org>
wrote:

> [quoted text trimmed; see Andrew's message below]

Re: Dataflow SQL streaming extensions

Posted by Andrew Pilloud via user <us...@beam.apache.org>.
Hi Marcin,

I'm having a little trouble understanding this. I think this is a
summary of your problem statement: You have a pipeline that windows
data on event time. Your event generator has an artificial 30 second
delay. The pipeline appears to be experiencing a 10-20 second delay
instead of the expected 30 second delay so you think it may be using
processing time instead of event time. You want some help
investigating the issue.

Is it possible that your clocks are not synchronised as well as you
think they are? The 30 second delay is somewhat small; does the issue
persist if you increase it to an hour?

This list isn't going to be much help in debugging Dataflow SQL
issues; you should contact GCP support for that. But we can help with
Beam SQL (which Dataflow SQL is based on). Most Beam SQL pipelines only
support an older syntax where the windows are in a GROUP BY clause. I
believe the GROUP BY format is also supported by Dataflow SQL; can you
try that? Documentation is here:
https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/
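
Applied to your query, the older GROUP BY syntax might look roughly like
this (a sketch based on that page, reusing your column names; I haven't
run it against your topic):

```sql
-- TUMBLE in the GROUP BY clause assigns each row to a 5-second
-- event-time window; TUMBLE_START/TUMBLE_END recover the window bounds.
SELECT
    TUMBLE_START(event_timestamp, INTERVAL '5' SECOND) AS window_start,
    TUMBLE_END(event_timestamp, INTERVAL '5' SECOND) AS window_end,
    MIN(event_timestamp) AS event_timestamp_min,
    MAX(event_timestamp) AS event_timestamp_max,
    AVG(sensor_value) AS sensor_value_avg
FROM pubsub.topic.`sc-9366-nga-dev`.`marcin-atm22-signal-1`
GROUP BY TUMBLE(event_timestamp, INTERVAL '5' SECOND)
```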

Andrew


On Fri, Aug 5, 2022 at 8:15 AM Marcin Kuthan <ma...@gmail.com> wrote:
>
> [quoted text trimmed; see the original message above]