Posted to jira@arrow.apache.org by "ARF (Jira)" <ji...@apache.org> on 2021/02/28 17:30:00 UTC

[jira] [Created] (ARROW-11826) Improve performance of repeated filtered parquet reads

ARF created ARROW-11826:
---------------------------

             Summary: Improve performance of repeated filtered parquet reads
                 Key: ARROW-11826
                 URL: https://issues.apache.org/jira/browse/ARROW-11826
             Project: Apache Arrow
          Issue Type: Improvement
          Components: Python
    Affects Versions: 3.0.0
            Reporter: ARF


{{pq.read_table(..., filters=...)}} is slower than it probably should be... In particular, there should be a faster solution for repeatedly reading from the same file.

For a 1.8 GB parquet file with about 5000 row groups (written with row-group metadata statistics), reading with a filter that selects a sub-section of a single row group takes 1.65 s.

Repeated, similarly small reads from the same file each take the same amount of time with {{pq.read_table(..., filters=...)}}.
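
For reference, the kind of repeated read being timed here presumably looks like this (a minimal sketch, using the same placeholder file and filter values as the full example below):

{code:python}
import time

import pyarrow.parquet as pq

# Every call re-opens the file and re-scans the row-group metadata,
# so each repetition pays the full ~1.65 s cost again.
for _ in range(3):
    start = time.perf_counter()
    table = pq.read_table('test.parquet', filters=[('code', '=', 12345)])
    print(f'{time.perf_counter() - start:.2f} s')
{code}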

I say "slower than it probably should be" because with the rudimentary example code below, the initial filtered read takes about 975 ms. Any subsequent read takes only 14ms.

I have no idea what makes the C++ code slow, but in Python I observed two dominant time-wasters for repeated queries:
 # {{pq.ParquetFile('test.parquet')}} takes 170 ms (this might be a Windows issue), so opening the file only once helps
 # iterating through the metadata is surprisingly slow: 80 ms for a single pass, e.g. {{[pq_file.metadata.row_group(i).column(0).statistics.min for i in range(pq_file.metadata.num_row_groups)]}}, so six passes (one each for minimum and maximum on three columns) add up to about 500 ms. Caching the statistics really helps here (see the timing sketch after this list).
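
As a rough illustration, the metadata-iteration cost can be measured like this (a minimal sketch; {{test.parquet}} stands in for the 1.8 GB file with ~5000 row groups):

{code:python}
import timeit

import pyarrow.parquet as pq

pq_file = pq.ParquetFile('test.parquet')  # stand-in for the large test file

def one_statistics_pass():
    # One full pass over the per-row-group min statistics of one column;
    # on the file described above this takes on the order of 80 ms.
    return [
        pq_file.metadata.row_group(i).column(0).statistics.min
        for i in range(pq_file.metadata.num_row_groups)
    ]

print(timeit.timeit(one_statistics_pass, number=10) / 10, 's per pass')
{code}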

{code:python}
from functools import cached_property, lru_cache

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq


def equal(x, y, **kwargs):
    """Enable equal comparison for dictionary array vs. scalar."""
    type_x = type(x.type)
    type_y = type(y.type)

    # fastpath
    if type_x is pa.DataType and type_y is pa.DataType:
        return pc._equal(x, y, **kwargs)

    if type_x is pa.DictionaryType and type_y is pa.DataType:
        array_arg = x
        scalar_arg = y
    elif type_x is pa.DataType and type_y is pa.DictionaryType:
        array_arg = y
        scalar_arg = x
    else:
        # fall back to the default implementation (which will error out...)
        return pc._equal(x, y, **kwargs)

    # have dictionary vs. scalar comparison
    def chunk_generator():
        for chunk in array_arg.iterchunks():
            # for large dictionaries use pyarrow to search index
            if len(chunk.dictionary) > 30:
                index = pc.index_in(scalar_arg, options=pc.SetLookupOptions(
                    value_set=chunk.dictionary))
                if index.is_valid:
                    yield pc.equal(chunk.indices, index)
                    continue
            # for small dictionaries index search in python is faster
            try:
                index = chunk.dictionary.to_pylist().index(scalar_arg.as_py())
                yield pc.equal(chunk.indices, pa.scalar(index, chunk.indices.type))
                continue
            except ValueError:
                pass
            # value not found in dictionary
            yield pa.nulls(len(chunk), pa.bool_()).fill_null(False)
    return pa.chunked_array(chunk_generator())


# Monkey-patch pyarrow.compute.equal so that dictionary-encoded arrays can
# be compared against scalars; the original is kept around as pc._equal.
pc._equal = pc.equal
pc.equal = equal


@lru_cache(maxsize=None)
def column_names_from_parquet_file(parquet_file):
    """"Returns a dict associating column names with column number
    for a given parquet file. The result is cached."""
    return {
        parquet_file.metadata.row_group(0).column(i).path_in_schema: i
        for i in range(parquet_file.metadata.row_group(0).num_columns)
    }


@lru_cache(maxsize=None)
def metadata_from_parquet_file(parquet_file, field_name):
    """Returns a tuple(min_, max_) where min_ and max_ are lists with the 
    row group metadata statistics min and max respectively.
    The result is cached."""
    column_id = column_names_from_parquet_file(parquet_file)[field_name]
    pq_metadata = parquet_file.metadata
    min_ = [
        pq_metadata.row_group(i).column(column_id).statistics.min
        for i in range(pq_metadata.num_row_groups)
    ]
    max_ = [
        pq_metadata.row_group(i).column(column_id).statistics.max
        for i in range(pq_metadata.num_row_groups)
    ]
    return min_, max_


class Node:
    """Base class for a node in a computation graph."""

    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right

    def __eq__(self, other):
        return Node_Equal(self, other)

    def __and__(self, other):
        return Node_And(self, other)

    def __or__(self, other):
        return Node_Or(self, other)

    def filter_table(self, table):
        """Applies the computation graph as a filter on a table."""
        mask = self.evaluate_on_table(table)
        return table.filter(mask)


class Node_Equal(Node):
    def _identify_field_and_literal(self):
        # Store results directly in __dict__ so the cached_property
        # descriptors below pick them up without recomputing.
        if isinstance(self.left, Field):
            self.__dict__['field'] = field = self.left
            self.__dict__['literal'] = literal = self.right
        else:
            self.__dict__['field'] = field = self.right
            self.__dict__['literal'] = literal = self.left
        assert isinstance(literal, (str, float, int))
        return field, literal

    @cached_property
    def field(self):
        field = self.__dict__.get(
            'field', None) or self._identify_field_and_literal()[0]
        return field

    @cached_property
    def literal(self):
        literal = self.__dict__.get(
            'literal', None) or self._identify_field_and_literal()[1]
        return literal

    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        field = self.field
        literal = self.literal
        min_, max_ = metadata_from_parquet_file(parquet_file, field.name)
        # An empty list means no candidate row groups survived an earlier
        # filter, so only default to all row groups when None was passed.
        if row_groups is None:
            row_groups = range(len(min_))
        return [
            i
            for i in row_groups
            if min_[i] <= literal <= max_[i]
        ]

    def evaluate_on_table(self, table):
        field = self.field
        literal = self.literal
        column = table[field.name]

        if type(column.type) is pa.DictionaryType:
            pa_scalar = pa.scalar(literal, column.type.value_type)
        else:
            pa_scalar = pa.scalar(literal, column.type)
        return pc.equal(column, pa_scalar)


class Node_And(Node):
    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        filtered_row_groups = self.left.evaluate_on_metadata(
            parquet_file, row_groups)
        return self.right.evaluate_on_metadata(parquet_file, filtered_row_groups)

    def evaluate_on_table(self, table):
        mask1 = self.left.evaluate_on_table(table)
        mask2 = self.right.evaluate_on_table(table)
        return pc.and_(mask1, mask2)


class Node_Or(Node):
    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        row_groups1 = self.left.evaluate_on_metadata(parquet_file, row_groups)
        row_groups2 = self.right.evaluate_on_metadata(parquet_file, row_groups)
        return sorted(set(row_groups1) | set(row_groups2))

    def evaluate_on_table(self, table):
        mask1 = self.left.evaluate_on_table(table)
        mask2 = self.right.evaluate_on_table(table)
        return pc.or_(mask1, mask2)


class Field:
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return Node_Equal(self, other)


def read_table_filtered(parquet_file, filter=None):
    """Fast filtered read from a parquet file."""
    if isinstance(parquet_file, str):
        parquet_file = pq.ParquetFile(parquet_file)
    if filter is None:
        return parquet_file.read()
    # Read only the row groups whose statistics can match, then apply the
    # exact filter to the much smaller in-memory table.
    row_groups = filter.evaluate_on_metadata(parquet_file)
    table = parquet_file.read_row_groups(row_groups)
    return filter.filter_table(table)


if __name__ == '__main__':
    pq_file = pq.ParquetFile('test.parquet')
    filter = (Field('code') == 12345) & (
        Field('foo') == 'foo value') & (Field('bar') == 'bar value')
    table = read_table_filtered(pq_file, filter)
{code}
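
For comparison, the same query through the standard API (which pays the full cost on every call) would be something like:

{code:python}
import pyarrow.parquet as pq

# Equivalent built-in filtered read; a list of tuples is ANDed together.
table = pq.read_table('test.parquet', filters=[
    ('code', '=', 12345),
    ('foo', '=', 'foo value'),
    ('bar', '=', 'bar value'),
])
{code}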


