Posted to user@arrow.apache.org by Ted Gooch <tg...@netflix.com> on 2020/11/10 17:29:26 UTC

[Python][Dataset] API Batched file reads with multiple file schemas

I'm currently leveraging the Datasets API to read Parquet files and
running into an issue that I can't figure out. I have a set of files and a
target schema. Each file in the set may have the same schema as the target
or a different one, but if the schema is different, it can be coerced into
the target from the source schema by rearranging the column order, changing
column names, adding null columns, and/or applying a limited set of type
upcasts (e.g. int32 -> int64).
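
To make that concrete, the per-file coercion I have in mind looks roughly
like the sketch below (the helper name and the exact upcast rules are only
illustrative, and column renames are omitted for brevity):

import pyarrow as pa
import pyarrow.parquet as pq

def coerce_to_target(path, target_schema):
    # Read one file and reorder/cast/fill its columns to match the target
    # schema. Purely illustrative -- not my actual code.
    table = pq.read_table(path)
    columns = []
    for field in target_schema:
        if field.name in table.column_names:
            # Existing column: put it in target order and upcast its type
            # (e.g. int32 -> int64).
            columns.append(table[field.name].cast(field.type))
        else:
            # Column missing from this file: fill with nulls.
            columns.append(pa.nulls(len(table), type=field.type))
    return pa.table(columns, schema=target_schema)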

As far as I can tell, there doesn't seem to be a way to do this with the
Datasets API if you don't have a file schema ahead of time.  I had been
using the following:




arrow_dataset = ds.FileSystemDataset.from_paths(
    [self._input.location()],
    schema=self._arrow_file.schema_arrow,
    format=ds.ParquetFileFormat(),
    filesystem=fs.LocalFileSystem())

But in this case, I have to fetch the schema and read a single file at a
time. I was hoping to get more mileage out of the Datasets API by letting it
batch up the reads and manage the memory for them. Is there any way that I
can get around this?

thanks!
Ted Gooch

Re: [Python][Dataset] API Batched file reads with multiple file schemas

Posted by Joris Van den Bossche <jo...@gmail.com>.
A slow follow-up on this:

On Wed, 11 Nov 2020 at 19:11, Ted Gooch <tg...@netflix.com> wrote:

> That's right.  For more context, I'm building out the Parquet read-path
> for iceberg <https://iceberg.apache.org/>, and two of the main features
> are working against us here: 1) the table data does not depend on the
> physical layout on the filesystem, e.g. folders may have many files, some
> of which belong to the current state and some of which do not; and 2)
> schema evolution: files may have different schemas, and we don't know
> ahead of time which version of the schema a given file will have.
>

The first point shouldn't be a problem, normally, since you are already
using the lower-level FileSystemDataset.from_paths, where you can provide
this list of file paths manually, and that doesn't need to map to all files
in a directory. Of course, the currently limited schema evolution will
still be a problem. I will try to focus on that in Arrow right now, so we
can hopefully get some improvements in the next release.
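
To illustrate, something along these lines should already work today when
the files can be read with one schema (the paths and schema below are just
placeholders), and the dataset then takes care of batching the reads:

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as fs

# Explicit list of data files -- no directory crawling involved.
paths = ["/data/part-000.parquet", "/data/part-001.parquet"]
target_schema = pa.schema([("id", pa.int64()), ("value", pa.float64())])

dataset = ds.FileSystemDataset.from_paths(
    paths,
    schema=target_schema,
    format=ds.ParquetFileFormat(),
    filesystem=fs.LocalFileSystem())

# Iterate in record batches so memory use stays bounded
# (to_batches is available in recent pyarrow versions).
for batch in dataset.to_batches():
    ...  # process each pyarrow.RecordBatch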


> Here is the current PR if you are interested to see the full context
> in-code:
> https://github.com/apache/iceberg/pull/1727
>

Cool, exciting to see that!


>
> On Wed, Nov 11, 2020 at 4:25 AM Joris Van den Bossche <
> jorisvandenbossche@gmail.com> wrote:
>
>> Hi Ted,
>>
>> The eventual goal is certainly to be able to deal with this kind of
>> schema "normalization" to a target schema, but currently only a limited set
>> of schema evolutions is supported: a different column order, missing columns
>> (filled with nulls), and upcasting null to any other type. Other type casts
>> and column renames are not supported yet.
>>
>> For how to do this (but so within the limits of what kind of
>> normalizations are supported), there are two ways:
>>
>> - Using the dataset "factory" function to let pyarrow discover the
>> dataset (crawl the filesystem to find data files, infer the schema). By
>> default, this `ds.dataset(..)` function "infers" the schema by reading it
>> from the first file it encounters. In C++ there is actually the option to
>> check all files to create a common schema, but this is not yet exposed in
>> Python (https://issues.apache.org/jira/browse/ARROW-8221). Then, there
>> is also the option to pass a schema manually, if you know this beforehand.
>> - Using the lower level `ds.FileSystemDataset` interface as you are using
>> below. In this case you need to specify all the data file paths manually,
>> as well as the final schema of the dataset. So this is specifically meant
>> for the case where you know this information already, and want to avoid the
>> overhead of inferring it with the `ds.dataset()` factory function mentioned
>> above.
>>
>> So from reading your mail, it seems you need the following features that
>> are currently not yet implemented:
>>
>> - The ability to specify in the `ds.dataset(..)` function to infer the
>> schema from all files (ARROW-8221)
>> - More advanced schema normalization routines (type casting, column
>> renaming)
>>
>> Does that sound correct?
>>
>> Joris
>>
>>
>> On Tue, 10 Nov 2020 at 18:31, Ted Gooch <tg...@netflix.com> wrote:
>>
>>> I'm currently leveraging the Datasets API to read parquet files and
>>> running into a bit of an issue that I can't figure out. I have a set of
>>> files and a target schema. Each file in the set may have the same or a
>>> different schema than the target, but if the schema is different, it can be
>>> coerced into the target from the source schema, by rearranging column
>>> order, changing column names, adding null columns and/or a limited set of
>>> type upcasting (e.g. int32 -> int64).
>>>
>>> As far as I can tell, there doesn't seem to be a way to do this with the
>>> Datasets API if you don't have a file schema ahead of time.  I had been
>>> using the following:
>>>
>>>
>>>
>>>
>>> arrow_dataset = ds.FileSystemDataset.from_paths(
>>>     [self._input.location()],
>>>     schema=self._arrow_file.schema_arrow,
>>>     format=ds.ParquetFileFormat(),
>>>     filesystem=fs.LocalFileSystem())
>>>
>>> But in this case, I have to fetch the schema and read a single file at
>>> a time.
>>>
>>
>> Note that you can pass a list of files, so you don't need to read a
>> single file at a time.
>>
>
> I have tried this, but if the schemas don't line up it will error out.
>
>
>>
>>
>>> I was hoping to be able to get more mileage from the Datasets API
>>> batching up and managing the memory for the reads. Is there any way that I
>>> can get around this?
>>>
>>> thanks!
>>> Ted Gooch
>>>
>>

Re: [Python][Dataset] API Batched file reads with multiple file schemas

Posted by Ted Gooch <tg...@netflix.com>.
That's right.  For more context, I'm building out the Parquet read-path for
iceberg <https://iceberg.apache.org/>, and two of the main features are
working against us here: 1) the table data does not depend on the physical
layout on the filesystem, e.g. folders may have many files, some of which
belong to the current state and some of which do not; and 2) schema evolution:
files may have different schemas, and we don't know ahead of time which
version of the schema a given file will have.

Here is the current PR if you are interested to see the full context
in-code:
https://github.com/apache/iceberg/pull/1727

On Wed, Nov 11, 2020 at 4:25 AM Joris Van den Bossche <
jorisvandenbossche@gmail.com> wrote:

> Hi Ted,
>
> The eventual goal is certainly to be able to deal with this kind of schema
> "normalization" to a target schema, but currently only a limited set of
> schema evolutions is supported: a different column order, missing columns
> (filled with nulls), and upcasting null to any other type. Other type casts
> and column renames are not supported yet.
>
> For how to do this (but so within the limits of what kind of
> normalizations are supported), there are two ways:
>
> - Using the dataset "factory" function to let pyarrow discover the dataset
> (crawl the filesystem to find data files, infer the schema). By default,
> this `ds.dataset(..)` function "infers" the schema by reading it from the
> first file it encounters. In C++ there is actually the option to check all
> files to create a common schema, but this is not yet exposed in Python (
> https://issues.apache.org/jira/browse/ARROW-8221). Then, there is also
> the option to pass a schema manually, if you know this beforehand.
> - Using the lower level `ds.FileSystemDataset` interface as you are using
> below. In this case you need to specify all the data file paths manually,
> as well as the final schema of the dataset. So this is specifically meant
> for the case where you know this information already, and want to avoid the
> overhead of inferring it with the `ds.dataset()` factory function mentioned
> above.
>
> So from reading your mail, it seems you need the following features that
> are currently not yet implemented:
>
> - The ability to specify in the `ds.dataset(..)` function to infer the
> schema from all files (ARROW-8221)
> - More advanced schema normalization routines (type casting, column
> renaming)
>
> Does that sound correct?
>
> Joris
>
>
> On Tue, 10 Nov 2020 at 18:31, Ted Gooch <tg...@netflix.com> wrote:
>
>> I'm currently leveraging the Datasets API to read parquet files and
>> running into a bit of an issue that I can't figure out. I have a set of
>> files and a target schema. Each file in the set may have the same or a
>> different schema than the target, but if the schema is different, it can be
>> coerced into the target from the source schema, by rearranging column
>> order, changing column names, adding null columns and/or a limited set of
>> type upcasting (e.g. int32 -> int64).
>>
>> As far as I can tell, there doesn't seem to be a way to do this with the
>> Datasets API if you don't have a file schema ahead of time.  I had been
>> using the following:
>>
>>
>>
>>
>> arrow_dataset = ds.FileSystemDataset.from_paths(
>>     [self._input.location()],
>>     schema=self._arrow_file.schema_arrow,
>>     format=ds.ParquetFileFormat(),
>>     filesystem=fs.LocalFileSystem())
>>
>> But in this case, I have to fetch the schema and read a single file at a
>> time.
>>
>
> Note that you can pass a list of files, so you don't need to read a single
> file at a time.
>

I have tried this, but if the schemas don't line up it will error out.


>
>
>> I was hoping to be able to get more mileage from the Datasets API
>> batching up and managing the memory for the reads. Is there any way that I
>> can get around this?
>>
>> thanks!
>> Ted Gooch
>>
>

Re: [Python][Dataset] API Batched file reads with multiple file schemas

Posted by Joris Van den Bossche <jo...@gmail.com>.
Hi Ted,

The eventual goal is certainly to be able to deal with this kind of schema
"normalization" to a target schema, but currently only a limited set of
schema evolutions is supported: a different column order, missing columns
(filled with nulls), and upcasting null to any other type. Other type casts
and column renames are not supported yet.

For how to do this (within the limits of the normalizations that are
supported), there are two ways:

- Using the dataset "factory" function to let pyarrow discover the dataset
(crawl the filesystem to find data files, infer the schema). By default,
this `ds.dataset(..)` function "infers" the schema by reading it from the
first file it encounters. In C++ there is actually an option to check all
files and create a common schema, but this is not yet exposed in Python
(https://issues.apache.org/jira/browse/ARROW-8221). There is also the
option to pass a schema manually, if you know it beforehand.
- Using the lower-level `ds.FileSystemDataset` interface as you are doing
below. In this case you need to specify all the data file paths manually,
as well as the final schema of the dataset. So this is specifically meant
for the case where you already know this information and want to avoid the
overhead of inferring it with the `ds.dataset()` factory function mentioned
above. (Both options are sketched in the snippet after this list.)
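
As a rough sketch of both options (the file paths and schema here are
placeholders you would substitute with your own):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as fs

paths = ["/data/part-000.parquet", "/data/part-001.parquet"]
target_schema = pa.schema([("id", pa.int64()), ("value", pa.float64())])

# Option 1: the factory function; pyarrow discovers/infers whatever you
# don't pass explicitly (schema, format, filesystem).
dataset = ds.dataset(paths, schema=target_schema, format="parquet")

# Option 2: the lower-level constructor; you supply everything yourself
# and no inference happens.
dataset = ds.FileSystemDataset.from_paths(
    paths,
    schema=target_schema,
    format=ds.ParquetFileFormat(),
    filesystem=fs.LocalFileSystem())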

So from reading your mail, it seems you need the following features that
are currently not yet implemented:

- The ability to tell the `ds.dataset(..)` function to infer the schema
from all files (ARROW-8221); a manual workaround is sketched below
- More advanced schema normalization routines (type casting, column
renaming)
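
In the meantime, a manual version of the first point could look roughly like
this (the paths are placeholders, and this assumes your pyarrow version
exposes pa.unify_schemas and pq.read_schema):

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

paths = ["/data/part-000.parquet", "/data/part-001.parquet"]

# Read only the Parquet footers and merge the schemas; unify_schemas fills
# in columns missing from some files but raises if field types conflict.
schemas = [pq.read_schema(p) for p in paths]
common_schema = pa.unify_schemas(schemas)

dataset = ds.dataset(paths, schema=common_schema, format="parquet")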

Does that sound correct?

Joris


On Tue, 10 Nov 2020 at 18:31, Ted Gooch <tg...@netflix.com> wrote:

> I'm currently leveraging the Datasets API to read parquet files and
> running into a bit of an issue that I can't figure out. I have a set of
> files and a target schema. Each file in the set may have the same or a
> different schema than the target, but if the schema is different, it can be
> coerced into the target from the source schema, by rearranging column
> order, changing column names, adding null columns and/or a limited set of
> type upcasting (e.g. int32 -> int64).
>
> As far as I can tell, there doesn't seem to be a way to do this with the
> Datasets API if you don't have a file schema ahead of time.  I had been
> using the following:
>
>
>
>
> arrow_dataset = ds.FileSystemDataset.from_paths(
>     [self._input.location()],
>     schema=self._arrow_file.schema_arrow,
>     format=ds.ParquetFileFormat(),
>     filesystem=fs.LocalFileSystem())
>
> But in this case, I have to fetch the schema and read a single file at a
> time.
>

Note that you can pass a list of files, so you don't need to read a single
file at a time.


> I was hoping to be able to get more mileage from the Datasets API batching
> up and managing the memory for the reads. Is there any way that I can get
> around this?
>
> thanks!
> Ted Gooch
>