You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Anton Kedin <ke...@google.com> on 2018/05/02 17:30:17 UTC

Pubsub to Beam SQL

Hi

I am working on adding functionality to support querying Pubsub messages
directly from Beam SQL.

*Goal*
  Provide Beam users a pure  SQL solution to create the pipelines with
Pubsub as a data source, without the need to set up the pipelines in Java
before applying the query.

*High level approach*

   -
   - Build on top of PubsubIO;
   - Pubsub source will be declared using CREATE TABLE DDL statement:
      - Beam SQL already supports declaring sources like Kafka and Text
      using CREATE TABLE DDL;
      - it supports additional configuration using TBLPROPERTIES clause.
      Currently it takes a text blob, where we can put a JSON configuration;
      - wrapping PubsubIO into a similar source looks feasible;
   - The plan is to initially support messages only with JSON payload:
   -
      - more payload formats can be added later;
   - Messages will be fully described in the CREATE TABLE statements:
      - event timestamps. Source of the timestamp is configurable. It is
      required by Beam SQL to have an explicit timestamp column for windowing
      support;
      - messages attributes map;
      - JSON payload schema;
   - Event timestamps will be taken either from publish time or
   user-specified message attribute (configurable);

Thoughts, ideas, comments?

More details are in the doc here:
https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE


Thank you,
Anton

Re: Pubsub to Beam SQL

Posted by Ankur Goenka <go...@google.com>.
I like the idea of exposing source timestamp in TBLPROPERTIES which is
closely tied to source (KafkaIO, KinesisIO, MqttIO, AmqpIO, unbounded
FileIO, PubSubIO).
Exposing timestamp as a top level keyword will break the symmetry between
streaming and batch pipelines.
TBLPROPERTIES gives us flexibility on defining timestamp in source specific
way if needed.


On Thu, May 3, 2018 at 4:08 PM Kenneth Knowles <kl...@google.com> wrote:

> It is an interesting question for Beam DDL - since timestamps are
> fundamental to Beam's data model, should we have a DDL extension that makes
> it very explicit? Seems nice, but perhaps TBLPROPERTIES is a way to stage
> the work, getting the functionality in place first and the parsing second.
>
> What would the TIMESTAMP (let's maybe choose a term that isn't already
> reserved) metadata thing look like for e.g. KafkaIO, KinesisIO, MqttIO,
> AmqpIO, unbounded FileIO? I think a lot of these don't actually have any
> configurability so maybe it is moot. Does Calcite already have an opinion
> about timestamps on rows?
>
> Kenn
>
> On Thu, May 3, 2018 at 1:02 PM Andrew Pilloud <ap...@google.com> wrote:
>
>> I like to avoid magic too. I might not have been entirely clear in what I
>> was asking. Here is an example of what I had in mind, replacing the TBLPROPERTIES
>> with a more generic TIMESTAMP option:
>>
>> CREATE TABLE  table_name (
>>   publishTimestamp TIMESTAMP,
>>   attributes MAP(VARCHAR, VARCHAR),
>>   payload ROW (
>>                name VARCHAR,
>>                age INTEGER,
>>                isSWE BOOLEAN,
>>                tags ARRAY(VARCHAR)))
>> TIMESTAMP attributes["createTime"];
>>
>> Andrew
>>
>> On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:
>>
>>> I think it makes sense for the case when timestamp is provided in the
>>> payload (including pubsub message attributes).  We can mark the field as an
>>> event timestamp. But if the timestamp is internally defined by the source
>>> (pubsub message publish time) and not exposed in the event body, then we
>>> need a source-specific mechanism to extract and map the event timestamp to
>>> the schema. This is, of course, if we don't automatically add a magic
>>> timestamp field which Beam SQL can populate behind the scenes and add to
>>> the schema. I want to avoid this magic path for now.
>>>
>>> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <ap...@google.com>
>>> wrote:
>>>
>>>> This sounds awesome!
>>>>
>>>> Is event timestamp something that we need to specify for every source?
>>>> If so, I would suggest we add this as a first class option on CREATE TABLE
>>>> rather then something hidden in TBLPROPERTIES.
>>>>
>>>> Andrew
>>>>
>>>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I am working on adding functionality to support querying Pubsub
>>>>> messages directly from Beam SQL.
>>>>>
>>>>> *Goal*
>>>>>   Provide Beam users a pure  SQL solution to create the pipelines with
>>>>> Pubsub as a data source, without the need to set up the pipelines in
>>>>> Java before applying the query.
>>>>>
>>>>> *High level approach*
>>>>>
>>>>>    -
>>>>>    - Build on top of PubsubIO;
>>>>>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>>>>>       - Beam SQL already supports declaring sources like Kafka and
>>>>>       Text using CREATE TABLE DDL;
>>>>>       - it supports additional configuration using TBLPROPERTIES
>>>>>       clause. Currently it takes a text blob, where we can put a JSON
>>>>>       configuration;
>>>>>       - wrapping PubsubIO into a similar source looks feasible;
>>>>>    - The plan is to initially support messages only with JSON payload:
>>>>>    -
>>>>>       - more payload formats can be added later;
>>>>>    - Messages will be fully described in the CREATE TABLE statements:
>>>>>       - event timestamps. Source of the timestamp is configurable. It
>>>>>       is required by Beam SQL to have an explicit timestamp column for windowing
>>>>>       support;
>>>>>       - messages attributes map;
>>>>>       - JSON payload schema;
>>>>>    - Event timestamps will be taken either from publish time or
>>>>>    user-specified message attribute (configurable);
>>>>>
>>>>> Thoughts, ideas, comments?
>>>>>
>>>>> More details are in the doc here:
>>>>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>>>>
>>>>>
>>>>> Thank you,
>>>>> Anton
>>>>>
>>>>

Re: Pubsub to Beam SQL

Posted by Kenneth Knowles <kl...@google.com>.
It is an interesting question for Beam DDL - since timestamps are
fundamental to Beam's data model, should we have a DDL extension that makes
it very explicit? Seems nice, but perhaps TBLPROPERTIES is a way to stage
the work, getting the functionality in place first and the parsing second.

What would the TIMESTAMP (let's maybe choose a term that isn't already
reserved) metadata thing look like for e.g. KafkaIO, KinesisIO, MqttIO,
AmqpIO, unbounded FileIO? I think a lot of these don't actually have any
configurability so maybe it is moot. Does Calcite already have an opinion
about timestamps on rows?

Kenn

On Thu, May 3, 2018 at 1:02 PM Andrew Pilloud <ap...@google.com> wrote:

