Posted to user@arrow.apache.org by Josh Mayer <jo...@gmail.com> on 2020/09/24 19:01:27 UTC

[Python] dataset filter performance and partitioning

I am comparing two datasets with a filter on a string column (that is also
a partition column). I create the dataset from a common metadata file. In
the first case I omit the partitioning information whereas in the second I
include it. I would expect the performance to be similar since the column
statistics should be able to identify the same row groups as the
partitioning. However, I'm seeing the first case run almost 3x slower. Is
this expected?

An example is here (I'm running on Linux, Python 3.8, pyarrow 1.0.1):

https://gist.github.com/josham/5d7cf52f9ea60b1b2bbef1e768ea992f
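
For reference, the gist boils down to a comparison like this (a condensed
sketch with hypothetical paths, assuming the ds.parquet_dataset helper for
_metadata files; the full benchmark is in the gist):

import pyarrow.dataset as ds

# Hypothetical layout: a hive-partitioned directory with a _metadata file,
# e.g. data/name=a/part-0.parquet, data/name=b/part-0.parquet, ...

# Case 1: no partitioning information -- only row group statistics
# are available for pruning.
dataset1 = ds.parquet_dataset("data/_metadata")

# Case 2: hive partitioning declared -- whole files can be skipped
# based on their paths.
dataset2 = ds.parquet_dataset("data/_metadata", partitioning="hive")

filt = ds.field("name").isin(["a", "b"])
t1 = dataset1.to_table(filter=filt)  # ~3x slower for me
t2 = dataset2.to_table(filter=filt)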

Re: [Python] dataset filter performance and partitioning

Posted by Troy Zimmerman <ta...@me.com>.
Please disregard. As I said, it was dumb. :) I spent a bit of time familiarizing myself with the code and it makes much more sense now.


Re: [Python] dataset filter performance and partitioning

Posted by Troy Zimmerman <ta...@me.com>.
Hi Joris,

I have a dumb question — if the “isin” expression is returning all row groups, why does it appear to still work?

For example, I created a similar toy setup, and while all row groups seem to “match” the expression (i.e. I get all fragments with the expression), the table that “to_table” returns has only the rows I expect. Does the filtering happen again somewhere upstream?
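
Roughly what I'm seeing (a sketch against a toy file like the one in Joris's example, 20 rows in 4 row groups keyed by "name"):

import pyarrow.dataset as ds

dataset = ds.dataset("test_filter_string.parquet")
fragment = list(dataset.get_fragments())[0]

filt = ds.field("name").isin(["a", "b"])
len(fragment.split_by_row_group(filt))    # 4 -- every row group "matches"
dataset.to_table(filter=filt).num_rows    # 10 -- yet only the a/b rows come back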

Best,
Troy


Re: [Python] dataset filter performance and partitioning

Posted by Wes McKinney <we...@gmail.com>.
Hi Matt,

This is because `arrow::compute::IsIn` is being called on each batch
materialized by the datasets API, so the internal kernel state is being
set up and torn down for every batch. As a result, a large hash table is
rebuilt for every batch rather than created only once, as it is with
pandas.

https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/filter.cc#L1223

This is definitely a performance problem that must be fixed at some point.

https://issues.apache.org/jira/browse/ARROW-10097
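
In the meantime, one way to avoid the repeated setup is to do the set
lookup once over the loaded table (a sketch using the pyarrow.compute API
as in recent pyarrow, not code from this thread):

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset("data", partitioning="hive")  # hypothetical path
table = dataset.to_table()                         # no isin filter at scan time

# A single is_in call over the whole column: the hash table for the
# value set is built once instead of once per batch.
mask = pc.is_in(table["name"], value_set=pa.array(["a", "b"]))
filtered = table.filter(mask)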

Thanks,
Wes


Re: [Python] dataset filter performance and partitioning

Posted by Matthew Corley <ma...@gmail.com>.
I don't want to cloud this discussion, but I do want to mention that when I
tested isin filtering with a high-cardinality set (on the order of many
millions of values) on a parquet dataset with a single row group (so the
filtering has to happen after the data load), it performed much worse in
terms of runtime and peak memory utilization than waiting to do the
filtering after converting the Table to a pandas DataFrame. This surprised
me given all the copying that has to occur in the latter case.

I don't have my exact experiment lying around anymore to give concrete
numbers, but it might be worth investigating while the isin filter code is
under consideration.
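
The shape of the comparison was roughly the following (reconstructed with
made-up names and sizes; I no longer have the original code):

import pyarrow.dataset as ds

dataset = ds.dataset("big_single_rowgroup.parquet")  # hypothetical file
keys = ["id%d" % i for i in range(5_000_000)]        # high-cardinality set

# Variant A: isin filter handed to the scanner.
t = dataset.to_table(filter=ds.field("key").isin(keys))

# Variant B: load everything, convert, filter in pandas -- this was
# faster and used less peak memory in my test.
df = dataset.to_table().to_pandas()
df = df[df["key"].isin(keys)]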


Re: [Python] dataset filter performance and partitioning

Posted by Josh Mayer <jo...@gmail.com>.
Thanks Joris, that info is very helpful. A few follow-up questions; you
mention that:

> ... it actually needs to parse the statistics of all row groups of all
files to determine which can be skipped ...

Is that something that is only done once (and perhaps stored inside a
dataset object in some optimized form) or performed on every to_table call?

In the case where I am creating a dataset from a common metadata file, is it
possible to attach manual partitioning information (field expressions
attached to each file), similar to how it is done in the manual dataset
creation case (
https://arrow.apache.org/docs/python/dataset.html#manual-specification-of-the-dataset
)?
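
For reference, the docs section I linked does roughly this (a sketch with
hypothetical paths; the exact constructor has changed between pyarrow
versions):

import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import fs

schema = pa.schema([("name", pa.string()), ("value", pa.int64())])

# One partition expression attached to each file, mirroring what hive
# partitioning would have inferred from the paths.
dataset = ds.FileSystemDataset.from_paths(
    ["data/name=a/part-0.parquet", "data/name=b/part-0.parquet"],
    schema=schema,
    format=ds.ParquetFileFormat(),
    filesystem=fs.LocalFileSystem(),
    partitions=[ds.field("name") == "a", ds.field("name") == "b"],
)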

Josh


Re: [Python] dataset filter performance and partitioning

Posted by Joris Van den Bossche <jo...@gmail.com>.
Coming back to this older thread, specifically on the topic of "duplicated"
information as both partition field and actual column in the data:

On Fri, 25 Sep 2020 at 14:43, Robin Kåveland Hansen <ka...@gmail.com>
wrote:

> Hi,
>
> Just thought I'd chime in on this point:
>
> > - In your case, the partitioning has the same name as one of the actual
> columns in the data files. I am not sure this corner case of duplicate
> fields is tested very well, or how the filtering will work?
>
> I _think_ this is the default behaviour for pyspark writes, i.e. the
> column ends up both in the data files and in the partition path.
>
> I think this might actually make sense, though, since putting the
> partition column in the schema means you'll know what type it should be
> when you read it back from disk (at least for data files that support
> schemas).
>

Thanks for this feedback!
I wasn't aware that this is something pyspark can do (for example, I know
that Dask does not include the partition column in the actual data). But
then we need to ensure we handle this correctly.

I did a few experiments to check the support (I don't know if we explicitly
ensured such support when implementing the datasets), and I observe the
following behaviour in the case of a duplicated partition field / actual
data column:

* The schema of the dataset doesn't include the column as duplicated, and
uses the schema of the parquet file (it includes parquet metadata like
field_id)
* When reading, it actually returns the values as they are in the physical
parquet files.
* When filtering, it uses the partition fields (i.e. information in the
file paths), and doesn't do any additional check / filter using the
physical data in the column (so if your partition field vs column is not in
sync, this can give wrong results).
* When the partition field's inferred type doesn't match the file's
schema for the partition column, you get an appropriate error (although where
the types are "compatible", like int32 and int64, we should actually
support this, because right now that also errors)

I _think_ this behaviour is correct / as expected, but feedback on that is
certainly welcome.

Actual code with output of the small experiment can be seen in this
notebook:
https://nbviewer.jupyter.org/gist/jorisvandenbossche/9382de2eb96db5db2ef801f63a359082
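
Condensed, the setup looks something like this (a sketch, not the notebook
verbatim; paths are illustrative):

import os
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# pyspark-style layout: the partition column is kept in the data files
# *and* encoded in the directory name.
os.makedirs("dup/name=a", exist_ok=True)
table = pa.table({"name": ["a", "a"], "value": [1, 2]})
pq.write_table(table, "dup/name=a/part-0.parquet")

dataset = ds.dataset("dup", partitioning="hive")
dataset.schema                                    # "name" appears only once
dataset.to_table(filter=ds.field("name") == "a")  # filter uses the path only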

It would probably be good to add some explicit tests to ensure we support
this use case properly (I opened
https://issues.apache.org/jira/browse/ARROW-10347 for this).

Joris



Re: [Python] dataset filter performance and partitioning

Posted by Robin Kåveland Hansen <ka...@gmail.com>.
Hi,

Just thought I'd chime in on this point:

> - In your case, the partitioning has the same name as one of the actual
columns in the data files. I am not sure this corner case of duplicate
fields is tested very well, or how the filtering will work?

I _think_ this is the default behaviour for pyspark writes, i.e. the
column ends up both in the data files and in the partition path.

I think this might actually make sense, though, since putting the partition
column in the schema means you'll know what type it should be when you read
it back from disk (at least for data files that support schemas).

-- 
Kind regards,
Robin Kåveland

Re: [Python] dataset filter performance and partitioning

Posted by Joris Van den Bossche <jo...@gmail.com>.
Using a small toy example, the "isin" filter is indeed not working for
filtering row groups:

>>> import numpy as np
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> import pyarrow.parquet as pq
>>> table = pa.table({"name": np.repeat(["a", "b", "c", "d"], 5), "value": np.arange(20)})
>>> pq.write_table(table, "test_filter_string.parquet", row_group_size=5)
>>> dataset = ds.dataset("test_filter_string.parquet")
# get the single file fragment (dataset consists of one file)
>>> fragment = list(dataset.get_fragments())[0]
>>> fragment.ensure_complete_metadata()

# check that we do have statistics for our row groups
>>> fragment.row_groups[0].statistics
{'name': {'min': 'a', 'max': 'a'}, 'value': {'min': 0, 'max': 4}}

# I created the file such that there are 4 row groups
# (each with a unique value in the name column)
>>> fragment.split_by_row_group()
[<pyarrow._dataset.ParquetFileFragment at 0x7ff783939810>,
 <pyarrow._dataset.ParquetFileFragment at 0x7ff783728cd8>,
 <pyarrow._dataset.ParquetFileFragment at 0x7ff78376c9c0>,
 <pyarrow._dataset.ParquetFileFragment at 0x7ff7835efd68>]

# simple equality filter works as expected -> only single row group left
>>> filter = ds.field("name") == "a"
>>> fragment.split_by_row_group(filter)
[<pyarrow._dataset.ParquetFileFragment at 0x7ff783662738>]

# isin filter does not work
>>> filter = ds.field("name").isin(["a", "b"])
>>> fragment.split_by_row_group(filter)
[<pyarrow._dataset.ParquetFileFragment at 0x7ff7837f46f0>,
 <pyarrow._dataset.ParquetFileFragment at 0x7ff783627a98>,
 <pyarrow._dataset.ParquetFileFragment at 0x7ff783581b70>,
 <pyarrow._dataset.ParquetFileFragment at 0x7ff7835fb780>]

Filtering with "isin" on partition columns, on the other hand, is working
fine. I opened https://issues.apache.org/jira/browse/ARROW-10091 to track
this as a possible enhancement.
Now, to explain why partitions are the "easier" case: the partition
information gets translated into an equality expression, in your example
something like "name == 'a'", while the statistics give a greater-/less-than
expression built from the min/max, such as "(name >= 'a') & (name <= 'a')".
So for the equality it is trivial to compare against an "isin" expression
like "name in ['a', 'b']"; for the min/max expression, we would need to
handle the special case where min and max are equal.
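
In pure Python terms, the simplification that ARROW-10091 asks for amounts
to something like this (a hypothetical sketch of the logic, not Arrow's
actual implementation):

def row_group_may_match_isin(statistics, column, value_set):
    """Conservatively decide whether a row group can contain rows where
    `column` is in `value_set`, given its min/max statistics."""
    col_stats = statistics.get(column)
    if col_stats is None:
        return True  # no statistics -> we cannot exclude the row group
    lo, hi = col_stats["min"], col_stats["max"]
    # Any set value inside [min, max] means the row group may match.
    return any(lo <= v <= hi for v in value_set)

stats = {"name": {"min": "a", "max": "a"}}
row_group_may_match_isin(stats, "name", ["a", "b"])  # True  -> keep
row_group_may_match_isin(stats, "name", ["c", "d"])  # False -> skip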

Joris


Re: [Python] dataset filter performance and partitioning

Posted by Joris Van den Bossche <jo...@gmail.com>.
Hi Josh,

Thanks for the question!

In general, filtering on partition columns will be faster than filtering on
actual data columns using row group statistics. For partition-based
filtering, the scanner can skip full files based on the information from
the file path (in your example case, there are 11 files, for which it can
select 4 of them to actually read), while for row-group-based filtering, it
actually needs to parse the statistics of all row groups of all files to
determine which can be skipped, which is typically more information to
process compared to the file paths.
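
Concretely, the file-level pruning can be seen by listing fragments with a
filter (a sketch with hypothetical hive-partitioned paths):

import pyarrow.dataset as ds

dataset = ds.dataset("data", partitioning="hive")  # your 11 files
filt = ds.field("name").isin(["a", "b", "c", "d"])

# With partition information, whole files are pruned from the paths
# alone before any parquet metadata is read.
len(list(dataset.get_fragments(filter=filt)))      # 4 of the 11 files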

That said, there are some oddities I noticed:

- As I mentioned, I expect partition-based filtering to be faster, but not
that much faster (certainly in a case with a limited number of files, the
overhead of parsing / filtering the row groups should be really minimal)
- Inspecting the result a bit, it seems that for the first dataset (without
partitioning) the filter isn't actually applied correctly. The min/max for
the name column are included in the row group statistics, but the isin
filter didn't actually use them to skip anything. Something to investigate,
but that certainly explains the difference in performance: it's reading all
the data and only filtering after reading, instead of skipping some parts
before reading.
- In your case, the partitioning has the same name as one of the actual
columns in the data files. I am not sure this corner case of duplicate
fields is tested very well, or how the filtering will behave.

Joris
