Posted to dev@beam.apache.org by David Sabater Dinter <da...@google.com> on 2019/10/09 13:49:35 UTC

Re: Meta driven pipelines code (metabeam). Should it be part of Beam extensions?

Hi both,
A bit late, but perhaps this could be added to the Beam patterns section
<https://beam.apache.org/documentation/patterns/overview/>?
You can use the templates below:
Jira
<https://issues.apache.org/jira/browse/BEAM-7449?jql=labels%20%3D%20pipeline-patterns>
Github PR <https://github.com/apache/beam/pull/8732>


On Fri, 6 Sep 2019 at 18:46, Alex Van Boxel <al...@vanboxel.be> wrote:

> Yes, on documentation, blog, all aligned. And I prefer to do it within
> Beam. If it's stand-alone somewhere on GitHub it won't have that much
> value, because people won't find it.
>
> BigQueryMetaIO is proto independent; it only needs metadata on the Row
> Schema (so it could come from something completely different, like Avro,
> etc.). But you will be able to read it in the docs ;-)
>
> Clustering support finally landed in Beam and works. The examples in
> the mail are fully working (on master + the proto PR), tested on Dataflow and
> the DirectRunner. But come to my session next week and I'll show it in a demo.
>
>  _/
> _/ Alex Van Boxel
>
>
> On Fri, Sep 6, 2019 at 6:01 PM Pablo Estrada <pa...@google.com> wrote:
>
>> Hi Alex!
>> This sounds useful, and I think it would be great to have it in
>> extensions. I think the main idea would be to document it well, so that
>> users can find it, and know how to use it. Perhaps a Blog post, along with
>> details in the Beam documentation, so that people really get the full value
>> from the contribution.
>>
>> For my edification - does BigQueryMetaIO.with replace the protos with
>> TableRows with all the necessary metadata added to them? Is it conceptually
>> a ParDo.of(DoFn<Protobuf, TableRow> ..) ?
>>
>> If I remember correctly, Java BQIO does not support clustering - is that
>> not the case? Does your extension work around this?
>>
>> Best
>> -P.
>>
>> On Thu, Sep 5, 2019 at 8:20 AM Alex Van Boxel <al...@vanboxel.be> wrote:
>>
>>> Hi all,
>>>
>>> I always liked my Beams dynamic, which is why I try to drive my
>>> pipelines through annotations (called options in proto, metadata in Beam
>>> schema). Here is an example:
>>>
>>>
>>> message Jump {
>>>   string ulid = 1 [(metabeam.v1alpha1.description) = "Unique jump id"];
>>>   google.protobuf.Timestamp time_code = 2 [
>>>       (metabeam.v1alpha1.bigquery_time_partition) = DAY,
>>>       (metabeam.v1alpha1.description) = "Jump timestamp"];
>>>   string gate_name = 3 [
>>>       (metabeam.v1alpha1.bigquery_cluster_key) = 2,
>>>       (metabeam.v1alpha1.description) = "Reference to the unique gate name"];
>>>   string ship_name = 4 [
>>>       (metabeam.v1alpha1.description) = "Reference to the unique ship name"];
>>>   string account_name = 5 [
>>>       (metabeam.v1alpha1.bigquery_cluster_key) = 1,
>>>       (metabeam.v1alpha1.description) = "Billing account"];
>>>
>>>   .acme.billing.v1alpha1.JumpBilling account_type = 9 [
>>>       (metabeam.v1alpha1.description) = "Billing account type, used for statistics"];
>>>   int32 heat = 6 [
>>>       deprecated = true,
>>>       (metabeam.v1alpha1.description) = "Heat dissipated on this jump."];
>>>   google.protobuf.StringValue note = 7 [
>>>       (metabeam.v1alpha1.description) = "Jump note"];
>>>   PowerUsed power_used = 8 [
>>>       (metabeam.v1alpha1.description) = "Power usage"];
>>> }
>>> Through my pending PR I generate a Beam Row Schema, and I also translate
>>> the options into metadata:
>>>
>>> Field{name=time_code, description=Jump timestamp,
>>>   type=FieldType{typeName=DATETIME, nullable=true, logicalType=null,
>>>     collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null,
>>>     metadata={
>>>       metabeam.v1alpha1.bigquery_time_partition=org.apache.beam.sdk.schemas.Schema$ByteArrayWrapper@17bdb,
>>>       metabeam.v1alpha1.description=org.apache.beam.sdk.schemas.Schema$ByteArrayWrapper@9d029605}}}
>>> Field{name=gate_name, description=,
>>>   type=FieldType{typeName=STRING, nullable=false, logicalType=null,
>>>     collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null,
>>>     metadata={
>>>       metabeam.v1alpha1.bigquery_cluster_key=org.apache.beam.sdk.schemas.Schema$ByteArrayWrapper@51,
>>>       metabeam.v1alpha1.description=org.apache.beam.sdk.schemas.Schema$ByteArrayWrapper@8100daf}}}
>>> Field{name=ship_name, description=,
>>>   type=FieldType{typeName=STRING, nullable=false, logicalType=null,
>>>     collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null,
>>>     metadata={
>>>       metabeam.v1alpha1.description=org.apache.beam.sdk.schemas.Schema$ByteArrayWrapper@9f4e3ede}}}
>>> Field{name=account_name, description=Billing account,
>>>   type=FieldType{typeName=STRING, nullable=false, logicalType=null,
>>>     collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null,
>>>     metadata={
>>>       metabeam.v1alpha1.bigquery_cluster_key=org.apache.beam.sdk.schemas.Schema$ByteArrayWrapper@50,
>>>       metabeam.v1alpha1.description=org.apache.beam.sdk.schemas.Schema$ByteArrayWrapper@f5313347}}}
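As a sketch of how a downstream transform might act on such per-field metadata, here is a self-contained plain-Java model. The map-of-byte-arrays layout mirrors the dump above; the `MetaField` class and its methods are hypothetical stand-ins, not the actual Beam Schema API:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Beam Schema field: a name plus a
// metadata map of option-key -> raw bytes, as in the dump above.
class MetaField {
    final String name;
    final Map<String, byte[]> metadata = new HashMap<>();

    MetaField(String name) { this.name = name; }

    MetaField with(String key, byte[] value) {
        metadata.put(key, value);
        return this;
    }

    // Decode an option stored as a UTF-8 string, or null if absent.
    String stringOption(String key) {
        byte[] raw = metadata.get(key);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }
}

public class MetadataDemo {
    public static void main(String[] args) {
        MetaField timeCode = new MetaField("time_code")
            .with("metabeam.v1alpha1.bigquery_time_partition",
                  "DAY".getBytes(StandardCharsets.UTF_8))
            .with("metabeam.v1alpha1.description",
                  "Jump timestamp".getBytes(StandardCharsets.UTF_8));

        // A transform can branch on the mere presence of an option,
        // or decode its value to configure a sink.
        boolean partitioned = timeCode.metadata
            .containsKey("metabeam.v1alpha1.bigquery_time_partition");
        System.out.println(timeCode.name + " partitioned=" + partitioned
            + " granularity=" + timeCode.stringOption(
                "metabeam.v1alpha1.bigquery_time_partition"));
    }
}
```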
>>>
>>>
>>> But the power comes from using the metadata to actually transform fields
>>> or even change the output sink: in this example, BigQueryMetaIO.with(...)
>>> will add the partition and cluster keys. This is all functionality I had
>>> in ProtoBeam (open sourced, but proto specific).
>>>
>>> rows.apply(
>>>     BigQueryMetaIO.with(
>>>         BigQueryIO.<Row>write()
>>>             .useBeamSchema()
>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>             .to("project:alex")));
>>>
>>> The above code generates this in BigQuery:
>>>
>>> [image: Screen Shot 2019-09-05 at 16.56.28.png]
>>>
>>> [image: Screen Shot 2019-09-05 at 16.55.38.png]
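One piece of what a wrapper like BigQueryMetaIO.with(...) presumably does is collect the fields carrying a bigquery_cluster_key option and order them by that option's value (BigQuery clustering takes an ordered list of up to four columns). A minimal plain-Java sketch of that ordering step, with hypothetical names since the real implementation lives in the PR:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ClusterKeyDemo {
    // Given field name -> bigquery_cluster_key option value (null if the
    // field carries no such option), return the clustering column list
    // ordered by the option value, not by schema position.
    static List<String> clusteringFields(Map<String, Integer> clusterKeys) {
        return clusterKeys.entrySet().stream()
            .filter(e -> e.getValue() != null)
            .sorted(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Values from the Jump message above: gate_name has
        // cluster_key 2, account_name has cluster_key 1.
        Map<String, Integer> keys = new LinkedHashMap<>();
        keys.put("ulid", null);
        keys.put("gate_name", 2);
        keys.put("ship_name", null);
        keys.put("account_name", 1);

        // account_name sorts first despite appearing later in the schema.
        System.out.println(clusteringFields(keys));
    }
}
```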
>>>
>>>
>>> I'm now planning to build the successor, probably called Metabeam, to do
>>> the same but on Rows.
>>> *Could this be part of the extensions package? Or should I create it
>>> standalone?*
>>>
>>> If extensions is a good fit, I'll start a long-running branch and then it
>>> could be part of Beam (note: this is not proto specific anymore, as others
>>> could add the metadata as well).
>>>
>>> Thoughts?
>>>
>>> We can always discuss it next week in Vegas, as this is what I'm going to
>>> talk about :-)
>>>
>>>  _/
>>> _/ Alex Van Boxel
>>>
>>

-- 


David Sabater

Google Cloud | Data Analytics Specialist

davidsabater@google.com