> I like to avoid magic too. I might not have been entirely clear in what I
> was asking. Here is an example of what I had in mind, replacing the TBLPROPERTIES
> with a more generic TIMESTAMP option:
>
> CREATE TABLE  table_name (
>   publishTimestamp TIMESTAMP,
>   attributes MAP(VARCHAR, VARCHAR),
>   payload ROW (
>                name VARCHAR,
>                age INTEGER,
>                isSWE BOOLEAN,
>                tags ARRAY(VARCHAR)))
> TIMESTAMP attributes["createTime"];
>
> Andrew
>
> On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:
>
>> I think it makes sense for the case when timestamp is provided in the
>> payload (including pubsub message attributes).  We can mark the field as an
>> event timestamp. But if the timestamp is internally defined by the source
>> (pubsub message publish time) and not exposed in the event body, then we
>> need a source-specific mechanism to extract and map the event timestamp to
>> the schema. This is, of course, if we don't automatically add a magic
>> timestamp field which Beam SQL can populate behind the scenes and add to
>> the schema. I want to avoid this magic path for now.
>>
>> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <ap...@google.com>
>> wrote:
>>
>>> This sounds awesome!
>>>
>>> Is event timestamp something that we need to specify for every source?
>>> If so, I would suggest we add this as a first class option on CREATE TABLE
>>> rather then something hidden in TBLPROPERTIES.
>>>
>>> Andrew
>>>
>>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I am working on adding functionality to support querying Pubsub
>>>> messages directly from Beam SQL.
>>>>
>>>> *Goal*
>>>>   Provide Beam users a pure  SQL solution to create the pipelines with
>>>> Pubsub as a data source, without the need to set up the pipelines in
>>>> Java before applying the query.
>>>>
>>>> *High level approach*
>>>>
>>>>    -
>>>>    - Build on top of PubsubIO;
>>>>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>>>>       - Beam SQL already supports declaring sources like Kafka and
>>>>       Text using CREATE TABLE DDL;
>>>>       - it supports additional configuration using TBLPROPERTIES
>>>>       clause. Currently it takes a text blob, where we can put a JSON
>>>>       configuration;
>>>>       - wrapping PubsubIO into a similar source looks feasible;
>>>>    - The plan is to initially support messages only with JSON payload:
>>>>    -
>>>>       - more payload formats can be added later;
>>>>    - Messages will be fully described in the CREATE TABLE statements:
>>>>       - event timestamps. Source of the timestamp is configurable. It
>>>>       is required by Beam SQL to have an explicit timestamp column for windowing
>>>>       support;
>>>>       - messages attributes map;
>>>>       - JSON payload schema;
>>>>    - Event timestamps will be taken either from publish time or
>>>>    user-specified message attribute (configurable);
>>>>
>>>> Thoughts, ideas, comments?
>>>>
>>>> More details are in the doc here:
>>>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>>>
>>>>
>>>> Thank you,
>>>> Anton
>>>>
>>>

Re: Pubsub to Beam SQL

Posted by Andrew Pilloud <ap...@google.com>.
I like to avoid magic too. I might not have been entirely clear in what I
was asking. Here is an example of what I had in mind, replacing the
TBLPROPERTIES
with a more generic TIMESTAMP option:

CREATE TABLE  table_name (
  publishTimestamp TIMESTAMP,
  attributes MAP(VARCHAR, VARCHAR),
  payload ROW (
               name VARCHAR,
               age INTEGER,
               isSWE BOOLEAN,
               tags ARRAY(VARCHAR)))
TIMESTAMP attributes["createTime"];

Andrew

On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:

> I think it makes sense for the case when timestamp is provided in the
> payload (including pubsub message attributes).  We can mark the field as an
> event timestamp. But if the timestamp is internally defined by the source
> (pubsub message publish time) and not exposed in the event body, then we
> need a source-specific mechanism to extract and map the event timestamp to
> the schema. This is, of course, if we don't automatically add a magic
> timestamp field which Beam SQL can populate behind the scenes and add to
> the schema. I want to avoid this magic path for now.
>
> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <ap...@google.com>
> wrote:
>
>> This sounds awesome!
>>
>> Is event timestamp something that we need to specify for every source? If
>> so, I would suggest we add this as a first class option on CREATE TABLE
>> rather then something hidden in TBLPROPERTIES.
>>
>> Andrew
>>
>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:
>>
>>> Hi
>>>
>>> I am working on adding functionality to support querying Pubsub messages
>>> directly from Beam SQL.
>>>
>>> *Goal*
>>>   Provide Beam users a pure  SQL solution to create the pipelines with
>>> Pubsub as a data source, without the need to set up the pipelines in
>>> Java before applying the query.
>>>
>>> *High level approach*
>>>
>>>    -
>>>    - Build on top of PubsubIO;
>>>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>>>       - Beam SQL already supports declaring sources like Kafka and Text
>>>       using CREATE TABLE DDL;
>>>       - it supports additional configuration using TBLPROPERTIES
>>>       clause. Currently it takes a text blob, where we can put a JSON
>>>       configuration;
>>>       - wrapping PubsubIO into a similar source looks feasible;
>>>    - The plan is to initially support messages only with JSON payload:
>>>    -
>>>       - more payload formats can be added later;
>>>    - Messages will be fully described in the CREATE TABLE statements:
>>>       - event timestamps. Source of the timestamp is configurable. It
>>>       is required by Beam SQL to have an explicit timestamp column for windowing
>>>       support;
>>>       - messages attributes map;
>>>       - JSON payload schema;
>>>    - Event timestamps will be taken either from publish time or
>>>    user-specified message attribute (configurable);
>>>
>>> Thoughts, ideas, comments?
>>>
>>> More details are in the doc here:
>>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>>
>>>
>>> Thank you,
>>> Anton
>>>
>>

Re: Pubsub to Beam SQL

Posted by Reuven Lax <re...@google.com>.
I think even easier for other sources. PubSub is a tricky one (for us at
least) because Dataflow overrides the Beam native PubSub source with
something different. Kafka is a pure Beam source.

On Thu, May 10, 2018 at 1:39 PM Ismaël Mejía <ie...@gmail.com> wrote:

