Posted to issues@arrow.apache.org by "Lance Dacey (Jira)" <ji...@apache.org> on 2022/02/17 15:12:00 UTC

[jira] [Created] (ARROW-15716) [Dataset][Python] Parse a list of fragment paths to gather filters

Lance Dacey created ARROW-15716:
-----------------------------------

             Summary: [Dataset][Python] Parse a list of fragment paths to gather filters
                 Key: ARROW-15716
                 URL: https://issues.apache.org/jira/browse/ARROW-15716
             Project: Apache Arrow
          Issue Type: Wish
    Affects Versions: 7.0.0
            Reporter: Lance Dacey


Is it possible for partitioning.parse() to be updated to parse a list of paths instead of just a single path? 

I am passing the .paths collected by file_visitor to downstream tasks so they can process the recently saved data, but this can break if I overwrite data with delete_matching in order to consolidate small files, since those paths will no longer exist.
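
For context, the paths come from something roughly like this (a minimal sketch; the table, schema, and base directory names here are placeholders rather than my exact setup):

{code:python}
import pyarrow as pa
import pyarrow.dataset as ds

# Placeholder names: "table" and the base directory are illustrative only
visited_paths = []

def file_visitor(written_file):
    # written_file.path is the path of each fragment written by write_dataset
    visited_paths.append(written_file.path)

ds.write_dataset(
    table,
    "dev/dataset/fragments",
    format="parquet",
    partitioning=ds.partitioning(pa.schema([("date_id", pa.int64())]), flavor="hive"),
    existing_data_behavior="delete_matching",
    file_visitor=file_visitor,
)
# visited_paths is then handed to the downstream tasks
{code}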

Here is the output of my current approach, which uses filters instead of reading the paths directly:

{code:python}
# Fragments saved during write_dataset 
['dev/dataset/fragments/date_id=20210813/data-0.parquet', 'dev/dataset/fragments/date_id=20210114/data-2.parquet', 'dev/dataset/fragments/date_id=20210114/data-1.parquet', 'dev/dataset/fragments/date_id=20210114/data-0.parquet']

# Run partitioning.parse() on each fragment 
[<pyarrow.compute.Expression (date_id == 20210813)>, <pyarrow.compute.Expression (date_id == 20210114)>, <pyarrow.compute.Expression (date_id == 20210114)>, <pyarrow.compute.Expression (date_id == 20210114)>]

# Format those expressions into a list of tuples
[('date_id', 'in', [20210114, 20210813])]

# Convert to an expression which is used as a filter in .to_table()
is_in(date_id, {value_set=int64:[
  20210114,
  20210813
], skip_nulls=false})
{code}

And here is how I am creating the filter from a list of .paths (perhaps there is a better way?):

{code:python}
import pyarrow.dataset as ds
import pyarrow.parquet as pq

partitioning = ds.HivePartitioning(partition_schema)

# Parse each fragment path into a partition expression
expressions = [partitioning.parse(path) for path in paths]

values = []
filters = []
for expression in expressions:
    partitions = ds._get_partition_keys(expression)
    if len(partitions) > 1:
        # Several partition fields: keep each unique combination as an AND group
        element = [(k, "==", v) for k, v in partitions.items()]
        if element not in filters:
            filters.append(element)
    else:
        # Single partition field: collect the distinct values into one "in" filter
        for k, v in partitions.items():
            if v not in values:
                values.append(v)
        filters = [(k, "in", sorted(values))]

filt_exp = pq._filters_to_expression(filters)
dataset.to_table(filter=filt_exp)
{code}


My hope would be to do something like filt_exp = partitioning.parse(paths), which would return a single dataset expression.
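
In the meantime, one workaround that seems to give the same result without round-tripping through filter tuples is to OR the parsed expressions together directly (a minimal sketch, assuming paths and partitioning as defined above):

{code:python}
import functools
import operator

# Combine the per-fragment partition expressions with OR; duplicates are harmless
filt_exp = functools.reduce(
    operator.or_,
    (partitioning.parse(path) for path in paths),
)
dataset.to_table(filter=filt_exp)
{code}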




--
This message was sent by Atlassian Jira
(v8.20.1#820001)