Posted to jira@arrow.apache.org by "ARF (Jira)" <ji...@apache.org> on 2021/02/28 17:32:00 UTC

[jira] [Updated] (ARROW-11826) Improve performance of repeated filtered parquet reads

     [ https://issues.apache.org/jira/browse/ARROW-11826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ARF updated ARROW-11826:
------------------------
    Description: 
{{pq.read_table(..., filters=...)}} is slower than it probably should be... In particular, there should be a faster solution for repeatedly reading from the same file.

For a 1.8 GB parquet file with about 5000 row groups (written with row-group metadata statistics), reading with a filter that selects a sub-section of a single row group takes 1.65 s.

Repeated, similarly small reads from the same file each take the same amount of time with {{pq.read_table(..., filters=...)}}.

I say "slower than it probably should be" because with the rudimentary example code below, the initial filtered read takes about 975 ms. Any subsequent read takes only 14ms.

I have no idea what makes the C++ code slow, but in Python I observed two dominant time-wasters for repeated queries:
 # {{pq.ParquetFile('test.parquet')}} takes 170 ms (this might be a Windows issue), so opening the file only once helps
 # iterating through the metadata is surprisingly slow: about 80 ms for a single pass, e.g. {{[pq_file.metadata.row_group(i).column(0).statistics.min for i in range(pq_file.metadata.num_row_groups)]}}. Six such passes (one each for the minimum and maximum of three columns) add up to about 500 ms. Caching the statistics really helps here.

{code:python}
from functools import cached_property, lru_cache

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq


def equal(x, y, **kwargs):
    """Enable equal comparison for dictionary array vs. scalar."""
    type_x = type(x.type)
    type_y = type(y.type)

    # fastpath
    if type_x is pa.DataType and type_y is pa.DataType:
        return pc._equal(x, y, **kwargs)

    if type_x is pa.DictionaryType and type_y is pa.DataType:
        array_arg = x
        scalar_arg = y
    elif type_x is pa.DataType and type_y is pa.DictionaryType:
        array_arg = y
        scalar_arg = x
    else:
        # fall back to the default implementation (which will error out...)
        return pc._equal(x, y, **kwargs)

    # have dictionary vs. scalar comparison
    def chunk_generator():
        for chunk in array_arg.iterchunks():
            # for large dictionaries use pyarrow to search index
            if len(chunk.dictionary) > 30:
                index = pc.index_in(scalar_arg, options=pc.SetLookupOptions(
                    value_set=chunk.dictionary))
                if index.is_valid:
                    yield pc.equal(chunk.indices, index)
                    continue
            # for small dictionaries index search in python is faster
            try:
                index = chunk.dictionary.to_pylist().index(scalar_arg.as_py())
                yield pc.equal(chunk.indices, pa.scalar(index, chunk.indices.type))
                continue
            except ValueError:
                pass
            # value not found in dictionary
            yield pa.nulls(len(chunk), pa.bool_()).fill_null(False)
    return pa.chunked_array(chunk_generator())


pc._equal = pc.equal
pc.equal = equal


@lru_cache(maxsize=None)
def column_names_from_parquet_file(parquet_file):
    """"Returns a dict associating column names with column number
    for a given parquet file. The result is cached."""
    return {
        parquet_file.metadata.row_group(0).column(i).path_in_schema: i
        for i in range(parquet_file.metadata.row_group(0).num_columns)
    }


@lru_cache(maxsize=None)
def metadata_from_parquet_file(parquet_file, field_name):
    """Returns a tuple(min_, max_) where min_ and max_ are lists with the 
    row group metadata statistics min and max respectively.
    The result is cached."""
    column_id = column_names_from_parquet_file(parquet_file)[field_name]
    pq_metadata = parquet_file.metadata
    min_ = [
        pq_metadata.row_group(i).column(column_id).statistics.min
        for i in range(pq_metadata.num_row_groups)
    ]
    max_ = [
        pq_metadata.row_group(i).column(column_id).statistics.max
        for i in range(pq_metadata.num_row_groups)
    ]
    return min_, max_


class Node:
    """Base class for a node in a computation graph."""

    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right

    def __eq__(self, other):
        return Node_Equal(self, other)

    def __and__(self, other):
        return Node_And(self, other)

    def __or__(self, other):
        return Node_Or(self, other)

    def filter_table(self, table):
        """Applies the computation graph as a filter on a table."""
        mask = self.evaluate_on_table(table)
        return table.filter(mask)


class Node_Equal(Node):
    def _identify_field_and_literal(self):
        if isinstance(self.left, Field):
            self.__dict__['field'] = field = self.left
            self.__dict__['literal'] = literal = self.right
        else:
            # bind the locals as well so the assert and return below also work in this branch
            self.__dict__['field'] = field = self.right
            self.__dict__['literal'] = literal = self.left
        assert isinstance(literal, (str, float, int))
        return field, literal

    @cached_property
    def field(self):
        field = self.__dict__.get(
            'field', None) or self._identify_field_and_literal()[0]
        return field

    @cached_property
    def literal(self):
        literal = self.__dict__.get(
            'literal', None) or self._identify_field_and_literal()[1]
        return literal

    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        field = self.field
        literal = self.literal
        min_, max_ = metadata_from_parquet_file(parquet_file, field.name)
        if row_groups is None:  # an empty list must stay empty rather than reset to all row groups
            row_groups = range(len(min_))
        return [
            i
            for i in row_groups
            if min_[i] <= literal <= max_[i]
        ]

    def evaluate_on_table(self, table):
        field = self.field
        literal = self.literal
        column = table[field.name]

        if type(column.type) is pa.DictionaryType:
            pa_scalar = pa.scalar(literal, column.type.value_type)
        else:
            pa_scalar = pa.scalar(literal, column.type)
        return pc.equal(column, pa_scalar)


class Node_And(Node):
    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        filtered_row_groups = self.left.evaluate_on_metadata(
            parquet_file, row_groups)
        return self.right.evaluate_on_metadata(parquet_file, filtered_row_groups)

    def evaluate_on_table(self, table):
        mask1 = self.left.evaluate_on_table(table)
        mask2 = self.right.evaluate_on_table(table)
        return pc.and_(mask1, mask2)


class Node_Or(Node):
    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        row_groups1 = self.left.evaluate_on_metadata(parquet_file, row_groups)
        row_groups2 = self.right.evaluate_on_metadata(parquet_file, row_groups)
        return sorted(set(row_groups1) | set(row_groups2))

    def evaluate_on_table(self, table):
        mask1 = self.left.evaluate_on_table(table)
        mask2 = self.right.evaluate_on_table(table)
        return pc.or_(mask1, mask2)


class Field:
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return Node_Equal(self, other)


def read_table_filtered(parquet_file, filter=None):
    """Fast filtered read from a parquet file."""
    if isinstance(parquet_file, str):
        parquet_file = pq.ParquetFile(parquet_file)
    row_groups = filter.evaluate_on_metadata(parquet_file)
    table = parquet_file.read_row_groups(row_groups)
    return filter.filter_table(table)


if __name__ == '__main__':
    pq_file = pq.ParquetFile('test.parquet')
    filter = (Field('code') == 12345) & (
        Field('foo') == 'foo value') & (Field('bar') == 'bar value')
    table = read_table_filtered(pq_file, filter)
{code}
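
A minimal timing sketch for repeated reads with the helper above (reusing {{test.parquet}} and the filter from the {{__main__}} block); the first call pays for populating the cached statistics, later calls only read and filter the selected row groups:
{code:python}
import time

pq_file = pq.ParquetFile('test.parquet')
query = (Field('code') == 12345) & (Field('foo') == 'foo value') & (Field('bar') == 'bar value')

for i in range(3):
    t0 = time.perf_counter()
    table = read_table_filtered(pq_file, query)
    # roughly 975 ms for the first read, about 14 ms afterwards (on my machine)
    print(f"read {i}: {time.perf_counter() - t0:.3f} s, {table.num_rows} rows")
{code}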

> Improve performance of repeated filtered parquet reads
> ------------------------------------------------------
>
>                 Key: ARROW-11826
>                 URL: https://issues.apache.org/jira/browse/ARROW-11826
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Python
>    Affects Versions: 3.0.0
>            Reporter: ARF
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)