> Hi, Jumping a bit late to this discussion. This sounds super nice. But I
> could not access the document.
> How hard would it be to do this for other 'unbounded' sources, e.g. Kafka ?
> On Sat, May 5, 2018 at 2:56 AM Andrew Pilloud <ap...@google.com> wrote:
>
> > I don't think we should jump to adding a extension, but TBLPROPERTIES is
> already a DDL extension and it isn't user friendly. We should strive for a
> world where no one needs to use it. SQL needs the timestamp to be exposed
> as a column, we can't hide it without changing the definition of GROUP BY.
> I like Anton's proposal of adding it as an annotation in the column
> definition. That seems even simpler and more user friendly. We might even
> be able to get away with using the PRIMARY KEY keyword.
>
> > Andrew
>
> > On Fri, May 4, 2018 at 12:11 PM Anton Kedin <ke...@google.com> wrote:
>
> >> There are few aspects of the event timestamp definition in SQL, which we
> are talking about here:
>
> >> configuring the source. E.g. for PubsubIO you can choose whether to
> extract event timestamp from the message attributes or the message publish
> time:
>
> >> this is source-specific and cannot be part of the common DDL;
> >> TBLPROPERTIES, on the other hand, is an opaque json blob which exists
> specifically for source configuration;
> >> as Kenn is saying, some sources might not even have such configuration;
> >> at processing time, event timestamp is available in
> ProcessContext.timestamp() regardless of the specifics of the source
> configuration, so it can be extracted the same way for all sources, as
> Raghu said;
>
> >> designating one of the table columns as an event timestamp:
>
> >> query needs to be able to reference the event timestamp so we have to
> declare which column to populate with the event timestamp;
> >> this is common for all sources and we can create a special syntax, e.g.
> "columnName EVENT_TIMESTAMP". It must not contain source-specific
> configuration at this point, in my opinion;
> >> when SQL knows which column is supposed to be the timestamp, then it can
> get it from the ProcessContext.timestamp() and put it into the designated
> field the same way regardless of the source configuration;
>
> >> pubsub-specific message formatting:
>
> >> on top of the above we want to be able to expose pubsub message
> attributes, payload, and timestamp to the user queries, and do it without
> magic or user schema modifications. To do this we can enforce some
> pubsub-specific schema limitations, e.g. by exposing attributes and
> timestamp fields at a top-level schema, with payload going into the second
> level in its own field;
> >> this aspect is not fully implementable until we have support for complex
> types. Until then we cannot map full JSON to the payload field;
>
> >> I will update the doc and the implementation to reflect these comments
> where possible.
>
> >> Thank you,
> >> Anton
>
>
> >> On Fri, May 4, 2018 at 9:48 AM Raghu Angadi <ra...@google.com> wrote:
>
> >>> On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:
>
> >>>> I think it makes sense for the case when timestamp is provided in the
> payload (including pubsub message attributes).  We can mark the field as an
> event timestamp. But if the timestamp is internally defined by the source
> (pubsub message publish time) and not exposed in the event body, then we
> need a source-specific mechanism to extract and map the event timestamp to
> the schema. This is, of course, if we don't automatically add a magic
> timestamp field which Beam SQL can populate behind the scenes and add to
> the schema. I want to avoid this magic path for now.
>
>
> >>> Commented on the PR. As Kenn mentioned, every element in Beam has an
> event timestamp, there is no requirement to extract the timestamp by the
> SQL transform. Using the element timestamp takes care of Pubsub publish
> timestamp as well (in fact, this is the default when timestamp attribute is
> not specified in PubsubIO).
>
> >>> How timestamp are customized is specific to each source. That way
> custom timestamp option seem like they belong in TBLPROPERTIES. E.g. for
> KafkaIO, it could specify "logAppendTime", "createTime", or
> "processingTime" etc (though I am not sure how user can provide their own
> custom extractor in Beam SQL, may be it could support a timestamp field in
> json records).
>
> >>> Raghu.
>
>
> >>>> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <ap...@google.com>
> wrote:
>
> >>>>> This sounds awesome!
>
> >>>>> Is event timestamp something that we need to specify for every
> source? If so, I would suggest we add this as a first class option on
> CREATE TABLE rather then something hidden in TBLPROPERTIES.
>
> >>>>> Andrew
>
> >>>>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com>
> wrote:
>
> >>>>>> Hi
>
> >>>>>> I am working on adding functionality to support querying Pubsub
> messages directly from Beam SQL.
>
> >>>>>> Goal
> >>>>>>    Provide Beam users a pure  SQL solution to create the pipelines
> with Pubsub as a data source, without the need to set up the pipelines in
> Java before applying the query.
>
> >>>>>> High level approach
>
> >>>>>> Build on top of PubsubIO;
> >>>>>> Pubsub source will be declared using CREATE TABLE DDL statement:
>
> >>>>>> Beam SQL already supports declaring sources like Kafka and Text
> using CREATE TABLE DDL;
> >>>>>> it supports additional configuration using TBLPROPERTIES clause.
> Currently it takes a text blob, where we can put a JSON configuration;
> >>>>>> wrapping PubsubIO into a similar source looks feasible;
>
> >>>>>> The plan is to initially support messages only with JSON payload:
>
> >>>>>> more payload formats can be added later;
>
> >>>>>> Messages will be fully described in the CREATE TABLE statements:
>
> >>>>>> event timestamps. Source of the timestamp is configurable. It is
> required by Beam SQL to have an explicit timestamp column for windowing
> support;
> >>>>>> messages attributes map;
> >>>>>> JSON payload schema;
>
> >>>>>> Event timestamps will be taken either from publish time or
> user-specified message attribute (configurable);
>
> >>>>>> Thoughts, ideas, comments?
>
> >>>>>> More details are in the doc here:
>
> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>
> >>>>>> Thank you,
> >>>>>> Anton
>

Re: Pubsub to Beam SQL

Posted by Anton Kedin <ke...@google.com>.
Shared the doc.
There is already a table provider for Kafka with CSV records. The
implementation at the moment doesn't touch the IO itself, just wraps it.
Implementing Kafka JSON records can be as easy as wrapping KafkaIO with
JsonToRow
<https://github.com/apache/beam/blob/9c2b43227e1ddac39676f6c09aca1af82a9d4cdb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java>
on top or implementing another SQL-specific transform similar to this
<https://github.com/apache/beam/pull/5253/files#diff-45ffe75359c57e7958a1d508c8a3657b>
.

On Thu, May 10, 2018 at 1:39 PM Ismaël Mejía <ie...@gmail.com> wrote:

