You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Sridevi Nookala <sn...@parallelwireless.com> on 2019/01/14 01:00:21 UTC

ParquetIO write of CSV document data

hi,


I have a bunch of CSV data files that i need to store in Parquet format. I did look at basic documentation on ParquetIO. and ParquetIO.sink() can be used to achive the same.

However there is a dependency on the Avro Schema.

how do i infer/generate Avro schema from CSV document data ?

Does beam have any API for the same.

I tried using Kite SDK API CSVUtil / JsonUtil but had no luck generating avro schema

my CSV data files have headers in them and quite a few of the header fields are hyphenated which are not liked by Kite 's CSVUtil


I think it will be a redundant effort to convert CSV documents to json documents .

Any suggestions on how to infer avro schema from CSV data or a JSON schema will be helpful


thanks

Sri


Re: ParquetIO write of CSV document data

Posted by Sridevi Nookala <sn...@parallelwireless.com>.
hi alex/Jeff


Yes. It did help. however there are other issues that i am running into

the producer of Parquet files is a different source in my case

( field names are having the infamous '-' and '.' in them .  TELCO /3GPP world uses them. They were generated using python pandas/pyarrow  which did not care /complain)

Now i have issues building avro schema and i don't know if there is way to circumvent this and disable field validation etc.


Eventually i have the need to read multiple parquet files, group the parquet based on some criteria, expand and combine them as one big parquet on  a daily basis

the source provided 15 min parquet chunks


Any suggestions here will be helpful

thanks

Sri




________________________________
From: Alexey Romanenko <ar...@gmail.com>
Sent: Friday, January 25, 2019 10:31:37 AM
To: user@beam.apache.org
Subject: Re: ParquetIO write of CSV document data

Hi

Great that this example helped you.
Also, as I can see, Jeff Klukas already answered your questions in the other thread “How to disable sharding with FileIO.write()/FileIO.writeDynamic”.

On 24 Jan 2019, at 03:44, Sridevi Nookala <sn...@parallelwireless.com>> wrote:


Hi Alex,

Thanks for the suggestion. I tried like in the github example by infering AVRO schema,

 PCollection<String> input =
            pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
       input
        .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
.setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write Parquet files",
            FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));

    pipeline.run();
  }

have 2 simple questions

  1.  how can i disable sharding with FileIO.write().  i want a single parquet file from a single CSV
  2.   how can i change the above to have a custom naming for my parquet file
  3.  do i have to use FileIO.writeDynamic() ?

Hi Lucas,

I am newbie so not there yet to solve BEAM jira's , but it will help immensly if AVRO scehma inference is avoided
some thing like python pandas/pyarrow does

thanks for your help
Sri

________________________________
From: Sridevi Nookala
Sent: Wednesday, January 23, 2019 9:41:02 PM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: ParquetIO write of CSV document data

Hi Alex,

Thanks for the suggestion. I tried like in the github example by infering AVRO schema,


 PCollection<String> input =
            pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
       input
        .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
.setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write Parquet files",
            FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));

    pipeline.run();
  }


________________________________
From: Łukasz Gajowy <lu...@gmail.com>>
Sent: Tuesday, January 15, 2019 7:02:56 AM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: ParquetIO write of CSV document data

Hi Sri,

it's exactly as Alexey says, although there are plans/ideas to improve ParquetIO in a way that would not require defining the schema manually.

Some Jiras that might be interesting in this topic but not yet resolved (maybe you are willing to contribute?):
https://issues.apache.org/jira/browse/BEAM-4454<https://issues.apache.org/jira/browse/BEAM-4454>
https://issues.apache.org/jira/browse/BEAM-4812<https://issues.apache.org/jira/browse/BEAM-4812>
https://issues.apache.org/jira/browse/BEAM-6394<https://issues.apache.org/jira/browse/BEAM-6394>

Thanks,
Łukasz

pon., 14 sty 2019 o 19:16 Alexey Romanenko <ar...@gmail.com>> napisał(a):
Hi Sri,

Afaik, you have to create “PCollection" of "GenericRecord”s and define your Avro schema manually to write your data into Parquet files.
In this case, you will need to create a ParDo for this translation. Also, I expect that your schema is the same for all CSV files.

Basic example of using Parquet Sink with Java SDK could be found here [1]

[1] https://git.io/fhcfV<https://git.io/fhcfV>


On 14 Jan 2019, at 02:00, Sridevi Nookala <sn...@parallelwireless.com>> wrote:

hi,

