Posted to jira@arrow.apache.org by "Jacob Wujciak-Jens (Jira)" <ji...@apache.org> on 2022/04/08 12:27:00 UTC

[jira] [Updated] (ARROW-15716) [Dataset][Python] Parse a list of fragment paths to gather filters

     [ https://issues.apache.org/jira/browse/ARROW-15716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jacob Wujciak-Jens updated ARROW-15716:
---------------------------------------
    Component/s: Python

> [Dataset][Python] Parse a list of fragment paths to gather filters
> ------------------------------------------------------------------
>
>                 Key: ARROW-15716
>                 URL: https://issues.apache.org/jira/browse/ARROW-15716
>             Project: Apache Arrow
>          Issue Type: Wish
>          Components: Python
>    Affects Versions: 7.0.0
>            Reporter: Lance Dacey
>            Priority: Minor
>
> Is it possible for partitioning.parse() to be updated to parse a list of paths instead of just a single path? 
> I am passing the .paths collected by file_visitor to downstream tasks so they can process the data that was just saved, but this breaks if I later overwrite data with delete_matching to consolidate small files, since those paths no longer exist.
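> For context, the paths come from the file_visitor hook during write_dataset. A minimal sketch of how I collect them (the table contents and base directory here are placeholders):
> {code:python}
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> table = pa.table({"date_id": [20210114, 20210813], "value": [1, 2]})
>
> paths = []
> ds.write_dataset(
>     table,
>     base_dir="dev/dataset/fragments",
>     format="parquet",
>     partitioning=ds.partitioning(
>         pa.schema([("date_id", pa.int64())]), flavor="hive"
>     ),
>     existing_data_behavior="delete_matching",
>     file_visitor=lambda written_file: paths.append(written_file.path),
> )
> {code}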
> Here is the output of my current approach, which uses filters instead of reading the paths directly:
> {code:python}
> # Fragments saved during write_dataset
> ['dev/dataset/fragments/date_id=20210813/data-0.parquet',
>  'dev/dataset/fragments/date_id=20210114/data-2.parquet',
>  'dev/dataset/fragments/date_id=20210114/data-1.parquet',
>  'dev/dataset/fragments/date_id=20210114/data-0.parquet']
>
> # Run partitioning.parse() on each fragment
> [<pyarrow.compute.Expression (date_id == 20210813)>,
>  <pyarrow.compute.Expression (date_id == 20210114)>,
>  <pyarrow.compute.Expression (date_id == 20210114)>,
>  <pyarrow.compute.Expression (date_id == 20210114)>]
>
> # Format those expressions into a list of tuples
> [('date_id', 'in', [20210114, 20210813])]
>
> # Convert to an expression which is used as a filter in .to_table()
> is_in(date_id, {value_set=int64:[
>   20210114,
>   20210813
> ], skip_nulls=false})
> {code}
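> For reference, that final expression is equivalent to building the filter directly with isin (a sketch assuming the dataset is reopened with hive partitioning so date_id is discovered as a column):
> {code:python}
> import pyarrow.dataset as ds
>
> # Reopen the dataset so date_id is available as a partition column
> dataset = ds.dataset("dev/dataset/fragments", partitioning="hive")
>
> # Same filter as the is_in expression above
> filt_exp = ds.field("date_id").isin([20210114, 20210813])
> dataset.to_table(filter=filt_exp)
> {code}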
> And here is how I am creating the filter from a list of .paths (perhaps there is a better way?):
> {code:python}
> import pyarrow.dataset as ds
> import pyarrow.parquet as pq
>
> partitioning = ds.HivePartitioning(partition_schema)
>
> # Parse each written fragment path into a partition expression
> expressions = [partitioning.parse(file) for file in paths]
>
> values = []
> filters = []
> for expression in expressions:
>     partitions = ds._get_partition_keys(expression)
>     if len(partitions) > 1:
>         # Multiple partition keys: keep each unique conjunction of tuples
>         element = [(k, "==", v) for k, v in partitions.items()]
>         if element not in filters:
>             filters.append(element)
>     else:
>         # Single partition key: collect its distinct values into one "in" filter
>         for k, v in partitions.items():
>             if v not in values:
>                 values.append(v)
>         filters = [(k, "in", sorted(values))]
>
> filt_exp = pq._filters_to_expression(filters)
> dataset.to_table(filter=filt_exp)
> {code}
> My hope would be to do something like filt_exp = partitioning.parse(paths), which would return a single dataset expression.
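> In the meantime, a possible workaround in that spirit is a small wrapper that ORs the per-path expressions together (parse_paths is hypothetical, not an existing API):
> {code:python}
> import functools
> import operator
>
> # Hypothetical helper until parse() accepts a list of paths:
> # OR together the expression parsed from each fragment path
> def parse_paths(partitioning, paths):
>     return functools.reduce(
>         operator.or_, (partitioning.parse(p) for p in paths)
>     )
>
> filt_exp = parse_paths(partitioning, paths)
> dataset.to_table(filter=filt_exp)
> {code}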



--
This message was sent by Atlassian Jira
(v8.20.1#820001)