> Hi, Jumping a bit late to this discussion. This sounds super nice. But I
> could not access the document.
> How hard would it be to do this for other 'unbounded' sources, e.g. Kafka ?
> On Sat, May 5, 2018 at 2:56 AM Andrew Pilloud <ap...@google.com> wrote:
>
> > I don't think we should jump to adding a extension, but TBLPROPERTIES is
> already a DDL extension and it isn't user friendly. We should strive for a
> world where no one needs to use it. SQL needs the timestamp to be exposed
> as a column, we can't hide it without changing the definition of GROUP BY.
> I like Anton's proposal of adding it as an annotation in the column
> definition. That seems even simpler and more user friendly. We might even
> be able to get away with using the PRIMARY KEY keyword.
>
> > Andrew
>
> > On Fri, May 4, 2018 at 12:11 PM Anton Kedin <ke...@google.com> wrote:
>
> >> There are few aspects of the event timestamp definition in SQL, which we
> are talking about here:
>
> >> configuring the source. E.g. for PubsubIO you can choose whether to
> extract event timestamp from the message attributes or the message publish
> time:
>
> >> this is source-specific and cannot be part of the common DDL;
> >> TBLPROPERTIES, on the other hand, is an opaque json blob which exists
> specifically for source configuration;
> >> as Kenn is saying, some sources might not even have such configuration;
> >> at processing time, event timestamp is available in
> ProcessContext.timestamp() regardless of the specifics of the source
> configuration, so it can be extracted the same way for all sources, as
> Raghu said;
>
> >> designating one of the table columns as an event timestamp:
>
> >> query needs to be able to reference the event timestamp so we have to
> declare which column to populate with the event timestamp;
> >> this is common for all sources and we can create a special syntax, e.g.
> "columnName EVENT_TIMESTAMP". It must not contain source-specific
> configuration at this point, in my opinion;
> >> when SQL knows which column is supposed to be the timestamp, then it can
> get it from the ProcessContext.timestamp() and put it into the designated
> field the same way regardless of the source configuration;
>
> >> pubsub-specific message formatting:
>
> >> on top of the above we want to be able to expose pubsub message
> attributes, payload, and timestamp to the user queries, and do it without
> magic or user schema modifications. To do this we can enforce some
> pubsub-specific schema limitations, e.g. by exposing attributes and
> timestamp fields at a top-level schema, with payload going into the second
> level in its own field;
> >> this aspect is not fully implementable until we have support for complex
> types. Until then we cannot map full JSON to the payload field;
>
> >> I will update the doc and the implementation to reflect these comments
> where possible.
>
> >> Thank you,
> >> Anton
>
>
> >> On Fri, May 4, 2018 at 9:48 AM Raghu Angadi <ra...@google.com> wrote:
>
> >>> On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:
>
> >>>> I think it makes sense for the case when timestamp is provided in the
> payload (including pubsub message attributes).  We can mark the field as an
> event timestamp. But if the timestamp is internally defined by the source
> (pubsub message publish time) and not exposed in the event body, then we
> need a source-specific mechanism to extract and map the event timestamp to
> the schema. This is, of course, if we don't automatically add a magic
> timestamp field which Beam SQL can populate behind the scenes and add to
> the schema. I want to avoid this magic path for now.
>
>
> >>> Commented on the PR. As Kenn mentioned, every element in Beam has an
> event timestamp, there is no requirement to extract the timestamp by the
> SQL transform. Using the element timestamp takes care of Pubsub publish
> timestamp as well (in fact, this is the default when timestamp attribute is
> not specified in PubsubIO).
>
> >>> How timestamp are customized is specific to each source. That way
> custom timestamp option seem like they belong in TBLPROPERTIES. E.g. for
> KafkaIO, it could specify "logAppendTime", "createTime", or
> "processingTime" etc (though I am not sure how user can provide their own
> custom extractor in Beam SQL, may be it could support a timestamp field in
> json records).
>
> >>> Raghu.
>
>
> >>>> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <ap...@google.com>
> wrote:
>
> >>>>> This sounds awesome!
>
> >>>>> Is event timestamp something that we need to specify for every
> source? If so, I would suggest we add this as a first class option on
> CREATE TABLE rather then something hidden in TBLPROPERTIES.
>
> >>>>> Andrew
>
> >>>>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com>
> wrote:
>
> >>>>>> Hi
>
> >>>>>> I am working on adding functionality to support querying Pubsub
> messages directly from Beam SQL.
>
> >>>>>> Goal
> >>>>>>    Provide Beam users a pure  SQL solution to create the pipelines
> with Pubsub as a data source, without the need to set up the pipelines in
> Java before applying the query.
>
> >>>>>> High level approach
>
> >>>>>> Build on top of PubsubIO;
> >>>>>> Pubsub source will be declared using CREATE TABLE DDL statement:
>
> >>>>>> Beam SQL already supports declaring sources like Kafka and Text
> using CREATE TABLE DDL;
> >>>>>> it supports additional configuration using TBLPROPERTIES clause.
> Currently it takes a text blob, where we can put a JSON configuration;
> >>>>>> wrapping PubsubIO into a similar source looks feasible;
>
> >>>>>> The plan is to initially support messages only with JSON payload:
>
> >>>>>> more payload formats can be added later;
>
> >>>>>> Messages will be fully described in the CREATE TABLE statements:
>
> >>>>>> event timestamps. Source of the timestamp is configurable. It is
> required by Beam SQL to have an explicit timestamp column for windowing
> support;
> >>>>>> messages attributes map;
> >>>>>> JSON payload schema;
>
> >>>>>> Event timestamps will be taken either from publish time or
> user-specified message attribute (configurable);
>
> >>>>>> Thoughts, ideas, comments?
>
> >>>>>> More details are in the doc here:
>
> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>
> >>>>>> Thank you,
> >>>>>> Anton
>

Re: Pubsub to Beam SQL

Posted by Ismaël Mejía <ie...@gmail.com>.
Hi, Jumping a bit late to this discussion. This sounds super nice. But I
could not access the document.
How hard would it be to do this for other 'unbounded' sources, e.g. Kafka ?
On Sat, May 5, 2018 at 2:56 AM Andrew Pilloud <ap...@google.com> wrote:

> I don't think we should jump to adding a extension, but TBLPROPERTIES is
already a DDL extension and it isn't user friendly. We should strive for a
world where no one needs to use it. SQL needs the timestamp to be exposed
as a column, we can't hide it without changing the definition of GROUP BY.
I like Anton's proposal of adding it as an annotation in the column
definition. That seems even simpler and more user friendly. We might even
be able to get away with using the PRIMARY KEY keyword.

> Andrew

> On Fri, May 4, 2018 at 12:11 PM Anton Kedin <ke...@google.com> wrote:

>> There are few aspects of the event timestamp definition in SQL, which we
are talking about here:

>> configuring the source. E.g. for PubsubIO you can choose whether to
extract event timestamp from the message attributes or the message publish
time:

>> this is source-specific and cannot be part of the common DDL;
>> TBLPROPERTIES, on the other hand, is an opaque json blob which exists
specifically for source configuration;
>> as Kenn is saying, some sources might not even have such configuration;
>> at processing time, event timestamp is available in
ProcessContext.timestamp() regardless of the specifics of the source
configuration, so it can be extracted the same way for all sources, as
Raghu said;

>> designating one of the table columns as an event timestamp:

>> query needs to be able to reference the event timestamp so we have to
declare which column to populate with the event timestamp;
>> this is common for all sources and we can create a special syntax, e.g.
"columnName EVENT_TIMESTAMP". It must not contain source-specific
configuration at this point, in my opinion;
>> when SQL knows which column is supposed to be the timestamp, then it can
get it from the ProcessContext.timestamp() and put it into the designated
field the same way regardless of the source configuration;

>> pubsub-specific message formatting:

>> on top of the above we want to be able to expose pubsub message
attributes, payload, and timestamp to the user queries, and do it without
magic or user schema modifications. To do this we can enforce some
pubsub-specific schema limitations, e.g. by exposing attributes and
timestamp fields at a top-level schema, with payload going into the second
level in its own field;
>> this aspect is not fully implementable until we have support for complex
types. Until then we cannot map full JSON to the payload field;

>> I will update the doc and the implementation to reflect these comments
where possible.

>> Thank you,
>> Anton


>> On Fri, May 4, 2018 at 9:48 AM Raghu Angadi <ra...@google.com> wrote:

>>> On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:

>>>> I think it makes sense for the case when timestamp is provided in the
payload (including pubsub message attributes).  We can mark the field as an
event timestamp. But if the timestamp is internally defined by the source
(pubsub message publish time) and not exposed in the event body, then we
need a source-specific mechanism to extract and map the event timestamp to
the schema. This is, of course, if we don't automatically add a magic
timestamp field which Beam SQL can populate behind the scenes and add to
the schema. I want to avoid this magic path for now.


>>> Commented on the PR. As Kenn mentioned, every element in Beam has an
event timestamp, there is no requirement to extract the timestamp by the
SQL transform. Using the element timestamp takes care of Pubsub publish
timestamp as well (in fact, this is the default when timestamp attribute is
not specified in PubsubIO).

>>> How timestamp are customized is specific to each source. That way
custom timestamp option seem like they belong in TBLPROPERTIES. E.g. for
KafkaIO, it could specify "logAppendTime", "createTime", or
"processingTime" etc (though I am not sure how user can provide their own
custom extractor in Beam SQL, may be it could support a timestamp field in
json records).

>>> Raghu.


>>>> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <ap...@google.com>
wrote:

>>>>> This sounds awesome!

>>>>> Is event timestamp something that we need to specify for every
source? If so, I would suggest we add this as a first class option on
CREATE TABLE rather then something hidden in TBLPROPERTIES.

>>>>> Andrew

>>>>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:

>>>>>> Hi

>>>>>> I am working on adding functionality to support querying Pubsub
messages directly from Beam SQL.

>>>>>> Goal
>>>>>>    Provide Beam users a pure  SQL solution to create the pipelines
with Pubsub as a data source, without the need to set up the pipelines in
Java before applying the query.

>>>>>> High level approach

>>>>>> Build on top of PubsubIO;
>>>>>> Pubsub source will be declared using CREATE TABLE DDL statement:

>>>>>> Beam SQL already supports declaring sources like Kafka and Text
using CREATE TABLE DDL;
>>>>>> it supports additional configuration using TBLPROPERTIES clause.
Currently it takes a text blob, where we can put a JSON configuration;
>>>>>> wrapping PubsubIO into a similar source looks feasible;

>>>>>> The plan is to initially support messages only with JSON payload:

>>>>>> more payload formats can be added later;

>>>>>> Messages will be fully described in the CREATE TABLE statements:

>>>>>> event timestamps. Source of the timestamp is configurable. It is
required by Beam SQL to have an explicit timestamp column for windowing
support;
>>>>>> messages attributes map;
>>>>>> JSON payload schema;

>>>>>> Event timestamps will be taken either from publish time or
user-specified message attribute (configurable);

>>>>>> Thoughts, ideas, comments?

>>>>>> More details are in the doc here:
https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE

>>>>>> Thank you,
>>>>>> Anton

Re: Pubsub to Beam SQL

Posted by Andrew Pilloud <ap...@google.com>.
I don't think we should jump to adding a extension, but TBLPROPERTIES is
already a DDL extension and it isn't user friendly. We should strive for a
world where no one needs to use it. SQL needs the timestamp to be exposed
as a column, we can't hide it without changing the definition of GROUP BY.
I like Anton's proposal of adding it as an annotation in the column
definition. That seems even simpler and more user friendly. We might even
be able to get away with using the PRIMARY KEY keyword.

Andrew

On Fri, May 4, 2018 at 12:11 PM Anton Kedin <ke...@google.com> wrote:

> There are few aspects of the event timestamp definition in SQL, which we
> are talking about here:
>
>    - configuring the source. E.g. for PubsubIO you can choose whether to
>    extract event timestamp from the message attributes or the message publish
>    time:
>    - this is source-specific and cannot be part of the common DDL;
>       - TBLPROPERTIES, on the other hand, is an opaque json blob which
>       exists specifically for source configuration;
>       - as Kenn is saying, some sources might not even have such
>       configuration;
>       - at processing time, event timestamp is available in
>       ProcessContext.timestamp() regardless of the specifics of the source
>       configuration, so it can be extracted the same way for all sources, as
>       Raghu said;
>    - designating one of the table columns as an event timestamp:
>       - query needs to be able to reference the event timestamp so we
>       have to declare which column to populate with the event timestamp;
>       - this is common for all sources and we can create a special
>       syntax, e.g. "columnName EVENT_TIMESTAMP". It must not contain
>       source-specific configuration at this point, in my opinion;
>       - when SQL knows which column is supposed to be the timestamp, then
>       it can get it from the ProcessContext.timestamp() and put it into the
>       designated field the same way regardless of the source configuration;
>       - pubsub-specific message formatting:
>       - on top of the above we want to be able to expose pubsub message
>       attributes, payload, and timestamp to the user queries, and do it without
>       magic or user schema modifications. To do this we can enforce some
>       pubsub-specific schema limitations, e.g. by exposing attributes and
>       timestamp fields at a top-level schema, with payload going into the second
>       level in its own field;
>       - this aspect is not fully implementable until we have support for
>       complex types. Until then we cannot map full JSON to the payload field;
>
> I will update the doc and the implementation to reflect these comments
> where possible.
>
> Thank you,
> Anton
>
>
> On Fri, May 4, 2018 at 9:48 AM Raghu Angadi <ra...@google.com> wrote:
>
>> On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:
>>
>>> I think it makes sense for the case when timestamp is provided in the
>>> payload (including pubsub message attributes).  We can mark the field as an
>>> event timestamp. But if the timestamp is internally defined by the source
>>> (pubsub message publish time) and not exposed in the event body, then we
>>> need a source-specific mechanism to extract and map the event timestamp to
>>> the schema. This is, of course, if we don't automatically add a magic
>>> timestamp field which Beam SQL can populate behind the scenes and add to
>>> the schema. I want to avoid this magic path for now.
>>>
>>
>> Commented on the PR. As Kenn mentioned, every element in Beam has an
>> event timestamp, there is no requirement to extract the timestamp by the
>> SQL transform. Using the element timestamp takes care of Pubsub publish
>> timestamp as well (in fact, this is the default when timestamp attribute is
>> not specified in PubsubIO).
>>
>> How timestamp are customized is specific to each source. That way custom
>> timestamp option seem like they belong in TBLPROPERTIES. E.g. for KafkaIO,
>> it could specify "logAppendTime", "createTime", or "processingTime" etc
>> (though I am not sure how user can provide their own custom extractor in
>> Beam SQL, may be it could support a timestamp field in json records).
>>
>> Raghu.
>>
>>>
>>> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <ap...@google.com>
>>> wrote:
>>>
>>>> This sounds awesome!
>>>>
>>>> Is event timestamp something that we need to specify for every source?
>>>> If so, I would suggest we add this as a first class option on CREATE TABLE
>>>> rather then something hidden in TBLPROPERTIES.
>>>>
>>>> Andrew
>>>>
>>>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I am working on adding functionality to support querying Pubsub
>>>>> messages directly from Beam SQL.
>>>>>
>>>>> *Goal*
>>>>>   Provide Beam users a pure  SQL solution to create the pipelines with
>>>>> Pubsub as a data source, without the need to set up the pipelines in
>>>>> Java before applying the query.
>>>>>
>>>>> *High level approach*
>>>>>
>>>>>    -
>>>>>    - Build on top of PubsubIO;
>>>>>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>>>>>       - Beam SQL already supports declaring sources like Kafka and
>>>>>       Text using CREATE TABLE DDL;
>>>>>       - it supports additional configuration using TBLPROPERTIES
>>>>>       clause. Currently it takes a text blob, where we can put a JSON
>>>>>       configuration;
>>>>>       - wrapping PubsubIO into a similar source looks feasible;
>>>>>    - The plan is to initially support messages only with JSON payload:
>>>>>    -
>>>>>       - more payload formats can be added later;
>>>>>    - Messages will be fully described in the CREATE TABLE statements:
>>>>>       - event timestamps. Source of the timestamp is configurable. It
>>>>>       is required by Beam SQL to have an explicit timestamp column for windowing
>>>>>       support;
>>>>>       - messages attributes map;
>>>>>       - JSON payload schema;
>>>>>    - Event timestamps will be taken either from publish time or
>>>>>    user-specified message attribute (configurable);
>>>>>
>>>>> Thoughts, ideas, comments?
>>>>>
>>>>> More details are in the doc here:
>>>>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>>>>
>>>>>
>>>>> Thank you,
>>>>> Anton
>>>>>
>>>>

Re: Pubsub to Beam SQL

Posted by Anton Kedin <ke...@google.com>.
There are few aspects of the event timestamp definition in SQL, which we
are talking about here:

   - configuring the source. E.g. for PubsubIO you can choose whether to
   extract event timestamp from the message attributes or the message publish
   time:
   - this is source-specific and cannot be part of the common DDL;
      - TBLPROPERTIES, on the other hand, is an opaque json blob which
      exists specifically for source configuration;
      - as Kenn is saying, some sources might not even have such
      configuration;
      - at processing time, event timestamp is available in
      ProcessContext.timestamp() regardless of the specifics of the source
      configuration, so it can be extracted the same way for all sources, as
      Raghu said;
   - designating one of the table columns as an event timestamp:
      - query needs to be able to reference the event timestamp so we have
      to declare which column to populate with the event timestamp;
      - this is common for all sources and we can create a special syntax,
      e.g. "columnName EVENT_TIMESTAMP". It must not contain source-specific
      configuration at this point, in my opinion;
      - when SQL knows which column is supposed to be the timestamp, then
      it can get it from the ProcessContext.timestamp() and put it into the
      designated field the same way regardless of the source configuration;
      - pubsub-specific message formatting:
      - on top of the above we want to be able to expose pubsub message
      attributes, payload, and timestamp to the user queries, and do it without
      magic or user schema modifications. To do this we can enforce some
      pubsub-specific schema limitations, e.g. by exposing attributes and
      timestamp fields at a top-level schema, with payload going into
the second
      level in its own field;
      - this aspect is not fully implementable until we have support for
      complex types. Until then we cannot map full JSON to the payload field;

I will update the doc and the implementation to reflect these comments
where possible.

Thank you,
Anton


On Fri, May 4, 2018 at 9:48 AM Raghu Angadi <ra...@google.com> wrote:

> On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:
>
>> I think it makes sense for the case when timestamp is provided in the
>> payload (including pubsub message attributes).  We can mark the field as an
>> event timestamp. But if the timestamp is internally defined by the source
>> (pubsub message publish time) and not exposed in the event body, then we
>> need a source-specific mechanism to extract and map the event timestamp to
>> the schema. This is, of course, if we don't automatically add a magic
>> timestamp field which Beam SQL can populate behind the scenes and add to
>> the schema. I want to avoid this magic path for now.
>>
>
> Commented on the PR. As Kenn mentioned, every element in Beam has an event
> timestamp, there is no requirement to extract the timestamp by the SQL
> transform. Using the element timestamp takes care of Pubsub publish
> timestamp as well (in fact, this is the default when timestamp attribute is
> not specified in PubsubIO).
>
> How timestamp are customized is specific to each source. That way custom
> timestamp option seem like they belong in TBLPROPERTIES. E.g. for KafkaIO,
> it could specify "logAppendTime", "createTime", or "processingTime" etc
> (though I am not sure how user can provide their own custom extractor in
> Beam SQL, may be it could support a timestamp field in json records).
>
> Raghu.
>
>>
>> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <ap...@google.com>
>> wrote:
>>
>>> This sounds awesome!
>>>
>>> Is event timestamp something that we need to specify for every source?
>>> If so, I would suggest we add this as a first class option on CREATE TABLE
>>> rather then something hidden in TBLPROPERTIES.
>>>
>>> Andrew
>>>
>>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I am working on adding functionality to support querying Pubsub
>>>> messages directly from Beam SQL.
>>>>
>>>> *Goal*
>>>>   Provide Beam users a pure  SQL solution to create the pipelines with
>>>> Pubsub as a data source, without the need to set up the pipelines in
>>>> Java before applying the query.
>>>>
>>>> *High level approach*
>>>>
>>>>    -
>>>>    - Build on top of PubsubIO;
>>>>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>>>>       - Beam SQL already supports declaring sources like Kafka and
>>>>       Text using CREATE TABLE DDL;
>>>>       - it supports additional configuration using TBLPROPERTIES
>>>>       clause. Currently it takes a text blob, where we can put a JSON
>>>>       configuration;
>>>>       - wrapping PubsubIO into a similar source looks feasible;
>>>>    - The plan is to initially support messages only with JSON payload:
>>>>    -
>>>>       - more payload formats can be added later;
>>>>    - Messages will be fully described in the CREATE TABLE statements:
>>>>       - event timestamps. Source of the timestamp is configurable. It
>>>>       is required by Beam SQL to have an explicit timestamp column for windowing
>>>>       support;
>>>>       - messages attributes map;
>>>>       - JSON payload schema;
>>>>    - Event timestamps will be taken either from publish time or
>>>>    user-specified message attribute (configurable);
>>>>
>>>> Thoughts, ideas, comments?
>>>>
>>>> More details are in the doc here:
>>>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>>>
>>>>
>>>> Thank you,
>>>> Anton
>>>>
>>>

