Posted to issues@arrow.apache.org by "Lance Dacey (Jira)" <ji...@apache.org> on 2022/02/17 15:12:00 UTC
[jira] [Created] (ARROW-15716) [Dataset][Python] Parse a list of fragment paths to gather filters
Lance Dacey created ARROW-15716:
-----------------------------------
Summary: [Dataset][Python] Parse a list of fragment paths to gather filters
Key: ARROW-15716
URL: https://issues.apache.org/jira/browse/ARROW-15716
Project: Apache Arrow
Issue Type: Wish
Affects Versions: 7.0.0
Reporter: Lance Dacey
Is it possible for partitioning.parse() to be updated to parse a list of paths instead of just a single path?
I am passing the .paths collected by file_visitor to downstream tasks that process the recently saved data. However, this breaks if I overwrite data with existing_data_behavior="delete_matching" in order to consolidate small files, since the original paths no longer exist.
Here is the output of my current approach to use filters instead of reading the paths directly:
{code:none}
# Fragment paths collected by file_visitor during write_dataset
['dev/dataset/fragments/date_id=20210813/data-0.parquet',
 'dev/dataset/fragments/date_id=20210114/data-2.parquet',
 'dev/dataset/fragments/date_id=20210114/data-1.parquet',
 'dev/dataset/fragments/date_id=20210114/data-0.parquet']

# Run partitioning.parse() on each fragment path
[<pyarrow.compute.Expression (date_id == 20210813)>,
 <pyarrow.compute.Expression (date_id == 20210114)>,
 <pyarrow.compute.Expression (date_id == 20210114)>,
 <pyarrow.compute.Expression (date_id == 20210114)>]

# Format those expressions into a list of filter tuples
[('date_id', 'in', [20210114, 20210813])]

# Convert to a dataset expression used as a filter in .to_table()
is_in(date_id, {value_set=int64:[
  20210114,
  20210813
], skip_nulls=false})
{code}
And here is how I am creating the filter from a list of .paths (perhaps there is a better way?):
{code:python}
import pyarrow.dataset as ds
import pyarrow.parquet as pq

partitioning = ds.HivePartitioning(partition_schema)

# Parse each fragment path into a partition expression
expressions = []
for file in paths:
    expressions.append(partitioning.parse(file))

values = []
filters = []
for expression in expressions:
    partitions = ds._get_partition_keys(expression)
    if len(partitions.keys()) > 1:
        # Multiple partition keys: one ("key", "==", value) tuple per key
        element = [(k, "==", v) for k, v in partitions.items()]
        if element not in filters:
            filters.append(element)
    else:
        # Single partition key: collect the distinct values into one "in" filter
        for k, v in partitions.items():
            if v not in values:
                values.append(v)
            filters = [(k, "in", sorted(values))]

# Convert the filter tuples back into a dataset expression
filt_exp = pq._filters_to_expression(filters)
dataset.to_table(filter=filt_exp)
{code}
My hope would be to do something like filt_exp = partitioning.parse(paths), which would return a single dataset expression covering all of the paths.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)