Posted to user@beam.apache.org by Tao Li <ta...@zillow.com> on 2021/01/06 17:57:00 UTC

Quick question regarding ParquetIO

Hi beam community,

Quick question about ParquetIO <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>. Is there a way to avoid specifying the Avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, the Spark parquet reader <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does not require such a schema specification.
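
For reference, the read currently requires the schema up front. A minimal sketch, where schema (an org.apache.avro.Schema) and path are placeholders:

PCollection<GenericRecord> records =
    p.apply(ParquetIO.read(schema).from(path));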

Please advise. Thanks a lot!

Re: Quick question regarding ParquetIO

Posted by Tao Li <ta...@zillow.com>.
Hi Brian,

You are right. The sample code still requires the Avro schema. Is it possible to retrieve the Avro schema from a PCollection<GenericRecord> (which comes from a parquet read without an Avro schema specification in Beam 2.28)? I did not have a chance to give it a try, but I guess we can retrieve a GenericRecord instance and then get the schema attached to it?
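
Something like this sketch, I imagine, where records is the PCollection<GenericRecord> produced by the read:

records.apply(ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
  @ProcessElement
  public void process(@Element GenericRecord record, OutputReceiver<GenericRecord> out) {
    // every GenericRecord carries its writer schema
    org.apache.avro.Schema avroSchema = record.getSchema();
    out.output(record);
  }
}));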

Thanks!

From: Brian Hulette <bh...@google.com>
Date: Thursday, January 7, 2021 at 9:38 AM
To: Tao Li <ta...@zillow.com>
Cc: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Quick question regarding ParquetIO



On Wed, Jan 6, 2021 at 11:07 AM Tao Li <ta...@zillow.com> wrote:
Hi Brian,

Please see my answers inline.

From: Brian Hulette <bh...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Wednesday, January 6, 2021 at 10:43 AM
To: user <us...@beam.apache.org>
Subject: Re: Quick question regarding ParquetIO

Hey Tao,

It does look like BEAM-11460 could work for you. Note that it relies on a dynamic object, which won't work with schema-aware transforms and SqlTransform. It's likely this isn't a problem for you; I just wanted to point it out.
[tao] I just need a PCollection<GenericRecord> from IO. Then I can apply the code below to enable schema transforms (I have verified this code works).

setSchema(
      AvroUtils.toBeamSchema(schema),
      new TypeDescriptor<GenericRecord>() {},
      AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(schema)),
      AvroUtils.getRowToGenericRecordFunction(schema))
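
(For context, this is PCollection.setSchema(schema, typeDescriptor, toRowFn, fromRowFn), called on the PCollection<GenericRecord> produced by the read; schema here is the Avro schema.)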

This requires specifying the Avro schema, doesn't it?




Out of curiosity, for your use case would it be acceptable if Beam peeked at the files at pipeline construction time to determine the schema for you? This is what we're doing for the new IOs in the Python SDK's DataFrame API. They're based on the pandas read_* methods, and use those methods at construction time to determine the schema.

[taol] If I understand correctly, the behavior of the new DataFrame APIs you are mentioning is very similar to the Spark parquet reader’s behavior. If that’s the case, then it’s probably what I am looking for 😊



Brian

On Wed, Jan 6, 2021 at 10:13 AM Alexey Romanenko <ar...@gmail.com> wrote:
Hi Tao,

This Jira [1] looks like exactly what you are asking, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available only in Beam 2.28.0.

[1] https://issues.apache.org/jira/browse/BEAM-11460

Regards,
Alexey

On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com> wrote:

Hi beam community,

Quick question about ParquetIO <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>. Is there a way to avoid specifying the Avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, the Spark parquet reader <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does not require such a schema specification.

Please advise. Thanks a lot!


Re: Quick question regarding ParquetIO

Posted by Brian Hulette <bh...@google.com>.
On Wed, Jan 6, 2021 at 11:07 AM Tao Li <ta...@zillow.com> wrote:

> Hi Brian,
>
>
>
> Please see my answers inline.
>
>
>
> *From: *Brian Hulette <bh...@google.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Wednesday, January 6, 2021 at 10:43 AM
> *To: *user <us...@beam.apache.org>
> *Subject: *Re: Quick question regarding ParquetIO
>
>
>
> Hey Tao,
>
> It does look like BEAM-11460 could work for you. Note that it relies on a
> dynamic object, which won't work with schema-aware transforms and
> SqlTransform. It's likely this isn't a problem for you; I just wanted to
> point it out.
>
> [tao] I just need a PCollection<GenericRecord> from IO. Then I can apply
> the code below to enable schema transforms (I have verified this code
> works).
>
>
>
> setSchema(
>
>       AvroUtils.toBeamSchema(schema),
>
>       new TypeDescriptor<GenericRecord>() {},
>
>
> AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(schema)),
>
>       AvroUtils.getRowToGenericRecordFunction(schema))
>

This requires specifying the Avro schema, doesn't it?


>
>
>
>
>
> Out of curiosity, for your use case would it be acceptable if Beam peeked
> at the files at pipeline construction time to determine the schema for you?
> This is what we're doing for the new IOs in the Python SDK's DataFrame API.
> They're based on the pandas read_* methods, and use those methods at
> construction time to determine the schema.
>
>
>
> [taol] If I understand correctly, the behavior of the new DataFrame APIs
> you are mentioning is very similar to the Spark parquet reader’s behavior.
> If that’s the case, then it’s probably what I am looking for 😊
>
>
>
>
>
>
>
> Brian
>
>
>
> On Wed, Jan 6, 2021 at 10:13 AM Alexey Romanenko <ar...@gmail.com>
> wrote:
>
> Hi Tao,
>
>
>
> This Jira [1] looks like exactly what you are asking, but it was merged only
> recently (thanks to Anant Damle for working on this!) and it should be
> available only in Beam 2.28.0.
>
>
>
> [1] https://issues.apache.org/jira/browse/BEAM-11460
>
>
>
> Regards,
>
> Alexey
>
>
>
> On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com> wrote:
>
>
>
> Hi beam community,
>
>
>
> Quick question about ParquetIO
> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>.
> Is there a way to avoid specifying the Avro schema when reading parquet
> files? The reason is that we may not know the parquet schema until we read
> the files. In comparison, the Spark parquet reader
> <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does
> not require such a schema specification.
>
>
>
> Please advise. Thanks a lot!
>
>
>
>

Re: Quick question regarding ParquetIO

Posted by Tao Li <ta...@zillow.com>.
Hi Brian,

Please see my answers inline.

From: Brian Hulette <bh...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Wednesday, January 6, 2021 at 10:43 AM
To: user <us...@beam.apache.org>
Subject: Re: Quick question regarding ParquetIO

Hey Tao,

It does look like BEAM-11460 could work for you. Note that it relies on a dynamic object, which won't work with schema-aware transforms and SqlTransform. It's likely this isn't a problem for you; I just wanted to point it out.

[tao] I just need a PCollection<GenericRecord> from IO. Then I can apply the code below to enable schema transforms (I have verified this code works).

setSchema(
      AvroUtils.toBeamSchema(schema),
      new TypeDescriptor<GenericRecord>() {},
      AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(schema)),
      AvroUtils.getRowToGenericRecordFunction(schema))



Out of curiosity, for your use case would it be acceptable if Beam peeked at the files at pipeline construction time to determine the schema for you? This is what we're doing for the new IOs in the Python SDK's DataFrame API. They're based on the pandas read_* methods, and use those methods at construction time to determine the schema.

[taol] If I understand correctly, the behavior of the new DataFrame APIs you are mentioning is very similar to the Spark parquet reader’s behavior. If that’s the case, then it’s probably what I am looking for 😊



Brian

On Wed, Jan 6, 2021 at 10:13 AM Alexey Romanenko <ar...@gmail.com> wrote:
Hi Tao,

This Jira [1] looks like exactly what you are asking, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available only in Beam 2.28.0.

[1] https://issues.apache.org/jira/browse/BEAM-11460

Regards,
Alexey


On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com> wrote:

Hi beam community,

Quick question about ParquetIO <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>. Is there a way to avoid specifying the Avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, the Spark parquet reader <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does not require such a schema specification.

Please advise. Thanks a lot!


Re: Quick question regarding ParquetIO

Posted by Brian Hulette <bh...@google.com>.
Hey Tao,

It does look like BEAM-11460 could work for you. Note that it relies on a
dynamic object, which won't work with schema-aware transforms and
SqlTransform. It's likely this isn't a problem for you; I just wanted to
point it out.

Out of curiosity, for your use case would it be acceptable if Beam peeked
at the files at pipeline construction time to determine the schema for you?
This is what we're doing for the new IOs in the Python SDK's DataFrame API.
They're based on the pandas read_* methods, and use those methods at
construction time to determine the schema.

Brian

On Wed, Jan 6, 2021 at 10:13 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> Hi Tao,
>
> This Jira [1] looks like exactly what you are asking, but it was merged only
> recently (thanks to Anant Damle for working on this!) and it should be
> available only in Beam 2.28.0.
>
> [1] https://issues.apache.org/jira/browse/BEAM-11460
>
> Regards,
> Alexey
>
> On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com> wrote:
>
> Hi beam community,
>
> Quick question about ParquetIO
> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>.
> Is there a way to avoid specifying the Avro schema when reading parquet
> files? The reason is that we may not know the parquet schema until we read
> the files. In comparison, the Spark parquet reader
> <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does
> not require such a schema specification.
>
> Please advise. Thanks a lot!
>
>
>

Re: Quick question regarding ParquetIO

Posted by Tao Li <ta...@zillow.com>.
Thanks @Ismaël Mejía for your detailed answers. Also thanks for creating BEAM-11650 to track my feature request. I do understand that reading parquet files without providing a schema may have some problems in schema evolution scenarios, but it could also be very handy in cases where we just want to read whatever schema exists in the parquet files.

Regarding the withBeamSchemas() method, I did see it in AvroIO, but not in ParquetIO. It's good to know about this handy API for enabling schema transforms, though.

Regarding the usage "mycollection.setCoder(AvroUtils.schemaCoder(schema))" to enable schema transforms, thanks for sharing that; it's similar to what I have been using in my Beam app.

Regarding the conversion of PCollection<Row> to PCollection<GenericRecord>, thanks for sharing the new converter feature from Beam 2.28. Below is the code I have been using for this purpose.

rows // the input PCollection<Row>
    .apply(MapElements
        .into(new TypeDescriptor<GenericRecord>() {})
        .via(AvroUtils.getRowToGenericRecordFunction(avroSchema)))
    .setCoder(AvroCoder.of(GenericRecord.class, avroSchema))





On 1/18/21, 3:19 AM, "Ismaël Mejía" <ie...@gmail.com> wrote:

    Catching up on this thread, sorry if late to the party :) and my excuses because
    this is going to be loooong but worth it.

    > It does look like BEAM-11460 could work for you. Note that it relies on a dynamic
    > object, which won't work with schema-aware transforms and SqlTransform. It's
    > likely this isn't a problem for you; I just wanted to point it out.

    We may be missing in this discussion the existence of the
    `withBeamSchemas(true)` method on the IOs that produce Avro objects. This method
    sets up a Schema-based coder for the output PCollection generated by the
    read. This allows both SQL and Schema-based transforms just afterwards by
    auto-inferring the Beam Row schema and auto-transforming everything into Rows
    when needed.

    PCollection<GenericRecord> input =
        p.apply(
          ParquetIO.read(SCHEMA)
              .from(path)
              .withBeamSchemas(true));

    Now input can be used by SQL/Schema-based PTransforms.

    > @Kobe Feng thank you so much for the insights. Agree that it may be a good
    > practice to read all sorts of file formats (e.g. parquet, avro etc) into a
    > PCollection<Row> and then perform the schema aware transforms that you are
    > referring to.

    This is not the case at the moment because most IOs precede the schema-based
    APIs, but more and more PTransforms are supporting it. Notice that for dynamic
    objects or Schema-aware PCollection you don't even need them to produce
    PCollection<Row>. You can take a PCollection<GenericRecord> (like above) and
    connect it directly to schema-aware transformations as if it were a PCollection<Row>;
    the transformation is done automatically for the user because of the
    Schema-based coder.

    You can do this manually if you have a non-schema PCollection of GenericRecords
    by setting explicitly a Schema-based coder for the PCollection:

        mycollection.setCoder(AvroUtils.schemaCoder(schema));

    Beam also includes the schema-based `Convert` transform to convert different
    types from/to Rows so this could be handy for cases when you need to transform
    in both directions and it is not supported. Beam 2.28.0 introduces an
    improvement that allows `Convert` to go from any Schema-based PCollection (Rows or
    others) into GenericRecords. This is really useful because Avro/Parquet based
    writes expect a PCollection<GenericRecord> not one of rows, and now you can just
    transform a schema-based PCollection (e.g. PCollection<Row> or of other objects)
    into a PCollection<GenericRecord> like this:

        myrowcollection.apply(Convert.to(GenericRecord.class)).apply(AnAvroBasedSinkIO.write(...))

    https://issues.apache.org/jira/browse/BEAM-11571

    So now the full scenario is covered for reads via .withBeamSchemas(true) or by
    setting manually an AvroCoder for schemas and for writes by preceding the Sinks
    with `Convert.to`. That's the beauty of Beam's bidirectional Schema coders.

    Note that this probably can be better documented in the programming guide or in
    the javadocs so contributions welcome!

    And now back to the initial question:

    > Quick question about ParquetIO. Is there a way to avoid specifying the avro
    > schema when reading parquet files?

    No, you cannot at the moment. BEAM-11460 allows you to parametrize the
    transformation from a GenericRecord (with a schema you expect in advance even if
    you don't specify it) into your own type of objects.

    In Parquet/Avro the schema you use to write can differ from the schema you use
    to read, this is done to support schema evolution, so the most general use case
    is to allow users to read from specific versions of the Schema provided into
    their objects. That's probably one of the reasons why this is not supported.

    Since the Schema is part of the Parquet file metadata I suppose we could somehow
    use it and produce the Schema for the output collection, notice however that if
    the schema differs on the files this will break in runtime.

    Filed https://issues.apache.org/jira/browse/BEAM-11650 to track this.

    On Wed, Jan 13, 2021 at 7:42 PM Tao Li <ta...@zillow.com> wrote:
    >
    > @Kobe Feng thank you so much for the insights. Agree that it may be a good practice to read all sorts of file formats (e.g. parquet, avro etc) into a PCollection<Row> and then perform the schema aware transforms that you are referring to.
    >
    >
    >
    > The new DataFrame APIs for the Python SDK sound pretty cool, and I can imagine they will save a lot of hassle during Beam app development. Hopefully they will be added to the Java SDK as well.
    >
    >
    >
    > From: Kobe Feng <fl...@gmail.com>
    > Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
    > Date: Friday, January 8, 2021 at 11:39 AM
    > To: "user@beam.apache.org" <us...@beam.apache.org>
    > Subject: Re: Quick question regarding ParquetIO
    >
    >
    >
    > Tao,
    > I'm not an expert, but good intuition: what you want is schema-aware transformations, or let's say schema-based transformations, in Beam, not only for IO but also for other DoFns, etc., and possibly schema evolution in the future as well.
    >
    >
    > This is how I have tried to understand and explain it in other places before: unlike Spark and Flink, which leverage internal/built-in types (e.g., Catalyst struct types) for built-in operators as much as possible and infer the schema whenever IOs can convert to them, Beam tries to be capable of handling any type during transforms so that people can migrate existing pipelines to Beam (if you do a Spark mapPartitions with your own type, an Encoder can't be avoided either, right?). Also yes, we could leverage Beam's own type "Row" to do all transformations, converting all in/out types like parquet, avro, orc, etc. at the IO side, and then do schema inference in built-in operators based on the Row type when we know they will operate on internal types; that's how to avoid the coder or explicit schema there. Going further, IOs could provide schema registry capability, and transforms could look up schemas when necessary for evolution. I saw Beam put schema-based transformation in its goals last year, which will be convenient for people (since normally people would rather use built-in types than provide their own types' coders for following operators until we have to); that's why the DataFrame APIs for the Python SDK exist, I think.
    >
    > Kobe
    >
    >
    >
    >
    > On Fri, Jan 8, 2021 at 9:34 AM Tao Li <ta...@zillow.com> wrote:
    >
    > Thanks Alexey for your explanation. That’s also what I was thinking. Parquet files already have the schema built in, so it might be feasible to infer a coder automatically (like the Spark parquet reader). It would be great if some experts could chime in here. @Brian Hulette already mentioned that the community is working on new DataFrame APIs in the Python SDK, which are based on the pandas methods and use those methods at construction time to determine the schema. I think this is very close to the schema inference we have been discussing. Not sure it will be available in the Java SDK, though.
    >
    >
    >
    > Regarding BEAM-11460, it looks like it may not totally solve my problem. As @Alexey Romanenko mentioned, we may still need to know the Avro or Beam schema for the operations following the parquet read. A dumb question: with BEAM-11460, after we get a PCollection<GenericRecord> from a parquet read (without the need to specify an Avro schema), is it possible to get the attached Avro schema from a GenericRecord element of this PCollection<GenericRecord>?
    >
    >
    >
    > Really appreciate it if you can help clarify my questions. Thanks!
    >
    >
    >
    >
    >
    > From: Alexey Romanenko <ar...@gmail.com>
    > Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
    > Date: Friday, January 8, 2021 at 4:48 AM
    > To: "user@beam.apache.org" <us...@beam.apache.org>
    > Subject: Re: Quick question regarding ParquetIO
    >
    >
    >
    > Well, this is how I see it, let me explain.
    >
    >
    >
    > Since every PCollection is required to have a Coder to materialize the intermediate data, we need to have a coder for "PCollection<GenericRecord>" as well. If I’m not mistaken, for “GenericRecord" we used to set an AvroCoder, which is based on an Avro (or Beam too?) schema.
    >
    >
    >
    > Actually, it will currently throw an exception if you try to use “parseGenericRecords()” with a PCollection<GenericRecord> as the output PCollection, since it can’t infer a Coder based on the provided “parseFn”. I guess it was done intentionally this way, and I doubt that we can have a proper coder for PCollection<GenericRecord> without knowing a schema. Maybe some Avro experts here can add more on this if we can somehow overcome it.
    >
    >
    >
    > On 7 Jan 2021, at 19:44, Tao Li <ta...@zillow.com> wrote:
    >
    >
    >
    > Alexey,
    >
    >
    >
    > Why do I need to set AvroCoder? I assume with BEAM-11460 we don’t need to specify a schema when reading parquet files to get a PCollection<GenericRecord>. Is my understanding correct? Am I missing anything here?
    >
    >
    >
    > Thanks!
    >
    >
    >
    > From: Alexey Romanenko <ar...@gmail.com>
    > Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
    > Date: Thursday, January 7, 2021 at 9:56 AM
    > To: "user@beam.apache.org" <us...@beam.apache.org>
    > Subject: Re: Quick question regarding ParquetIO
    >
    >
    >
    > If you want to get just a PCollection<GenericRecord> as output then you would still need to set AvroCoder, but which schema to use in this case?
    >
    >
    >
    > On 6 Jan 2021, at 19:53, Tao Li <ta...@zillow.com> wrote:
    >
    >
    >
    > Hi Alexey,
    >
    >
    >
    > Thank you so much for this info. I will definitely give it a try once 2.28 is released.
    >
    >
    >
    > Regarding this feature, it’s basically mimicking the feature from AvroIO: https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html
    >
    >
    >
    > I have one more quick question regarding the “reading records of an unknown schema” scenario. In the sample code a PCollection<Foo> is being returned, and parseGenericRecords requires parsing logic. What if I just want to get a PCollection<GenericRecord> instead of a specific class (e.g. Foo in the example)? I guess I can just skip the ParquetIO.parseGenericRecords transform? So do I still have to specify dummy parsing logic like below? Thanks!
    >
    >
    >
    > p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, GenericRecord >() {
    >
    >        public GenericRecord apply(GenericRecord record) {
    >
    >          return record;
    >
    >        }
    >
    >
    >
    > From: Alexey Romanenko <ar...@gmail.com>
    > Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
    > Date: Wednesday, January 6, 2021 at 10:13 AM
    > To: "user@beam.apache.org" <us...@beam.apache.org>
    > Subject: Re: Quick question regarding ParquetIO
    >
    >
    >
    > Hi Tao,
    >
    >
    >
    > This Jira [1] looks like exactly what you are asking, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available only in Beam 2.28.0.
    >
    >
    >
    > [1] https://issues.apache.org/jira/browse/BEAM-11460
    >
    >
    >
    > Regards,
    >
    > Alexey
    >
    >
    >
    >
    > On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com> wrote:
    >
    >
    >
    > Hi beam community,
    >
    >
    >
    > Quick question about ParquetIO. Is there a way to avoid specifying the Avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, the Spark parquet reader does not require such a schema specification.
    >
    >
    >
    > Please advise. Thanks a lot!
    >
    >
    >
    >
    >
    >
    > --
    >
    > Yours Sincerely
    > Kobe Feng


Re: Quick question regarding ParquetIO

Posted by Ismaël Mejía <ie...@gmail.com>.
Catching up on this thread, sorry if late to the party :) and my excuses because
this is going to be loooong but worth it.

> It does look like BEAM-11460 could work for you. Note that it relies on a dynamic
> object, which won't work with schema-aware transforms and SqlTransform. It's
> likely this isn't a problem for you; I just wanted to point it out.

We may be missing in this discussion the existence of the
`withBeamSchemas(true)` method on the IOs that produce Avro objects. This method
sets up a Schema-based coder for the output PCollection generated by the
read. This allows both SQL and Schema-based transforms just afterwards by
auto-inferring the Beam Row schema and auto-transforming everything into Rows
when needed.

PCollection<GenericRecord> input =
    p.apply(
      ParquetIO.read(SCHEMA)
          .from(path)
          .withBeamSchemas(true));

Now input can be used by SQL/Schema-based PTransforms.
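
For example, a minimal sketch, assuming the schema has a column named
"my_field" (PCOLLECTION is SqlTransform's default table name):

PCollection<Row> result =
    input.apply(SqlTransform.query("SELECT my_field FROM PCOLLECTION"));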

> @Kobe Feng thank you so much for the insights. Agree that it may be a good
> practice to read all sorts of file formats (e.g. parquet, avro etc) into a
> PCollection<Row> and then perform the schema aware transforms that you are
> referring to.

This is not the case at the moment because most IOs precede the schema-based
APIs, but more and more PTransforms are supporting it. Notice that for dynamic
objects or Schema-aware PCollection you don't even need them to produce
PCollection<Row>. You can take a PCollection<GenericRecord> (like above) and
connect it directly to schema-aware transformations as if it were a PCollection<Row>;
the transformation is done automatically for the user because of the
Schema-based coder.

You can do this manually if you have a non-schema PCollection of GenericRecords
by setting explicitly a Schema-based coder for the PCollection:

    mycollection.setCoder(AvroUtils.schemaCoder(schema));
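
For instance, a sketch assuming `schema` is the known org.apache.avro.Schema:

    PCollection<GenericRecord> records =
        p.apply(ParquetIO.read(schema).from(path)); // plain AvroCoder by default
    records.setCoder(AvroUtils.schemaCoder(schema)); // now schema-aware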

Beam also includes the schema-based `Convert` transform to convert different
types from/to Rows so this could be handy for cases when you need to transform
in both directions and it is not supported. Beam 2.28.0 introduces an
improvement that allows `Convert` to go from any Schema-based PCollection (Rows or
others) into GenericRecords. This is really useful because Avro/Parquet based
writes expect a PCollection<GenericRecord> not one of rows, and now you can just
transform a schema-based PCollection (e.g. PCollection<Row> or of other objects)
into a PCollection<GenericRecord> like this:

    myrowcollection.apply(Convert.to(GenericRecord.class)).apply(AnAvroBasedSinkIO.write(...))

https://issues.apache.org/jira/browse/BEAM-11571

So now the full scenario is covered for reads via .withBeamSchemas(true) or by
setting manually an AvroCoder for schemas and for writes by preceding the Sinks
with `Convert.to`. That's the beauty of Beam's bidirectional Schema coders.
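
A hedged end-to-end sketch of that write side (avroSchema, outputPath, and
the FileIO + ParquetIO.sink pairing are placeholders for your setup):

    myrowcollection
        .apply(Convert.to(GenericRecord.class))
        .apply(FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(avroSchema)) // avroSchema: the Avro schema to write with
            .to(outputPath));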

Note that this probably can be better documented in the programming guide or in
the javadocs so contributions welcome!

And now back to the initial question:

> Quick question about ParquetIO. Is there a way to avoid specifying the avro
> schema when reading parquet files?

No, you cannot at the moment. BEAM-11460 allows you to parametrize the
transformation from a GenericRecord (with a schema you expect in advance even if
you don't specify it) into your own type of objects.
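
A sketch of that parametrized read (MyType and the field name are hypothetical):

    PCollection<MyType> out = p.apply(
        ParquetIO.parseGenericRecords(
                new SerializableFunction<GenericRecord, MyType>() {
                  public MyType apply(GenericRecord record) {
                    // hypothetical mapping from the GenericRecord into a user type
                    return new MyType(record.get("name").toString());
                  }
                })
            .from(path));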

In Parquet/Avro the schema you use to write can differ from the schema you use
to read, this is done to support schema evolution, so the most general use case
is to allow users to read from specific versions of the Schema provided into
their objects. That's probably one of the reasons why this is not supported.

Since the Schema is part of the Parquet file metadata I suppose we could somehow
use it and produce the Schema for the output collection, notice however that if
the schema differs on the files this will break in runtime.

Filed https://issues.apache.org/jira/browse/BEAM-11650 to track this.

On Wed, Jan 13, 2021 at 7:42 PM Tao Li <ta...@zillow.com> wrote:
>
> @Kobe Feng thank you so much for the insights. Agree that it may be a good practice to read all sorts of file formats (e.g. parquet, avro etc) into a PCollection<Row> and then perform the schema aware transforms that you are referring to.
>
>
>
> The new DataFrame APIs for the Python SDK sound pretty cool, and I can imagine they will save a lot of hassle during Beam app development. Hopefully they will be added to the Java SDK as well.
>
>
>
> From: Kobe Feng <fl...@gmail.com>
> Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
> Date: Friday, January 8, 2021 at 11:39 AM
> To: "user@beam.apache.org" <us...@beam.apache.org>
> Subject: Re: Quick question regarding ParquetIO
>
>
>
> Tao,
> I'm not an expert, but good intuition: what you want is schema-aware transformations, or let's say schema-based transformations, in Beam, not only for IO but also for other DoFns, etc., and possibly schema evolution in the future as well.
>
>
> This is how I have tried to understand and explain it in other places before: unlike Spark and Flink, which leverage internal/built-in types (e.g., Catalyst struct types) for built-in operators as much as possible and infer the schema whenever IOs can convert to them, Beam tries to be capable of handling any type during transforms so that people can migrate existing pipelines to Beam (if you do a Spark mapPartitions with your own type, an Encoder can't be avoided either, right?). Also yes, we could leverage Beam's own type "Row" to do all transformations, converting all in/out types like parquet, avro, orc, etc. at the IO side, and then do schema inference in built-in operators based on the Row type when we know they will operate on internal types; that's how to avoid the coder or explicit schema there. Going further, IOs could provide schema registry capability, and transforms could look up schemas when necessary for evolution. I saw Beam put schema-based transformation in its goals last year, which will be convenient for people (since normally people would rather use built-in types than provide their own types' coders for following operators until we have to); that's why the DataFrame APIs for the Python SDK exist, I think.
>
> Kobe
>
>
>
>
> On Fri, Jan 8, 2021 at 9:34 AM Tao Li <ta...@zillow.com> wrote:
>
> Thanks Alexey for your explanation. That’s also what I was thinking. Parquet files already have the schema built in, so it might be feasible to infer a coder automatically (like the Spark parquet reader). It would be great if some experts could chime in here. @Brian Hulette already mentioned that the community is working on new DataFrame APIs in the Python SDK, which are based on the pandas methods and use those methods at construction time to determine the schema. I think this is very close to the schema inference we have been discussing. Not sure it will be available in the Java SDK, though.
>
>
>
> Regarding BEAM-11460, it looks like it may not totally solve my problem. As @Alexey Romanenko mentioned, we may still need to know the Avro or Beam schema for the operations following the parquet read. A dumb question: with BEAM-11460, after we get a PCollection<GenericRecord> from a parquet read (without the need to specify an Avro schema), is it possible to get the attached Avro schema from a GenericRecord element of this PCollection<GenericRecord>?
>
>
>
> Really appreciate it if you can help clarify my questions. Thanks!
>
>
>
>
>
> From: Alexey Romanenko <ar...@gmail.com>
> Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
> Date: Friday, January 8, 2021 at 4:48 AM
> To: "user@beam.apache.org" <us...@beam.apache.org>
> Subject: Re: Quick question regarding ParquetIO
>
>
>
> Well, this is how I see it, let me explain.
>
>
>
> Since every PCollection is required to have a Coder to materialize the intermediate data, we need to have a coder for "PCollection<GenericRecord>" as well. If I’m not mistaken, for “GenericRecord" we used to set an AvroCoder, which is based on an Avro (or Beam too?) schema.
>
>
>
> Actually, it will currently throw an exception if you try to use “parseGenericRecords()” with a PCollection<GenericRecord> as the output PCollection, since it can’t infer a Coder based on the provided “parseFn”. I guess it was done intentionally this way, and I doubt that we can have a proper coder for PCollection<GenericRecord> without knowing a schema. Maybe some Avro experts here can add more on this if we can somehow overcome it.
>
>
>
> On 7 Jan 2021, at 19:44, Tao Li <ta...@zillow.com> wrote:
>
>
>
> Alexey,
>
>
>
> Why do I need to set AvroCoder? I assume with BEAM-11460 we don’t need to specify a schema when reading parquet files to get a PCollection<GenericRecord>. Is my understanding correct? Am I missing anything here?
>
>
>
> Thanks!
>
>
>
> From: Alexey Romanenko <ar...@gmail.com>
> Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
> Date: Thursday, January 7, 2021 at 9:56 AM
> To: "user@beam.apache.org" <us...@beam.apache.org>
> Subject: Re: Quick question regarding ParquetIO
>
>
>
> If you want to get just a PCollection<GenericRecord> as output then you would still need to set AvroCoder, but which schema to use in this case?
>
>
>
> On 6 Jan 2021, at 19:53, Tao Li <ta...@zillow.com> wrote:
>
>
>
> Hi Alexey,
>
>
>
> Thank you so much for this info. I will definitely give it a try once 2.28 is released.
>
>
>
> Regarding this feature, it’s basically mimicking the feature from AvroIO: https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html
>
>
>
> I have one more quick question regarding the “reading records of an unknown schema” scenario. In the sample code a PCollection<Foo> is being returned, and parseGenericRecords requires parsing logic. What if I just want to get a PCollection<GenericRecord> instead of a specific class (e.g. Foo in the example)? I guess I can just skip the ParquetIO.parseGenericRecords transform? So do I still have to specify dummy parsing logic like below? Thanks!
>
>
>
> p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, GenericRecord >() {
>
>        public GenericRecord apply(GenericRecord record) {
>
>          return record;
>
>        }
>
>
>
> From: Alexey Romanenko <ar...@gmail.com>
> Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
> Date: Wednesday, January 6, 2021 at 10:13 AM
> To: "user@beam.apache.org" <us...@beam.apache.org>
> Subject: Re: Quick question regarding ParquetIO
>
>
>
> Hi Tao,
>
>
>
> This Jira [1] looks like exactly what you are asking, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available only in Beam 2.28.0.
>
>
>
> [1] https://issues.apache.org/jira/browse/BEAM-11460
>
>
>
> Regards,
>
> Alexey
>
>
>
>
> On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com> wrote:
>
>
>
> Hi beam community,
>
>
>
> Quick question about ParquetIO. Is there a way to avoid specifying the Avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, the Spark parquet reader does not require such a schema specification.
>
>
>
> Please advise. Thanks a lot!
>
>
>
>
>
>
> --
>
> Yours Sincerely
> Kobe Feng

Re: Quick question regarding ParquetIO

Posted by Tao Li <ta...@zillow.com>.
@Kobe Feng thank you so much for the insights. Agree that it may be a good practice to read all sorts of file formats (e.g. parquet, avro, etc.) into a PCollection<Row> and then perform the schema-aware transforms that you are referring to.

The new DataFrame APIs for the Python SDK sound pretty cool, and I can imagine they will save a lot of hassle during Beam app development. Hopefully they will be added to the Java SDK as well.

From: Kobe Feng <fl...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Friday, January 8, 2021 at 11:39 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Quick question regarding ParquetIO

Tao,
I'm not an expert, but good intuition: what you want is schema-aware transformations, or let's say schema-based transformations, in Beam, not only for IO but also for other DoFns, etc., and possibly schema evolution in the future as well.

This is how I have tried to understand and explain it in other places before: unlike Spark and Flink, which leverage internal/built-in types (e.g., Catalyst struct types) for built-in operators as much as possible and infer the schema whenever IOs can convert to them, Beam tries to be capable of handling any type during transforms so that people can migrate existing pipelines to Beam (if you do a Spark mapPartitions with your own type, an Encoder can't be avoided either, right?). Also yes, we could leverage Beam's own type "Row" to do all transformations, converting all in/out types like parquet, avro, orc, etc. at the IO side, and then do schema inference in built-in operators based on the Row type when we know they will operate on internal types; that's how to avoid the coder or explicit schema there. Going further, IOs could provide schema registry capability, and transforms could look up schemas when necessary for evolution. I saw Beam put schema-based transformation in its goals last year, which will be convenient for people (since normally people would rather use built-in types than provide their own types' coders for following operators until we have to); that's why the DataFrame APIs for the Python SDK exist, I think.

Kobe


On Fri, Jan 8, 2021 at 9:34 AM Tao Li <ta...@zillow.com> wrote:
Thanks Alexey for your explanation. That’s also what I was thinking. Parquet files already have the schema built in, so it might be feasible to infer a coder automatically (like the Spark parquet reader). It would be great if some experts could chime in here. @Brian Hulette already mentioned that the community is working on new DataFrame APIs in the Python SDK, which are based on the pandas methods and use those methods at construction time to determine the schema. I think this is very close to the schema inference we have been discussing. Not sure it will be available in the Java SDK, though.


Regarding BEAM-11460, it looks like it may not totally solve my problem. As @Alexey Romanenko mentioned, we may still need to know the Avro or Beam schema for the operations following the parquet read. A dumb question: with BEAM-11460, after we get a PCollection<GenericRecord> from a parquet read (without the need to specify an Avro schema), is it possible to get the attached Avro schema from a GenericRecord element of this PCollection<GenericRecord>?

Really appreciate it if you can help clarify my questions. Thanks!



From: Alexey Romanenko <ar...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Friday, January 8, 2021 at 4:48 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Quick question regarding ParquetIO

Well, this is how I see it, let me explain.

Since every PCollection is required to have a Coder to materialize the intermediate data, we need to have a coder for "PCollection<GenericRecord>" as well. If I’m not mistaken, for “GenericRecord" we used to set an AvroCoder, which is based on an Avro (or Beam too?) schema.

Actually, it will currently throw an exception if you try to use “parseGenericRecords()” with a PCollection<GenericRecord> as the output PCollection, since it can’t infer a Coder based on the provided “parseFn”. I guess it was done intentionally this way, and I doubt that we can have a proper coder for PCollection<GenericRecord> without knowing a schema. Maybe some Avro experts here can add more on this if we can somehow overcome it.
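
In other words, something like this sketch, with the schema known up front (records is a hypothetical PCollection<GenericRecord>):

records.setCoder(AvroCoder.of(schema)); // AvroCoder.of(org.apache.avro.Schema) yields an AvroCoder<GenericRecord>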

On 7 Jan 2021, at 19:44, Tao Li <ta...@zillow.com> wrote:

Alexey,

Why do I need to set AvroCoder? I assume with BEAM-11460 we don’t need to specify a schema when reading parquet files to get a PCollection<GenericRecord>. Is my understanding correct? Am I missing anything here?

Thanks!

From: Alexey Romanenko <ar...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Thursday, January 7, 2021 at 9:56 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Quick question regarding ParquetIO

If you want to get just a PCollection<GenericRecord> as output then you would still need to set AvroCoder, but which schema to use in this case?


On 6 Jan 2021, at 19:53, Tao Li <ta...@zillow.com> wrote:

Hi Alexey,

Thank you so much for this info. I will definitely give it a try once 2.28 is released.

Regarding this feature, it’s basically mimicking the feature from AvroIO: https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html

I have one more quick question regarding the “reading records of an unknown schema” scenario. In the sample code a PCollection<Foo> is being returned, and parseGenericRecords requires parsing logic. What if I just want to get a PCollection<GenericRecord> instead of a specific class (e.g. Foo in the example)? I guess I can just skip the ParquetIO.parseGenericRecords transform? So do I still have to specify dummy parsing logic like below? Thanks!

p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, GenericRecord>() {
       public GenericRecord apply(GenericRecord record) {
         return record;
       }
     }))

From: Alexey Romanenko <ar...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Wednesday, January 6, 2021 at 10:13 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Quick question regarding ParquetIO

Hi Tao,

This Jira [1] looks like exactly what you are asking, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available only in Beam 2.28.0.

[1] https://issues.apache.org/jira/browse/BEAM-11460

Regards,
Alexey



On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com> wrote:

Hi beam community,

Quick question about ParquetIO <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>. Is there a way to avoid specifying the Avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, the Spark parquet reader <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does not require such a schema specification.

Please advise. Thanks a lot!



--
Yours Sincerely
Kobe Feng

Re: Quick question regarding ParquetIO

Posted by Kobe Feng <fl...@gmail.com>.
Tao,
I'm not an expert, but good intuition: what you want is schema-aware
transformations, or let's say schema-based transformations, in Beam, not only
for IO but also for other DoFns, etc., and possibly schema evolution in the
future as well.

This is how I have tried to understand and explain it in other places before:
unlike Spark and Flink, which leverage internal/built-in types (e.g., Catalyst
struct types) for built-in operators as much as possible and infer the schema
whenever IOs can convert to them, Beam tries to be capable of handling any
type during transforms so that people can migrate existing pipelines to Beam
(if you do a Spark mapPartitions with your own type, an Encoder can't be
avoided either, right?). Also yes, we could leverage Beam's own type "Row" to
do all transformations, converting all in/out types like parquet, avro, orc,
etc. at the IO side, and then do schema inference in built-in operators based
on the Row type when we know they will operate on internal types; that's how
to avoid the coder or explicit schema there. Going further, IOs could provide
schema registry capability, and transforms could look up schemas when
necessary for evolution. I saw Beam put schema-based transformation in its
goals last year, which will be convenient for people (since normally people
would rather use built-in types than provide their own types' coders for
following operators until we have to); that's why the DataFrame APIs for the
Python SDK exist, I think.

Kobe


On Fri, Jan 8, 2021 at 9:34 AM Tao Li <ta...@zillow.com> wrote:

> Thanks Alexey for your explanation. That’s also what I was thinking.
> Parquet files already have the schema built in, so it might be feasible to
> infer a coder automatically (like the Spark parquet reader). It would be
> great if some experts could chime in here. @Brian Hulette
> <bh...@google.com> already mentioned that the community is working on
> new DataFrame APIs in the Python SDK, which are based on the pandas methods
> and use those methods at construction time to determine the schema. I think
> this is very close to the schema inference we have been discussing. Not
> sure it will be available in the Java SDK, though.
>
>
>
> Regarding BEAM-11460, it looks like it may not totally solve my problem. As @Alexey
> Romanenko <ar...@gmail.com> mentioned, we may still need to know
> the Avro or Beam schema for the operations following the parquet read. A
> dumb question: with BEAM-11460, after we get a PCollection<GenericRecord>
> from a parquet read (without the need to specify an Avro schema), is it
> possible to get the attached Avro schema from a GenericRecord element of
> this PCollection<GenericRecord>?
>
>
>
> Really appreciate it if you can help clarify my questions. Thanks!
>
>
>
>
>
> *From: *Alexey Romanenko <ar...@gmail.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Friday, January 8, 2021 at 4:48 AM
> *To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Subject: *Re: Quick question regarding ParquetIO
>
>
>
> Well, this is how I see it, let me explain.
>
>
>
> Since every PCollection is required to have a Coder to materialize the
> intermediate data, we need to have a coder for "PCollection<GenericRecord>"
> as well. If I’m not mistaken, for “GenericRecord" we used to set an AvroCoder,
> which is based on an Avro (or Beam too?) schema.
>
>
>
> Actually, it will currently throw an exception if you try to use
> “parseGenericRecords()” with a PCollection<GenericRecord> as the output
> PCollection, since it can’t infer a Coder based on the provided “parseFn”. I
> guess it was done intentionally this way, and I doubt that we can have a
> proper coder for PCollection<GenericRecord> without knowing a schema. Maybe
> some Avro experts here can add more on this if we can somehow overcome it.
>
>
>
> On 7 Jan 2021, at 19:44, Tao Li <ta...@zillow.com> wrote:
>
>
>
> Alexey,
>
>
>
> Why do I need to set AvroCoder? I assume with BEAM-11460 we don’t need to
> specify a schema when reading parquet files to get a PCollection<GenericRecord>.
> Is my understanding correct? Am I missing anything here?
>
>
>
> Thanks!
>
>
>
> *From: *Alexey Romanenko <ar...@gmail.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Thursday, January 7, 2021 at 9:56 AM
> *To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Subject: *Re: Quick question regarding ParquetIO
>
>
>
> If you want to get just a PCollection<GenericRecord> as output then you
> would still need to set AvroCoder, but which schema to use in this case?
>
>
>
>
> On 6 Jan 2021, at 19:53, Tao Li <ta...@zillow.com> wrote:
>
>
>
> Hi Alexey,
>
>
>
> Thank you so much for this info. I will definitely give it a try once 2.28
> is released.
>
>
>
> Regarding this feature, it’s basically mimicking the feature from AvroIO:
> https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html
>
>
>
> I have one more quick question regarding the “reading records of an
> unknown schema” scenario. In the sample code a PCollection<Foo> is being
> returned, and parseGenericRecords requires parsing logic. What if I
> just want to get a PCollection<GenericRecord> instead of a specific class
> (e.g. Foo in the example)? I guess I can just skip the
> ParquetIO.parseGenericRecords transform? So do I still have to specify the
> dummy parsing logic like below? Thanks!
>
>
>
> p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord,
> GenericRecord >() {
>
>        public GenericRecord apply(GenericRecord record) {
>
>          return record;
>
>        }
>
>
>
> *From: *Alexey Romanenko <ar...@gmail.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Wednesday, January 6, 2021 at 10:13 AM
> *To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Subject: *Re: Quick question regarding ParquetIO
>
>
>
> Hi Tao,
>
>
>
> This Jira [1] looks like exactly what you are asking, but it was merged only
> recently (thanks to Anant Damle for working on this!) and it should be
> available only in Beam 2.28.0.
>
>
>
> [1] https://issues.apache.org/jira/browse/BEAM-11460
>
>
>
> Regards,
>
> Alexey
>
>
>
>
>
> On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com> wrote:
>
>
>
> Hi beam community,
>
>
>
> Quick question about ParquetIO
> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>.
> Is there a way to avoid specifying the Avro schema when reading parquet
> files? The reason is that we may not know the parquet schema until we read
> the files. In comparison, the Spark parquet reader
> <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html>
> does not require such a schema specification.
>
>
>
> Please advise. Thanks a lot!
>
>
>


-- 
Yours Sincerely
Kobe Feng

Re: Quick question regarding ParquetIO

Posted by Tao Li <ta...@zillow.com>.
Thanks Alexey for your explanation. That’s also what I was thinking. Parquet files already have the schema built in, so it might be feasible to infer a coder automatically (like the spark parquet reader does). It would be great if some experts could chime in here. @Brian Hulette<ma...@google.com> already mentioned that the community is working on new DataFrame APIs in the Python SDK, which are based on the pandas methods and use those methods at construction time to determine the schema. I think this is very close to the schema inference we have been discussing. Not sure it will be available in the Java SDK though.


Regarding BEAM-11460, it looks like it may not totally solve my problem. As @Alexey Romanenko<ma...@gmail.com> mentioned, we may still need to know the avro or beam schema for subsequent operations after the parquet read. A dumb question: with BEAM-11460, after we get a PCollection<GenericRecord> from the parquet read (without the need to specify an avro schema), is it possible to get the attached avro schema from a GenericRecord element of this PCollection<GenericRecord>?
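To sketch what I am hoping for: my understanding is that every Avro GenericRecord carries its schema via getSchema(), so at execution time something like the following should work. Note this only exposes the schema inside the running pipeline, not at pipeline construction time. A rough sketch, assuming the schema-less read from BEAM-11460:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

PCollection<GenericRecord> records = ...; // from the schema-less parquet read
records.apply("InspectSchema", ParDo.of(new DoFn<GenericRecord, String>() {
  @ProcessElement
  public void processElement(@Element GenericRecord record, OutputReceiver<String> out) {
    Schema schema = record.getSchema(); // each record carries its writer schema
    out.output(schema.toString());
  }
}));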

Really appreciate it if you can help clarify my questions. Thanks!



From: Alexey Romanenko <ar...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Friday, January 8, 2021 at 4:48 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Quick question regarding ParquetIO

Well, this is how I see it, let me explain.

Since every PCollection is required to have a Coder to materialize the intermediate data, we need to have a coder for "PCollection<GenericRecord>" as well. If I’m not mistaken, for "GenericRecord" we used to set an AvroCoder, which is based on an Avro (or Beam too?) schema.

Actually, it will currently throw an exception if you try to use "parseGenericRecords()" with a PCollection<GenericRecord> as the output collection, since it can’t infer a Coder from the provided "parseFn". I guess this was done intentionally, and I doubt that we can have a proper coder for PCollection<GenericRecord> without knowing a schema. Maybe some Avro experts here can add more on whether we can somehow overcome this.


On 7 Jan 2021, at 19:44, Tao Li <ta...@zillow.com>> wrote:

Alexey,

Why do I need to set AvroCoder? I assume with BEAM-11460 we don’t need to specify a schema when reading parquet files to get a PCollection<GenericRecord>. Is my understanding correct? Am I missing anything here?

Thanks!

From: Alexey Romanenko <ar...@gmail.com>>
Reply-To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Date: Thursday, January 7, 2021 at 9:56 AM
To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Subject: Re: Quick question regarding ParquetIO

If you want to get just a PCollection<GenericRecord> as output, then you would still need to set an AvroCoder, but which schema would you use in this case?



On 6 Jan 2021, at 19:53, Tao Li <ta...@zillow.com>> wrote:

Hi Alexey,

Thank you so much for this info. I will definitely give it a try once 2.28 is released.

Regarding this feature, it’s basically mimicking the feature from AvroIO: https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html

I have one more quick question regarding the “reading records of an unknown schema” scenario. In the sample code a PCollection<Foo> is being returned, and parseGenericRecords requires parsing logic. What if I just want to get a PCollection<GenericRecord> instead of a specific class (e.g. Foo in the example)? I guess I can just skip the ParquetIO.parseGenericRecords transform? So do I still have to specify the dummy parsing logic like below? Thanks!

p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, GenericRecord>() {
       public GenericRecord apply(GenericRecord record) {
         return record;
       }
     }));

From: Alexey Romanenko <ar...@gmail.com>>
Reply-To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Date: Wednesday, January 6, 2021 at 10:13 AM
To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Subject: Re: Quick question regarding ParquetIO

Hi Tao,

This jira [1] looks like exactly what you are asking for, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available starting with Beam 2.28.0.

[1] https://issues.apache.org/jira/browse/BEAM-11460

Regards,
Alexey




On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com>> wrote:

Hi beam community,

Quick question about ParquetIO<https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>. Is there a way to avoid specifying the avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, spark parquet reader<https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does not require such a schema specification.

Please advise. Thanks a lot!


Re: Quick question regarding ParquetIO

Posted by Alexey Romanenko <ar...@gmail.com>.
Well, this is how I see it, let me explain. 

Since every PCollection is required to have a Coder to materialize the intermediate data, we need to have a coder for "PCollection<GenericRecord>" as well. If I’m not mistaken, for "GenericRecord" we used to set an AvroCoder, which is based on an Avro (or Beam too?) schema.

Actually, it will currently throw an exception if you try to use "parseGenericRecords()" with a PCollection<GenericRecord> as the output collection, since it can’t infer a Coder from the provided "parseFn". I guess this was done intentionally, and I doubt that we can have a proper coder for PCollection<GenericRecord> without knowing a schema. Maybe some Avro experts here can add more on whether we can somehow overcome this.
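To make it concrete: the only way I know of to build such a coder is from a schema you already have. A rough sketch ("schemaJson" is a placeholder for whatever schema definition you can supply up front):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.values.PCollection;

Schema schema = new Schema.Parser().parse(schemaJson); // schema must be known
PCollection<GenericRecord> records = ...;              // e.g. output of a parse
records.setCoder(AvroCoder.of(schema));                // no schema, no AvroCoder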

> On 7 Jan 2021, at 19:44, Tao Li <ta...@zillow.com> wrote:
> 
> Alexey,
>  
> Why do I need to set AvroCoder? I assume with BEAM-11460 we don’t need to specify a schema when reading parquet files to get a PCollection<GenericRecord>. Is my understanding correct? Am I missing anything here?
>  
> Thanks!
>  
> From: Alexey Romanenko <ar...@gmail.com>
> Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
> Date: Thursday, January 7, 2021 at 9:56 AM
> To: "user@beam.apache.org" <us...@beam.apache.org>
> Subject: Re: Quick question regarding ParquetIO
>  
> If you want to get just a PCollection<GenericRecord> as output, then you would still need to set an AvroCoder, but which schema would you use in this case?
> 
> 
>> On 6 Jan 2021, at 19:53, Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
>>  
>> Hi Alexey,
>>  
>> Thank you so much for this info. I will definitely give it a try once 2.28 is released.
>>  
>> Regarding this feature, it’s basically mimicking the feature from AvroIO: https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html
>>  
>> I have one more quick question regarding the “reading records of an unknown schema” scenario. In the sample code a PCollection<Foo> is being returned, and parseGenericRecords requires parsing logic. What if I just want to get a PCollection<GenericRecord> instead of a specific class (e.g. Foo in the example)? I guess I can just skip the ParquetIO.parseGenericRecords transform? So do I still have to specify the dummy parsing logic like below? Thanks!
>>  
>> p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, GenericRecord>() {
>>        public GenericRecord apply(GenericRecord record) {
>>          return record;
>>        }
>>      }));
>>  
>> From: Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>>
>> Reply-To: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
>> Date: Wednesday, January 6, 2021 at 10:13 AM
>> To: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
>> Subject: Re: Quick question regarding ParquetIO
>>  
>> Hi Tao,
>>  
>> This jira [1] looks like exactly what you are asking for, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available starting with Beam 2.28.0.
>>  
>> [1] https://issues.apache.org/jira/browse/BEAM-11460
>>  
>> Regards,
>> Alexey
>> 
>> 
>> 
>>> On 6 Jan 2021, at 18:57, Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
>>>  
>>> Hi beam community,
>>>  
>>> Quick question about ParquetIO <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>. Is there a way to avoid specifying the avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, spark parquet reader <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does not require such a schema specification.
>>>  
>>> Please advise. Thanks a lot!


Re: Quick question regarding ParquetIO

Posted by Tao Li <ta...@zillow.com>.
Alexey,

Why do I need to set AvroCoder? I assume with BEAM-11460 we don’t need to specify a schema when reading parquet files to get a PCollection<GenericRecord>. Is my understanding correct? Am I missing anything here?

Thanks!

From: Alexey Romanenko <ar...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Thursday, January 7, 2021 at 9:56 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Quick question regarding ParquetIO

If you want to get just a PCollection<GenericRecord> as output, then you would still need to set an AvroCoder, but which schema would you use in this case?


On 6 Jan 2021, at 19:53, Tao Li <ta...@zillow.com>> wrote:

Hi Alexey,

Thank you so much for this info. I will definitely give it a try once 2.28 is released.

Regarding this feature, it’s basically mimicking the feature from AvroIO: https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html

I have one more quick question regarding the “reading records of an unknown schema” scenario. In the sample code a PCollection<Foo> is being returned, and parseGenericRecords requires parsing logic. What if I just want to get a PCollection<GenericRecord> instead of a specific class (e.g. Foo in the example)? I guess I can just skip the ParquetIO.parseGenericRecords transform? So do I still have to specify the dummy parsing logic like below? Thanks!

p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, GenericRecord>() {
       public GenericRecord apply(GenericRecord record) {
         return record;
       }
     }));

From: Alexey Romanenko <ar...@gmail.com>>
Reply-To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Date: Wednesday, January 6, 2021 at 10:13 AM
To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Subject: Re: Quick question regarding ParquetIO

Hi Tao,

This jira [1] looks like exactly what you are asking for, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available starting with Beam 2.28.0.

[1] https://issues.apache.org/jira/browse/BEAM-11460

Regards,
Alexey



On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com>> wrote:

Hi beam community,

Quick question about ParquetIO<https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>. Is there a way to avoid specifying the avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, spark parquet reader<https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does not require such a schema specification.

Please advise. Thanks a lot!



Re: Quick question regarding ParquetIO

Posted by Alexey Romanenko <ar...@gmail.com>.
If you want to get just a PCollection<GenericRecord> as output, then you would still need to set an AvroCoder, but which schema would you use in this case?

> On 6 Jan 2021, at 19:53, Tao Li <ta...@zillow.com> wrote:
> 
> Hi Alexey,
>  
> Thank you so much for this info. I will definitely give it a try once 2.28 is released.
>  
> Regarding this feature, it’s basically mimicking the feature from AvroIO: https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html
>  
> I have one more quick question regarding the “reading records of an unknown schema” scenario. In the sample code a PCollection<Foo> is being returned, and parseGenericRecords requires parsing logic. What if I just want to get a PCollection<GenericRecord> instead of a specific class (e.g. Foo in the example)? I guess I can just skip the ParquetIO.parseGenericRecords transform? So do I still have to specify the dummy parsing logic like below? Thanks!
>  
> p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, GenericRecord>() {
>        public GenericRecord apply(GenericRecord record) {
>          return record;
>        }
>      }));
>  
> From: Alexey Romanenko <ar...@gmail.com>
> Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
> Date: Wednesday, January 6, 2021 at 10:13 AM
> To: "user@beam.apache.org" <us...@beam.apache.org>
> Subject: Re: Quick question regarding ParquetIO
>  
> Hi Tao,
>  
> This jira [1] looks like exactly what you are asking for, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available starting with Beam 2.28.0.
>  
> [1] https://issues.apache.org/jira/browse/BEAM-11460
>  
> Regards,
> Alexey
> 
> 
>> On 6 Jan 2021, at 18:57, Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
>>  
>> Hi beam community,
>>  
>> Quick question about ParquetIO <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>. Is there a way to avoid specifying the avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, spark parquet reader <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does not require such a schema specification.
>>  
>> Please advise. Thanks a lot!


Re: Quick question regarding ParquetIO

Posted by Tao Li <ta...@zillow.com>.
Hi Alexey,

Thank you so much for this info. I will definitely give it a try once 2.28 is released.

Regarding this feature, it’s basically mimicking the feature from AvroIO: https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html

I have one more quick question regarding the “reading records of an unknown schema” scenario. In the sample code a PCollection<Foo> is being returned, and parseGenericRecords requires parsing logic. What if I just want to get a PCollection<GenericRecord> instead of a specific class (e.g. Foo in the example)? I guess I can just skip the ParquetIO.parseGenericRecords transform? So do I still have to specify the dummy parsing logic like below? Thanks!

p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, GenericRecord>() {
       public GenericRecord apply(GenericRecord record) {
         return record;
       }
     }));
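Writing the identity version out in full, in case it helps: if I’m reading the AvroIO javadoc right, it would look roughly like the sketch below, and it still ends up needing an avro schema for the coder. The file pattern is made up, and "schema" is an org.apache.avro.Schema you would have to supply:

PCollection<GenericRecord> records = p.apply(
    AvroIO.parseGenericRecords(
            new SerializableFunction<GenericRecord, GenericRecord>() {
              public GenericRecord apply(GenericRecord record) {
                return record; // identity "parse"
              }
            })
        .from("s3://my-bucket/path/to/*.avro")   // placeholder pattern
        .withCoder(AvroCoder.of(schema)));       // coder still needs a schema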

From: Alexey Romanenko <ar...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Wednesday, January 6, 2021 at 10:13 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Quick question regarding ParquetIO

Hi Tao,

This jira [1] looks like exactly what you are asking for, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available starting with Beam 2.28.0.

[1] https://issues.apache.org/jira/browse/BEAM-11460

Regards,
Alexey


On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com>> wrote:

Hi beam community,

Quick question about ParquetIO<https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>. Is there a way to avoid specifying the avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, spark parquet reader<https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does not require such a schema specification.

Please advise. Thanks a lot!


Re: Quick question regarding ParquetIO

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Tao,

This jira [1] looks like exactly what you are asking for, but it was merged only recently (thanks to Anant Damle for working on this!) and it should be available starting with Beam 2.28.0.

[1] https://issues.apache.org/jira/browse/BEAM-11460
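Going by the ticket, the usage should look roughly like this (a sketch only; treat the method names as tentative until 2.28 is actually released, and the field name and file pattern are placeholders):

PCollection<String> records = p.apply(
    ParquetIO.parseGenericRecords(
            new SerializableFunction<GenericRecord, String>() {
              public String apply(GenericRecord record) {
                return String.valueOf(record.get("someField")); // placeholder field
              }
            })
        .from("/path/to/input-*.parquet"));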

Regards,
Alexey

> On 6 Jan 2021, at 18:57, Tao Li <ta...@zillow.com> wrote:
> 
> Hi beam community,
>  
> Quick question about ParquetIO <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>. Is there a way to avoid specifying the avro schema when reading parquet files? The reason is that we may not know the parquet schema until we read the files. In comparison, spark parquet reader <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html> does not require such a schema specification.
>  
> Please advise. Thanks a lot!