I have a bunch of CSV data files that i need to store in Parquet format. I did look at basic documentation on ParquetIO. and ParquetIO.sink() can be used to achive the same.
However there is a dependency on the Avro Schema.
how do i infer/generate Avro schema from CSV document data ?
Does beam have any API for the same.
I tried using Kite SDK API CSVUtil / JsonUtil but had no luck generating avro schema
my CSV data files have headers in them and quite a few of the header fields are hyphenated which are not liked by Kite 's CSVUtil

I think it will be a redundant effort to convert CSV documents to json documents .
Any suggestions on how to infer avro schema from CSV data or a JSON schema will be helpful

thanks
Sri


Re: ParquetIO write of CSV document data

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi 

Great that this example helped you. 
Also, as I can see, Jeff Klukas already answered your questions in the other thread “How to disable sharding with FileIO.write()/FileIO.writeDynamic”.

> On 24 Jan 2019, at 03:44, Sridevi Nookala <sn...@parallelwireless.com> wrote:
> 
> Hi Alex,
> 
> Thanks for the suggestion. I tried like in the github example by infering AVRO schema,
> 
>  PCollection<String> input =
>             pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
>        input
>         .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))     
> .setCoder(AvroCoder.of(SCHEMA))
>         .apply(
>             "Write Parquet files",
>             FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));
> 
>     pipeline.run();
>   }
> 
> have 2 simple questions
> how can i disable sharding with FileIO.write().  i want a single parquet file from a single CSV
>  how can i change the above to have a custom naming for my parquet file
> do i have to use FileIO.writeDynamic() ?
> 
> Hi Lucas,
> 
> I am newbie so not there yet to solve BEAM jira's , but it will help immensly if AVRO scehma inference is avoided
> some thing like python pandas/pyarrow does
> 
> thanks for your help
> Sri
> 
> From: Sridevi Nookala
> Sent: Wednesday, January 23, 2019 9:41:02 PM
> To: user@beam.apache.org
> Subject: Re: ParquetIO write of CSV document data
>  
> Hi Alex,
> 
> Thanks for the suggestion. I tried like in the github example by infering AVRO schema,
> 
>  PCollection<String> input =
>             pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
>        input
>         .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))     
> .setCoder(AvroCoder.of(SCHEMA))
>         .apply(
>             "Write Parquet files",
>             FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));
> 
>     pipeline.run();
>   }
> 
> From: Łukasz Gajowy <lu...@gmail.com>
> Sent: Tuesday, January 15, 2019 7:02:56 AM
> To: user@beam.apache.org
> Subject: Re: ParquetIO write of CSV document data
>  
> Hi Sri, 
> 
> it's exactly as Alexey says, although there are plans/ideas to improve ParquetIO in a way that would not require defining the schema manually. 
> 
> Some Jiras that might be interesting in this topic but not yet resolved (maybe you are willing to contribute?): 
> https://issues.apache.org/jira/browse/BEAM-4454 <https://issues.apache.org/jira/browse/BEAM-4454>
> https://issues.apache.org/jira/browse/BEAM-4812 <https://issues.apache.org/jira/browse/BEAM-4812>
> https://issues.apache.org/jira/browse/BEAM-6394 <https://issues.apache.org/jira/browse/BEAM-6394>
> 
> Thanks, 
> Łukasz
> 
> pon., 14 sty 2019 o 19:16 Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>> napisał(a):
> Hi Sri,
> 
> Afaik, you have to create “PCollection" of "GenericRecord”s and define your Avro schema manually to write your data into Parquet files.
> In this case, you will need to create a ParDo for this translation. Also, I expect that your schema is the same for all CSV files.
> 
> Basic example of using Parquet Sink with Java SDK could be found here [1]
> 
> [1] https://git.io/fhcfV <https://git.io/fhcfV>
> 
> 
>> On 14 Jan 2019, at 02:00, Sridevi Nookala <snookala@parallelwireless.com <ma...@parallelwireless.com>> wrote:
>> 
>> hi,
>> 
>> I have a bunch of CSV data files that i need to store in Parquet format. I did look at basic documentation on ParquetIO. and ParquetIO.sink() can be used to achive the same.
>> However there is a dependency on the Avro Schema.
>> how do i infer/generate Avro schema from CSV document data ?
>> Does beam have any API for the same.
>> I tried using Kite SDK API CSVUtil / JsonUtil but had no luck generating avro schema
>> my CSV data files have headers in them and quite a few of the header fields are hyphenated which are not liked by Kite 's CSVUtil
>> 
>> I think it will be a redundant effort to convert CSV documents to json documents .
>> Any suggestions on how to infer avro schema from CSV data or a JSON schema will be helpful
>> 
>> thanks
>> Sri


