You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Brian Hulette <bh...@google.com> on 2020/01/28 19:37:01 UTC

Re: [PROPOSAL] Add support for writing flattened schemas to pubsub

I filed a few jiras to track the follow-up work we discussed here:

BEAM-9208 [1] - Add support for mapping columns to pubsub message
attributes in flat schemas DDL
BEAM-9209 [2] - Add support for mapping columns to pubsub message
event_timestamp when using flat schemas DDL
BEAM-9210 [3] - Deprecate and remove "nested" schemas for pubsub tables

Deprecation and removal of nested schemas is blocked by adding support for
attributes in flat schemas, since it will be a regression otherwise. The
event_timestamp change does not block removal since it matches the current
behavior.

Would it make sense to go ahead and mark nested schemas as deprecated
(perhaps with a warning message), so that we can remove them once BEAM-9208
is complete?

[1] https://issues.apache.org/jira/browse/BEAM-9208
[2] https://issues.apache.org/jira/browse/BEAM-9209
[3] https://issues.apache.org/jira/browse/BEAM-9210

On Thu, Nov 21, 2019 at 9:20 AM Brian Hulette <bh...@google.com> wrote:

> A PR is up here [1].
>
> Gleb: If I understand what you're saying, I think it's already implemented
> the way you're describing - PubsubIOJsonTable [2] is just a thin wrapper
> that connects PubsubIO with Beam SQL tables.
> Alex/Kenn: I agree with everything you've said :) The hard-coded
> event_timestamp is troublesome and should be configurable just like mapping
> to attributes. I like that proposal for the DDL syntax.
>
> [1] https://github.com/apache/beam/pull/10158
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
>
> On Mon, Nov 18, 2019 at 10:35 AM Kenneth Knowles <ke...@apache.org> wrote:
>
>> I like Alex's syntax suggestion. Very readable. In addition to tables
>> defined via DDL, we also have a metastore abstraction that currently
>> supports Hive Metastore and Google's Data Catalog. We should think about
>> how something like what Alex describes could be served by these systems.
>>
>> Kenn
>>
>> On Sun, Nov 17, 2019 at 4:50 PM Reza Rokni <re...@google.com> wrote:
>>
>>> +1 to reduced boiler plate for basic things folks want to do with SQL.
>>>
>>> I like Alex use of Option for more advanced use cases.
>>>
>>> On Sun, 17 Nov 2019 at 20:17, Gleb Kanterov <gl...@spotify.com> wrote:
>>>
>>>> Expanding on what Kenn said regarding having fewer dependencies on SQL.
>>>> Can the whole thing be seen as extending PubSubIO, that would implement
>>>> most of the logic from the proposal, given column annotations, and then
>>>> having a thin layer that connects it with Beam SQL tables?
>>>>
>>>> On Sun, Nov 17, 2019 at 12:38 PM Alex Van Boxel <al...@vanboxel.be>
>>>> wrote:
>>>>
>>>>> I like it, but I'm worried about the magic event_timestamp injection.
>>>>> Wouldn't explicit injection via option not be a better approach:
>>>>>
>>>>> CREATE TABLE people (
>>>>>     my_timestamp TIMESTAMP *OPTION(ref="pubsub:event_timestamp)*,
>>>>>     my_id VARCHAR *OPTION(ref="pubsub:attributes['id_name']")*,
>>>>>     name VARCHAR,
>>>>>     age INTEGER
>>>>>   )
>>>>>   TYPE 'pubsub'
>>>>>   LOCATION 'projects/my-project/topics/my-topic'
>>>>>
>>>>>
>>>>>  _/
>>>>> _/ Alex Van Boxel
>>>>>
>>>>>
>>>>> On Sat, Nov 16, 2019 at 7:58 PM Kenneth Knowles <ke...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Big +1 from me.
>>>>>>
>>>>>> Nice explanation. This makes a lot of sense. Much simpler to
>>>>>> understand with fewer magic strings. It also makes the Beam SQL connector
>>>>>> less dependent on newer SQL features that are simply less widespread. I'm
>>>>>> not too surprised that Calcite's nested row support lags behind the rest of
>>>>>> the library. It simply isn't as widespread and important as flat relational
>>>>>> structures. And MAP is even less widespread.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Nov 13, 2019 at 12:32 PM Brian Hulette <bh...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've been looking into adding support for writing (i.e. INSERT INTO
>>>>>>> statements) for the pubsub DDL, which currently only supports reading. This
>>>>>>> DDL requires the defined schema to have exactly three fields:
>>>>>>> event_timestamp, attributes, and payload, corresponding to the fields in
>>>>>>> PubsubMessage (event_timestamp can be configured to come from either
>>>>>>> publish time or from the value in a particular attribute, and the payload
>>>>>>> must be a ROW with a schema corresponding to the JSON written to the pubsub
>>>>>>> topic).
>>>>>>>
>>>>>>> When writing, I think it's a bit onerous to require users to use
>>>>>>> exactly these three top-level fields. For example imagine we have two
>>>>>>> topics: people, and eligible_voters. people contains a stream of {"name":
>>>>>>> "..", age: XX} items, and we want eligible_voters to contain a stream with
>>>>>>> {"name": ".."} items corresponding to people with age >= 18. With the
>>>>>>> current approach this would look like:
>>>>>>>
>>>>>>> ```
>>>>>>> CREATE TABLE people (
>>>>>>>     event_timestamp TIMESTAMP,
>>>>>>>     attributes MAP<VARCHAR, VARCHAR>,
>>>>>>>     payload ROW<name VARCHAR, age INTEGER>
>>>>>>>   )
>>>>>>>   TYPE 'pubsub'
>>>>>>>   LOCATION 'projects/my-project/topics/my-topic'
>>>>>>>
>>>>>>> CREATE TABLE eligible_voters ....
>>>>>>>
>>>>>>> INSERT INTO eligible_voters (
>>>>>>>   SELECT
>>>>>>>     ROW(payload.name AS name) AS payload
>>>>>>>     FROM people
>>>>>>>     WHERE payload.age >= 18
>>>>>>> )
>>>>>>> ```
>>>>>>>
>>>>>>> This query has lots of renaming and boiler-plate, and furthermore,
>>>>>>> ROW(..) doesn't seem well supported in Calcite, I had to jump through some
>>>>>>> hoops (like calling my fields $col1), to make something like this work.
>>>>>>> I think it would be great if we could instead handle flattened,
>>>>>>> payload-only schemas. We would still need to have a separate
>>>>>>> event_timestamp field, but everything else would map to a field in the
>>>>>>> payload. With this change the previous example would look like:
>>>>>>>
>>>>>>> ```
>>>>>>> CREATE TABLE people (
>>>>>>>     event_timestamp TIMESTAMP,
>>>>>>>     name VARCHAR,
>>>>>>>     age INTEGER
>>>>>>>   )
>>>>>>>   TYPE 'pubsub'
>>>>>>>   LOCATION 'projects/my-project/topics/my-topic'
>>>>>>>
>>>>>>> CREATE TABLE eligible_voters ...
>>>>>>>
>>>>>>> INSERT INTO eligible_voters (
>>>>>>>   SELECT
>>>>>>>     name
>>>>>>>     FROM people
>>>>>>>     WHERE age >= 18
>>>>>>> )
>>>>>>> ```
>>>>>>>
>>>>>>> This is much cleaner! But the overall approach has an obvious
>>>>>>> downside - with the tabke definition written like this it's impossible to
>>>>>>> read from or write to the message attributes (unless one is being used for
>>>>>>> event_timestamp). I think we can mitigate this in two ways:
>>>>>>> 1. In the future, this flattened schema definition could be
>>>>>>> represented as something like a view on the expanded definition. We could
>>>>>>> allow users to provide some metadata indicating that a column should
>>>>>>> correspond to a particular attribute, rather than a field in the payload.
>>>>>>> To me this feels similar to how you indicate a column should be indexed in
>>>>>>> a database. It's data that's relevant to the storage system, and not to the
>>>>>>> actual query, so it belongs in CREATE TABLE.
>>>>>>> 2. In the meantime, we can continue to support the current syntax.
>>>>>>> If a pubsub table definition has *exactly* three fields with the expected
>>>>>>> types: event_timestamp TIMESTAMP, payload ROW<...>, and attributes
>>>>>>> MAP<VARCHAR, VARCHAR>, we can continue to use the current codepath.
>>>>>>> Otherwise we will use the flattened schema.
>>>>>>>
>>>>>>> Please let me know if anyone has any objections to this approach,
>>>>>>> otherwise I plan on moving forward with it - I should have a PR up shortly.
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>
>>>
>>> --
>>>
>>> This email may be confidential and privileged. If you received this
>>> communication by mistake, please don't forward it to anyone else, please
>>> erase all copies and attachments, and please let me know that it has gone
>>> to the wrong person.
>>>
>>> The above terms reflect a potential business arrangement, are provided
>>> solely as a basis for further discussion, and are not intended to be and do
>>> not constitute a legally binding obligation. No legally binding obligations
>>> will be created, implied, or inferred until an agreement in final form is
>>> executed in writing by all parties involved.
>>>
>>