Posted to dev@beam.apache.org by Maximilian Michels <mx...@apache.org> on 2020/05/28 16:45:09 UTC

SQL Windowing

Hi,

I'm using the SqlTransform as an external transform from within a Python
pipeline. The SQL docs [1] mention that you can either (a) window the
input or (b) window in the SQL query.

Option (a):

  input
      | "Window" >> beam.WindowInto(window.FixedWindows(30))
      | "Aggregate" >>
      SqlTransform("""SELECT field, COUNT(field) FROM PCOLLECTION
                      WHERE ...
                      GROUP BY field
                   """)

This results in an exception:

  Caused by: java.lang.ClassCastException:
  org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast
  to org.apache.beam.sdk.transforms.windowing.GlobalWindow

=> Is this a bug?


Let's try Option (b):

  input
      | "Aggregate & Window" >>
      SqlTransform("""SELECT field, COUNT(field) FROM PCOLLECTION
                      WHERE ...
                      GROUP BY field,
                               TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
                   """)

The issue that I'm facing here is that the timestamp is already assigned
to my values but is not exposed as a field. So I need to use a DoFn to
extract the timestamp as a new field:

  class GetTimestamp(beam.DoFn):
    def process(self, event, timestamp=beam.DoFn.TimestampParam):
      yield TimestampedRow(..., timestamp)

  input
      | "Extract timestamp" >>
      beam.ParDo(GetTimestamp())
      | "Aggregate & Window" >>
      SqlTransform("""SELECT field, COUNT(field) FROM PCOLLECTION
                      WHERE ...
                      GROUP BY field,
                               TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
                   """)

=> It would be very convenient if there were a reserved field name which
would point to the timestamp of an element. Maybe there is?
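Outside of Beam, the body of such a DoFn boils down to the following
sketch (the dict-shaped element and the field names are hypothetical,
just to make the idea concrete):

```python
def with_timestamp_field(event, timestamp_micros):
    # Sketch of what a GetTimestamp DoFn would do: copy the element's
    # fields into a new row and expose the element timestamp as an
    # ordinary field the SQL query can reference as f_timestamp.
    row = dict(event)
    row["f_timestamp"] = timestamp_micros  # epoch microseconds
    return row

row = with_timestamp_field({"field": "a"}, 1_590_684_309_000_000)
```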


-Max


[1]
https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/

Re: SQL Windowing

Posted by Maximilian Michels <mx...@apache.org>.
Thanks for the quick reply Brian! I've filed a JIRA for option (a):
https://jira.apache.org/jira/browse/BEAM-10143

Makes sense to define DATETIME as a logical type. I'll check out your
PR. We could work around this for now by doing a cast, e.g.:

  TUMBLE(CAST(f_timestamp AS DATETIME), INTERVAL '30' MINUTE)

Note that we may have to do a more sophisticated cast to convert the
Python micros into a DATETIME.
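For illustration, that conversion in plain Python could look like the
sketch below (the sample value is hypothetical; Beam element timestamps
are epoch microseconds):

```python
from datetime import datetime, timezone

def micros_to_datetime(micros):
    # Turn epoch microseconds into a timezone-aware datetime, the
    # Python-side analogue of a SQL DATETIME value.
    seconds, rem = divmod(micros, 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=rem)

dt = micros_to_datetime(1_590_684_309_000_000)  # 2020-05-28 16:45:09 UTC
```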

-Max

On 28.05.20 19:18, Brian Hulette wrote:
> Hey Max,
> Thanks for kicking the tires on SqlTransform in Python :)
> 
> We don't have any tests of windowing and Sql in Python yet, so I'm not
> that surprised you're running into issues here. Portable schemas don't
> support the DATETIME type, because we decided not to define it as one of
> the atomic types [1] and hope to add support via a logical type instead
> (see BEAM-7554 [2]). This was the motivation for the MillisInstant PR I
> put up, and the ongoing discussion [3].
> Regardless, that should only be an obstacle for option (b), where you'd
> need to have a DATETIME in the input and/or output PCollection of the
> SqlTransform. In theory option (a) should be possible, so I'd consider
> that a bug - can you file a jira for it?
> 
> Brian
> 
> [1] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto#L58
> [2] https://issues.apache.org/jira/browse/BEAM-7554
> [3] https://lists.apache.org/thread.html/r2e05355b74fb5b8149af78ade1e3539ec08371a9a4b2b9e45737e6be%40%3Cdev.beam.apache.org%3E

Re: SQL Windowing

Posted by Brian Hulette <bh...@google.com>.
Hey Max,
Thanks for kicking the tires on SqlTransform in Python :)

We don't have any tests of windowing and Sql in Python yet, so I'm not that
surprised you're running into issues here. Portable schemas don't support
the DATETIME type, because we decided not to define it as one of the atomic
types [1] and hope to add support via a logical type instead (see BEAM-7554
[2]). This was the motivation for the MillisInstant PR I put up, and the
ongoing discussion [3].
Regardless, that should only be an obstacle for option (b), where you'd
need to have a DATETIME in the input and/or output PCollection of the
SqlTransform. In theory option (a) should be possible, so I'd consider that
a bug - can you file a jira for it?

Brian

[1]
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto#L58
[2] https://issues.apache.org/jira/browse/BEAM-7554
[3]
https://lists.apache.org/thread.html/r2e05355b74fb5b8149af78ade1e3539ec08371a9a4b2b9e45737e6be%40%3Cdev.beam.apache.org%3E