Re: ParquetIO write of CSV document data

Posted by Sridevi Nookala <sn...@parallelwireless.com>.
Hi Alex,


Thanks for the suggestion. I tried like in the github example by infering AVRO schema,


 PCollection<String> input =
            pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
       input
        .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
.setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write Parquet files",
            FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));

    pipeline.run();
  }


have 2 simple questions

  1.  how can i disable sharding with FileIO.write().  i want a single parquet file from a single CSV
  2.   how can i change the above to have a custom naming for my parquet file
  3.  do i have to use FileIO.writeDynamic() ?

Hi Lucas,

I am newbie so not there yet to solve BEAM jira's , but it will help immensly if AVRO scehma inference is avoided
some thing like python pandas/pyarrow does

thanks for your help
Sri

________________________________
From: Sridevi Nookala
Sent: Wednesday, January 23, 2019 9:41:02 PM
To: user@beam.apache.org
Subject: Re: ParquetIO write of CSV document data


Hi Alex,


Thanks for the suggestion. I tried like in the github example by infering AVRO schema,


 PCollection<String> input =
            pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
       input
        .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
.setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write Parquet files",
            FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));

    pipeline.run();
  }


________________________________
From: Łukasz Gajowy <lu...@gmail.com>
Sent: Tuesday, January 15, 2019 7:02:56 AM
To: user@beam.apache.org
Subject: Re: ParquetIO write of CSV document data

Hi Sri,

it's exactly as Alexey says, although there are plans/ideas to improve ParquetIO in a way that would not require defining the schema manually.

Some Jiras that might be interesting in this topic but not yet resolved (maybe you are willing to contribute?):
https://issues.apache.org/jira/browse/BEAM-4454<https://issues.apache.org/jira/browse/BEAM-4454>
https://issues.apache.org/jira/browse/BEAM-4812<https://issues.apache.org/jira/browse/BEAM-4812>
https://issues.apache.org/jira/browse/BEAM-6394<https://issues.apache.org/jira/browse/BEAM-6394>

Thanks,
Łukasz

pon., 14 sty 2019 o 19:16 Alexey Romanenko <ar...@gmail.com>> napisał(a):
Hi Sri,

Afaik, you have to create “PCollection" of "GenericRecord”s and define your Avro schema manually to write your data into Parquet files.
In this case, you will need to create a ParDo for this translation. Also, I expect that your schema is the same for all CSV files.

Basic example of using Parquet Sink with Java SDK could be found here [1]

[1] https://git.io/fhcfV<https://git.io/fhcfV>


On 14 Jan 2019, at 02:00, Sridevi Nookala <sn...@parallelwireless.com>> wrote:

hi,

I have a bunch of CSV data files that i need to store in Parquet format. I did look at basic documentation on ParquetIO. and ParquetIO.sink() can be used to achive the same.
However there is a dependency on the Avro Schema.
how do i infer/generate Avro schema from CSV document data ?
Does beam have any API for the same.
I tried using Kite SDK API CSVUtil / JsonUtil but had no luck generating avro schema
my CSV data files have headers in them and quite a few of the header fields are hyphenated which are not liked by Kite 's CSVUtil

I think it will be a redundant effort to convert CSV documents to json documents .
Any suggestions on how to infer avro schema from CSV data or a JSON schema will be helpful

thanks
Sri


Re: ParquetIO write of CSV document data

Posted by Sridevi Nookala <sn...@parallelwireless.com>.
Hi Alex,


Thanks for the suggestion. I tried like in the github example by infering AVRO schema,


 PCollection<String> input =
            pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
       input
        .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
.setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write Parquet files",
            FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));

    pipeline.run();
  }


________________________________
From: Łukasz Gajowy <lu...@gmail.com>
Sent: Tuesday, January 15, 2019 7:02:56 AM
To: user@beam.apache.org
Subject: Re: ParquetIO write of CSV document data

Hi Sri,

it's exactly as Alexey says, although there are plans/ideas to improve ParquetIO in a way that would not require defining the schema manually.

Some Jiras that might be interesting in this topic but not yet resolved (maybe you are willing to contribute?):
https://issues.apache.org/jira/browse/BEAM-4454<https://issues.apache.org/jira/browse/BEAM-4454>
https://issues.apache.org/jira/browse/BEAM-4812<https://issues.apache.org/jira/browse/BEAM-4812>
https://issues.apache.org/jira/browse/BEAM-6394<https://issues.apache.org/jira/browse/BEAM-6394>

