Posted to user@beam.apache.org by Sridevi Nookala <sn...@parallelwireless.com> on 2019/01/14 01:00:21 UTC
ParquetIO write of CSV document data
Hi,
I have a bunch of CSV data files that I need to store in Parquet format. I looked at the basic documentation on ParquetIO, and ParquetIO.sink() can be used to achieve this.
However, there is a dependency on an Avro schema.
How do I infer/generate an Avro schema from CSV document data? Does Beam have an API for this?
I tried the Kite SDK's CSVUtil/JsonUtil but had no luck generating an Avro schema:
my CSV data files have headers in them, and quite a few of the header fields are hyphenated, which Kite's CSVUtil rejects.
Converting the CSV documents to JSON documents first seems like redundant effort.
Any suggestions on how to infer an Avro schema from CSV data or from a JSON schema would be helpful.
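(A sketch of one way to hand-roll the inference from the header row; this is not a Beam or Kite API, and the class name and file path are illustrative. Every column becomes a nullable string, and hyphenated names are sanitized into valid Avro names:)

import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class CsvSchemaInference {
  // Builds an Avro record schema from a CSV header line. Every column
  // becomes a nullable string; '-' and '.' are replaced with '_' because
  // Avro field names must match [A-Za-z_][A-Za-z0-9_]*.
  public static Schema fromHeader(String headerLine) {
    SchemaBuilder.FieldAssembler<Schema> fields =
        SchemaBuilder.record("CsvRow").namespace("example").fields();
    for (String column : headerLine.split(",")) {
      String name = column.trim().replaceAll("[^A-Za-z0-9_]", "_");
      if (name.isEmpty() || Character.isDigit(name.charAt(0))) {
        name = "_" + name; // names may not start with a digit (e.g. 3GPP counters)
      }
      fields = fields.optionalString(name);
    }
    return fields.endRecord();
  }

  public static void main(String[] args) throws Exception {
    try (BufferedReader reader = Files.newBufferedReader(Paths.get("/tmp/beam/input.csv"))) {
      System.out.println(fromHeader(reader.readLine()).toString(true));
    }
  }
}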
thanks
Sri
Re: ParquetIO write of CSV document data
Posted by Sridevi Nookala <sn...@parallelwireless.com>.
Hi Alex/Jeff,
Yes, it did help. However, there are other issues that I am running into.
The producer of the Parquet files is a different source in my case
(the field names have the infamous '-' and '.' in them; the telco/3GPP world uses them. The files were generated using Python pandas/pyarrow, which did not care/complain).
Now I have issues building the Avro schema, and I don't know if there is a way to circumvent this and disable field-name validation.
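(One hedged escape hatch: Avro's Schema.Parser can skip name validation when parsing a schema from JSON. Downstream Parquet tooling may still reject such names, so sanitizing '-' and '.' to '_' remains the safer route. The schema JSON below is a made-up example:)

import org.apache.avro.Schema;

public class LenientSchemaParse {
  public static void main(String[] args) {
    // A hyphenated/dotted 3GPP-style counter name that normal parsing rejects.
    String json = "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
        + "{\"name\":\"pm-counter.value\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
    // setValidate(false) turns off Avro's field/record name validation while parsing.
    Schema schema = new Schema.Parser().setValidate(false).parse(json);
    System.out.println(schema.getFields());
  }
}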
Eventually I need to read multiple Parquet files, group them by some criteria, and expand and combine them into one big Parquet file on a daily basis;
the source provides 15-minute Parquet chunks.
Any suggestions here will be helpful.
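(A minimal sketch of the daily compaction, assuming a shared schema and made-up paths. withNumShards(1) forces a single output file, at the cost of funnelling the write through one worker; FileIO.writeDynamic would be the tool if the grouping must route records to different destination files:)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

public class DailyCompaction {
  // Illustrative schema; in practice this is the schema shared by all chunks.
  private static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
      + "{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

  public static void main(String[] args) {
    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
    Pipeline pipeline = Pipeline.create();

    // Read all 15-minute chunks for one day via a glob.
    PCollection<GenericRecord> records =
        pipeline.apply(ParquetIO.read(schema).from("/data/2019-01-25/*.parquet"));

    // Rewrite them as a single daily file.
    records.apply(
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(schema))
            .to("/data/daily/")
            .withNumShards(1));

    pipeline.run().waitUntilFinish();
  }
}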
thanks
Sri
Re: ParquetIO write of CSV document data
Posted by Alexey Romanenko <ar...@gmail.com>.
Hi,
Great that this example helped you.
Also, as I can see, Jeff Klukas has already answered your questions in the other thread “How to disable sharding with FileIO.write()/FileIO.writeDynamic”.
Re: ParquetIO write of CSV document data
Posted by Sridevi Nookala <sn...@parallelwireless.com>.
Hi Alex,
Thanks for the suggestion. I tried it like the GitHub example, by inferring the Avro schema:
PCollection<String> input =
    pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));

input
    .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
    .setCoder(AvroCoder.of(SCHEMA))
    .apply(
        "Write Parquet files",
        FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));

pipeline.run();
I have three simple questions:
1. How can I disable sharding with FileIO.write()? I want a single Parquet file from a single CSV.
2. How can I change the above to get custom naming for my Parquet file?
3. Do I have to use FileIO.writeDynamic()?
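(A hedged sketch of one way to address questions 1 and 2 with FileIO.write() alone; writeDynamic is only needed when records must be routed to different destination files. The "daily" prefix and ".parquet" suffix are made-up examples:)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

// Writes all records as one custom-named Parquet file.
static void writeSingleFile(PCollection<GenericRecord> records, Schema schema) {
  records.apply(
      "Write Parquet file",
      FileIO.<GenericRecord>write()
          .via(ParquetIO.sink(schema))
          .to("/tmp/parquet/")
          .withNumShards(1) // disables runner-chosen sharding: one file per window/pane
          .withNaming(FileIO.Write.defaultNaming("daily", ".parquet"))); // prefix/suffix of the output name
}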
Hi Łukasz,
I am a newbie, so I am not there yet on solving Beam JIRAs, but it would help immensely if defining the Avro schema by hand could be avoided,
with automatic inference like Python pandas/pyarrow does.
thanks for your help
Sri
Re: ParquetIO write of CSV document data
Posted by Sridevi Nookala <sn...@parallelwireless.com>.
Hi Alex,
Thanks for the suggestion. I tried it like the GitHub example, by inferring the Avro schema:
PCollection<String> input =
    pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));

input
    .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
    .setCoder(AvroCoder.of(SCHEMA))
    .apply(
        "Write Parquet files",
        FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));

pipeline.run();
Re: ParquetIO write of CSV document data
Posted by Łukasz Gajowy <lu...@gmail.com>.
Hi Sri,
It's exactly as Alexey says, although there are plans/ideas to improve ParquetIO in a way that would not require defining the schema manually.
Some JIRAs that are relevant to this topic but not yet resolved (maybe you are willing to contribute?):
https://issues.apache.org/jira/browse/BEAM-4454
https://issues.apache.org/jira/browse/BEAM-4812
https://issues.apache.org/jira/browse/BEAM-6394
Thanks,
Łukasz
Re: ParquetIO write of CSV document data
Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Sri,
AFAIK, you have to create a PCollection of GenericRecords and define your Avro schema manually to write your data into Parquet files.
In this case, you will need to create a ParDo for this translation. Also, I expect that your schema is the same for all CSV files.
A basic example of using the Parquet sink with the Java SDK can be found here [1].
[1] https://git.io/fhcfV
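(A minimal sketch of the translation ParDo described above. It assumes columns arrive in schema-field order and that the header line is filtered out upstream; a real CSV parser such as commons-csv should replace the naive split:)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;

// Turns one CSV line into a GenericRecord matching the given schema.
class CsvToGenericRecordFn extends DoFn<String, GenericRecord> {
  private final String schemaJson; // ship the schema as JSON; re-parse on each worker
  private transient Schema schema;

  CsvToGenericRecordFn(Schema schema) {
    this.schemaJson = schema.toString();
  }

  @Setup
  public void setup() {
    schema = new Schema.Parser().parse(schemaJson);
  }

  @ProcessElement
  public void processElement(@Element String line, OutputReceiver<GenericRecord> out) {
    String[] cells = line.split(",", -1); // naive split: no quoting/escaping support
    GenericRecord record = new GenericData.Record(schema);
    for (int i = 0; i < schema.getFields().size() && i < cells.length; i++) {
      record.put(schema.getFields().get(i).name(), cells[i]);
    }
    out.output(record);
  }
}

It would be used as ParDo.of(new CsvToGenericRecordFn(SCHEMA)), followed by setCoder(AvroCoder.of(SCHEMA)), as in the snippets earlier in the thread.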