You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@arrow.apache.org by "Lance Dacey (Jira)" <ji...@apache.org> on 2021/04/13 13:35:00 UTC

[jira] [Created] (ARROW-12364) [Python] [Dataset] Add metadata_collector option to ds.write_dataset()

Lance Dacey created ARROW-12364:
-----------------------------------

             Summary: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()
                 Key: ARROW-12364
                 URL: https://issues.apache.org/jira/browse/ARROW-12364
             Project: Apache Arrow
          Issue Type: Wish
          Components: Parquet, Python
    Affects Versions: 3.0.0
         Environment: Ubuntu 18.04
            Reporter: Lance Dacey


The legacy pq.write_to_dataset() has an option to save metadata to a list when writing partitioned data.
{code:python}
    collector = []
    pq.write_to_dataset(
        table=table,
        root_path=output_path,
        use_legacy_dataset=True,
        metadata_collector=collector,
    )
    fragments = []
    for piece in collector:
        files.append(filesystem.sep.join([output_path, piece.row_group(0).column(0).file_path]))
{code}
This allows me to save a list of the specific parquet files which were created when writing the partitions to storage. I use this when scheduling tasks with Airflow.

Task A downloads data and partitions it --> Task B reads the file fragments which were just saved and transforms it --> Task C creates a list of dataset filters from the file fragments I transformed, reads each filter to into a table and then processes the data further (normally dropping duplicates or selecting a subset of the columns) and saves it for visualization
{code:java}
fragments = ['dev/date_id=20180111/transform-split-20210301013200-68.parquet', 'dev/date_id=20180114/transform-split-20210301013200-69.parquet', 'dev/date_id=20180128/transform-split-20210301013200-57.parquet', ]
{code}
I can use this list downstream to do two things:
 1) I can read the list of fragments directly as a new dataset and transform the data
{code:java}
ds.dataset(fragments)
{code}
2) I can generate filters from the fragment paths which were saved using ds._get_partition_keys(). This allows me to query the dataset and retrieve all fragments within the partition. For example, if I partition by date and I process data every 30 minutes I might have 48 individual file fragments within a single partition. I need to know to query the *entire* partition instead of reading a single fragment.
{code:java}
def consolidate_filters(fragments):
    """Retrieves the partition_expressions from a list of dataset fragments to build a list of unique filters"""
    filters = []
    for frag in fragments:
        partitions = ds._get_partition_keys(frag.partition_expression)
        filter = [(k, "==", v) for k, v in partitions.items()]
        if filter not in filters:
            filters.append(filter)
    return filters

filter_expression = pq._filters_to_expression(
                filters=consolidate_filters(fragments=fragments)
            )
{code}
My current problem is that when I use ds.write_dataset(), I do not have a convenient method for generating a list of the file fragments I just saved. My only choice is to use basename_template and fs.glob() to find a list of the files based on the basename_template pattern. This is much slower and a waste of listing files on blob storage. [Related stackoverflow question with the basis of the approach I am using now |https://stackoverflow.com/questions/66252660/pyarrow-identify-the-fragments-written-or-filters-used-when-writing-a-parquet/66266585#66266585]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)