Thanks,
Łukasz

pon., 14 sty 2019 o 19:16 Alexey Romanenko <ar...@gmail.com>> napisał(a):
Hi Sri,

Afaik, you have to create “PCollection" of "GenericRecord”s and define your Avro schema manually to write your data into Parquet files.
In this case, you will need to create a ParDo for this translation. Also, I expect that your schema is the same for all CSV files.

Basic example of using Parquet Sink with Java SDK could be found here [1]

[1] https://git.io/fhcfV<https://git.io/fhcfV>


On 14 Jan 2019, at 02:00, Sridevi Nookala <sn...@parallelwireless.com>> wrote:

hi,

I have a bunch of CSV data files that i need to store in Parquet format. I did look at basic documentation on ParquetIO. and ParquetIO.sink() can be used to achive the same.
However there is a dependency on the Avro Schema.
how do i infer/generate Avro schema from CSV document data ?
Does beam have any API for the same.
I tried using Kite SDK API CSVUtil / JsonUtil but had no luck generating avro schema
my CSV data files have headers in them and quite a few of the header fields are hyphenated which are not liked by Kite 's CSVUtil

I think it will be a redundant effort to convert CSV documents to json documents .
Any suggestions on how to infer avro schema from CSV data or a JSON schema will be helpful

thanks
Sri


Re: ParquetIO write of CSV document data

Posted by Łukasz Gajowy <lu...@gmail.com>.
Hi Sri,

it's exactly as Alexey says, although there are plans/ideas to improve
ParquetIO in a way that would not require defining the schema manually.

Some Jiras that might be interesting in this topic but not yet resolved
(maybe you are willing to contribute?):
https://issues.apache.org/jira/browse/BEAM-4454
https://issues.apache.org/jira/browse/BEAM-4812
https://issues.apache.org/jira/browse/BEAM-6394

Thanks,
Łukasz

pon., 14 sty 2019 o 19:16 Alexey Romanenko <ar...@gmail.com>
napisał(a):

> Hi Sri,
>
> Afaik, you have to create “PCollection" of "GenericRecord”s and define
> your Avro schema manually to write your data into Parquet files.
> In this case, you will need to create a ParDo for this translation. Also,
> I expect that your schema is the same for all CSV files.
>
> Basic example of using Parquet Sink with Java SDK could be found here [1]
>
> [1] https://git.io/fhcfV
>
>
> On 14 Jan 2019, at 02:00, Sridevi Nookala <sn...@parallelwireless.com>
> wrote:
>
> hi,
>
> I have a bunch of CSV data files that i need to store in Parquet format. I
> did look at basic documentation on ParquetIO. and ParquetIO.sink() can be
> used to achive the same.
> However there is a dependency on the Avro Schema.
> how do i infer/generate Avro schema from CSV document data ?
> Does beam have any API for the same.
> I tried using Kite SDK API CSVUtil / JsonUtil but had no luck generating
> avro schema
> my CSV data files have headers in them and quite a few of the header
> fields are hyphenated which are not liked by Kite 's CSVUtil
>
> I think it will be a redundant effort to convert CSV documents to json
> documents .
> Any suggestions on how to infer avro schema from CSV data or a JSON schema
> will be helpful
>
> thanks
> Sri
>
>
>

Re: ParquetIO write of CSV document data

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Sri,

Afaik, you have to create “PCollection" of "GenericRecord”s and define your Avro schema manually to write your data into Parquet files. 
In this case, you will need to create a ParDo for this translation. Also, I expect that your schema is the same for all CSV files.

Basic example of using Parquet Sink with Java SDK could be found here [1]

[1] https://git.io/fhcfV <https://git.io/fhcfV>


> On 14 Jan 2019, at 02:00, Sridevi Nookala <sn...@parallelwireless.com> wrote:
> 
> hi,
> 
> I have a bunch of CSV data files that i need to store in Parquet format. I did look at basic documentation on ParquetIO. and ParquetIO.sink() can be used to achive the same.
> However there is a dependency on the Avro Schema.
> how do i infer/generate Avro schema from CSV document data ?
> Does beam have any API for the same.
> I tried using Kite SDK API CSVUtil / JsonUtil but had no luck generating avro schema
> my CSV data files have headers in them and quite a few of the header fields are hyphenated which are not liked by Kite 's CSVUtil
> 
> I think it will be a redundant effort to convert CSV documents to json documents .
> Any suggestions on how to infer avro schema from CSV data or a JSON schema will be helpful
> 
> thanks
> Sri