Re: Pubsub to Beam SQL

Posted by Raghu Angadi <ra...@google.com>.
On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:

> I think it makes sense for the case when timestamp is provided in the
> payload (including pubsub message attributes).  We can mark the field as an
> event timestamp. But if the timestamp is internally defined by the source
> (pubsub message publish time) and not exposed in the event body, then we
> need a source-specific mechanism to extract and map the event timestamp to
> the schema. This is, of course, if we don't automatically add a magic
> timestamp field which Beam SQL can populate behind the scenes and add to
> the schema. I want to avoid this magic path for now.
>

Commented on the PR. As Kenn mentioned, every element in Beam has an event
timestamp, there is no requirement to extract the timestamp by the SQL
transform. Using the element timestamp takes care of Pubsub publish
timestamp as well (in fact, this is the default when timestamp attribute is
not specified in PubsubIO).

How timestamp are customized is specific to each source. That way custom
timestamp option seem like they belong in TBLPROPERTIES. E.g. for KafkaIO,
it could specify "logAppendTime", "createTime", or "processingTime" etc
(though I am not sure how user can provide their own custom extractor in
Beam SQL, may be it could support a timestamp field in json records).

Raghu.

>
> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <ap...@google.com>
> wrote:
>
>> This sounds awesome!
>>
>> Is event timestamp something that we need to specify for every source? If
>> so, I would suggest we add this as a first class option on CREATE TABLE
>> rather then something hidden in TBLPROPERTIES.
>>
>> Andrew
>>
>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:
>>
>>> Hi
>>>
>>> I am working on adding functionality to support querying Pubsub messages
>>> directly from Beam SQL.
>>>
>>> *Goal*
>>>   Provide Beam users a pure  SQL solution to create the pipelines with
>>> Pubsub as a data source, without the need to set up the pipelines in
>>> Java before applying the query.
>>>
>>> *High level approach*
>>>
>>>    -
>>>    - Build on top of PubsubIO;
>>>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>>>       - Beam SQL already supports declaring sources like Kafka and Text
>>>       using CREATE TABLE DDL;
>>>       - it supports additional configuration using TBLPROPERTIES
>>>       clause. Currently it takes a text blob, where we can put a JSON
>>>       configuration;
>>>       - wrapping PubsubIO into a similar source looks feasible;
>>>    - The plan is to initially support messages only with JSON payload:
>>>    -
>>>       - more payload formats can be added later;
>>>    - Messages will be fully described in the CREATE TABLE statements:
>>>       - event timestamps. Source of the timestamp is configurable. It
>>>       is required by Beam SQL to have an explicit timestamp column for windowing
>>>       support;
>>>       - messages attributes map;
>>>       - JSON payload schema;
>>>    - Event timestamps will be taken either from publish time or
>>>    user-specified message attribute (configurable);
>>>
>>> Thoughts, ideas, comments?
>>>
>>> More details are in the doc here:
>>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>>
>>>
>>> Thank you,
>>> Anton
>>>
>>

Re: Pubsub to Beam SQL

Posted by Anton Kedin <ke...@google.com>.
I think it makes sense for the case when timestamp is provided in the
payload (including pubsub message attributes).  We can mark the field as an
event timestamp. But if the timestamp is internally defined by the source
(pubsub message publish time) and not exposed in the event body, then we
need a source-specific mechanism to extract and map the event timestamp to
the schema. This is, of course, if we don't automatically add a magic
timestamp field which Beam SQL can populate behind the scenes and add to
the schema. I want to avoid this magic path for now.

On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <ap...@google.com> wrote:

> This sounds awesome!
>
> Is event timestamp something that we need to specify for every source? If
> so, I would suggest we add this as a first class option on CREATE TABLE
> rather then something hidden in TBLPROPERTIES.
>
> Andrew
>
> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:
>
>> Hi
>>
>> I am working on adding functionality to support querying Pubsub messages
>> directly from Beam SQL.
>>
>> *Goal*
>>   Provide Beam users a pure  SQL solution to create the pipelines with
>> Pubsub as a data source, without the need to set up the pipelines in
>> Java before applying the query.
>>
>> *High level approach*
>>
>>    -
>>    - Build on top of PubsubIO;
>>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>>       - Beam SQL already supports declaring sources like Kafka and Text
>>       using CREATE TABLE DDL;
>>       - it supports additional configuration using TBLPROPERTIES clause.
>>       Currently it takes a text blob, where we can put a JSON configuration;
>>       - wrapping PubsubIO into a similar source looks feasible;
>>    - The plan is to initially support messages only with JSON payload:
>>    -
>>       - more payload formats can be added later;
>>    - Messages will be fully described in the CREATE TABLE statements:
>>       - event timestamps. Source of the timestamp is configurable. It is
>>       required by Beam SQL to have an explicit timestamp column for windowing
>>       support;
>>       - messages attributes map;
>>       - JSON payload schema;
>>    - Event timestamps will be taken either from publish time or
>>    user-specified message attribute (configurable);
>>
>> Thoughts, ideas, comments?
>>
>> More details are in the doc here:
>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>
>>
>> Thank you,
>> Anton
>>
>

Re: Pubsub to Beam SQL

Posted by Andrew Pilloud <ap...@google.com>.
This sounds awesome!

Is event timestamp something that we need to specify for every source? If
so, I would suggest we add this as a first class option on CREATE TABLE
rather then something hidden in TBLPROPERTIES.

Andrew

On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:

> Hi
>
> I am working on adding functionality to support querying Pubsub messages
> directly from Beam SQL.
>
> *Goal*
>   Provide Beam users a pure  SQL solution to create the pipelines with
> Pubsub as a data source, without the need to set up the pipelines in Java
> before applying the query.
>
> *High level approach*
>
>    -
>    - Build on top of PubsubIO;
>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>       - Beam SQL already supports declaring sources like Kafka and Text
>       using CREATE TABLE DDL;
>       - it supports additional configuration using TBLPROPERTIES clause.
>       Currently it takes a text blob, where we can put a JSON configuration;
>       - wrapping PubsubIO into a similar source looks feasible;
>    - The plan is to initially support messages only with JSON payload:
>    -
>       - more payload formats can be added later;
>    - Messages will be fully described in the CREATE TABLE statements:
>       - event timestamps. Source of the timestamp is configurable. It is
>       required by Beam SQL to have an explicit timestamp column for windowing
>       support;
>       - messages attributes map;
>       - JSON payload schema;
>    - Event timestamps will be taken either from publish time or
>    user-specified message attribute (configurable);
>
> Thoughts, ideas, comments?
>
> More details are in the doc here:
> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>
>
> Thank you,
> Anton
>

Re: Pubsub to Beam SQL

Posted by Reuven Lax <re...@google.com>.
I believe PubSubIO already exposes the publish timestamp if no timestamp
attribute is set.

On Thu, May 3, 2018 at 12:52 PM Anton Kedin <ke...@google.com> wrote:

> A SQL-specific wrapper+custom transforms for PubsubIO should suffice. We
> will probably need to a way to expose a message publish timestamp if we
> want to use it as an event timestamp, but that will be consumed by the same
> wrapper/transform without adding anything schema or SQL-specific to
> PubsubIO itself.
>
> On Thu, May 3, 2018 at 11:44 AM Reuven Lax <re...@google.com> wrote:
>
>> Are you planning on integrating this directly into PubSubIO, or add a
>> follow-on transform?
>>
>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:
>>
>>> Hi
>>>
>>> I am working on adding functionality to support querying Pubsub messages
>>> directly from Beam SQL.
>>>
>>> *Goal*
>>>   Provide Beam users a pure  SQL solution to create the pipelines with
>>> Pubsub as a data source, without the need to set up the pipelines in
>>> Java before applying the query.
>>>
>>> *High level approach*
>>>
>>>    -
>>>    - Build on top of PubsubIO;
>>>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>>>       - Beam SQL already supports declaring sources like Kafka and Text
>>>       using CREATE TABLE DDL;
>>>       - it supports additional configuration using TBLPROPERTIES
>>>       clause. Currently it takes a text blob, where we can put a JSON
>>>       configuration;
>>>       - wrapping PubsubIO into a similar source looks feasible;
>>>    - The plan is to initially support messages only with JSON payload:
>>>    -
>>>       - more payload formats can be added later;
>>>    - Messages will be fully described in the CREATE TABLE statements:
>>>       - event timestamps. Source of the timestamp is configurable. It
>>>       is required by Beam SQL to have an explicit timestamp column for windowing
>>>       support;
>>>       - messages attributes map;
>>>       - JSON payload schema;
>>>    - Event timestamps will be taken either from publish time or
>>>    user-specified message attribute (configurable);
>>>
>>> Thoughts, ideas, comments?
>>>
>>> More details are in the doc here:
>>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>>
>>>
>>> Thank you,
>>> Anton
>>>
>>

Re: Pubsub to Beam SQL

Posted by Anton Kedin <ke...@google.com>.
A SQL-specific wrapper+custom transforms for PubsubIO should suffice. We
will probably need to a way to expose a message publish timestamp if we
want to use it as an event timestamp, but that will be consumed by the same
wrapper/transform without adding anything schema or SQL-specific to
PubsubIO itself.

On Thu, May 3, 2018 at 11:44 AM Reuven Lax <re...@google.com> wrote:

> Are you planning on integrating this directly into PubSubIO, or add a
> follow-on transform?
>
> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:
>
>> Hi
>>
>> I am working on adding functionality to support querying Pubsub messages
>> directly from Beam SQL.
>>
>> *Goal*
>>   Provide Beam users a pure  SQL solution to create the pipelines with
>> Pubsub as a data source, without the need to set up the pipelines in
>> Java before applying the query.
>>
>> *High level approach*
>>
>>    -
>>    - Build on top of PubsubIO;
>>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>>       - Beam SQL already supports declaring sources like Kafka and Text
>>       using CREATE TABLE DDL;
>>       - it supports additional configuration using TBLPROPERTIES clause.
>>       Currently it takes a text blob, where we can put a JSON configuration;
>>       - wrapping PubsubIO into a similar source looks feasible;
>>    - The plan is to initially support messages only with JSON payload:
>>    -
>>       - more payload formats can be added later;
>>    - Messages will be fully described in the CREATE TABLE statements:
>>       - event timestamps. Source of the timestamp is configurable. It is
>>       required by Beam SQL to have an explicit timestamp column for windowing
>>       support;
>>       - messages attributes map;
>>       - JSON payload schema;
>>    - Event timestamps will be taken either from publish time or
>>    user-specified message attribute (configurable);
>>
>> Thoughts, ideas, comments?
>>
>> More details are in the doc here:
>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>
>>
>> Thank you,
>> Anton
>>
>

Re: Pubsub to Beam SQL

Posted by Reuven Lax <re...@google.com>.
Are you planning on integrating this directly into PubSubIO, or add a
follow-on transform?

On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com> wrote:

> Hi
>
> I am working on adding functionality to support querying Pubsub messages
> directly from Beam SQL.
>
> *Goal*
>   Provide Beam users a pure  SQL solution to create the pipelines with
> Pubsub as a data source, without the need to set up the pipelines in Java
> before applying the query.
>
> *High level approach*
>
>    -
>    - Build on top of PubsubIO;
>    - Pubsub source will be declared using CREATE TABLE DDL statement:
>       - Beam SQL already supports declaring sources like Kafka and Text
>       using CREATE TABLE DDL;
>       - it supports additional configuration using TBLPROPERTIES clause.
>       Currently it takes a text blob, where we can put a JSON configuration;
>       - wrapping PubsubIO into a similar source looks feasible;
>    - The plan is to initially support messages only with JSON payload:
>    -
>       - more payload formats can be added later;
>    - Messages will be fully described in the CREATE TABLE statements:
>       - event timestamps. Source of the timestamp is configurable. It is
>       required by Beam SQL to have an explicit timestamp column for windowing
>       support;
>       - messages attributes map;
>       - JSON payload schema;
>    - Event timestamps will be taken either from publish time or
>    user-specified message attribute (configurable);
>
> Thoughts, ideas, comments?
>
> More details are in the doc here:
> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>
>
> Thank you,
> Anton
>