You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@arrow.apache.org by "ARF (Jira)" <ji...@apache.org> on 2021/02/28 17:30:00 UTC
[jira] [Created] (ARROW-11826) Improve performance of repeated
filtered parquet reads
ARF created ARROW-11826:
---------------------------
Summary: Improve performance of repeated filtered parquet reads
Key: ARROW-11826
URL: https://issues.apache.org/jira/browse/ARROW-11826
Project: Apache Arrow
Issue Type: Improvement
Components: Python
Affects Versions: 3.0.0
Reporter: ARF
{{pq.read_table(..., filters=...)}} is slower than it probably should be... In particular, there should be a faster solution for repeatedly reading from the same file.
For a 1.8 GB parquet file with about 5000 row groups (written with row-group metadata statistics), reading with a filter that selects a sub-section of a single row group takes 1.65 s.
Repeated, similarly small reads from the same file each take the same amount of time with {{pq.read_table(..., filters=...)}}.
I say "slower than it probably should be" because with the rudimentary example code below, the initial filtered read takes about 975 ms. Any subsequent read takes only 14ms.
I have no idea what makes the C++ code slow, but in Python I observed two dominant time-wasters for repeated queries:
# {{pq.ParquetFile('test.parquet')}} takes 170ms (this might be a Windows issue), so opening the file only once helps
# iterating through the metadata is surprisingly slow: 80 ms for a single pass, e.g. {{[pq_file.metadata.row_group(i).column(0).statistics.min for i in range(pq_file.metadata.num_row_groups)]}}. Six such passes (one each for the minimum and the maximum of three columns) add up to about 500 ms. Caching the statistics really helps here.
{code:python}
from functools import cached_property, lru_cache
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
def equal(x, y, **kwargs):
    """Enable equal comparison for dictionary array vs. scalar.

    Drop-in wrapper around the stock implementation kept in ``pc._equal``.
    When one operand is dictionary-typed and the other has a plain
    ``pa.DataType``, the plain operand is looked up in each chunk's
    dictionary and the chunk's indices are compared against the position
    found.  Every other combination is forwarded to ``pc._equal`` unchanged.

    Parameters
    ----------
    x, y
        Operands, in either order.  In the dictionary branch the
        dictionary-typed operand must provide ``iterchunks()`` and the other
        operand ``as_py()``.
    **kwargs
        Forwarded verbatim to ``pc._equal`` on the pass-through paths.

    Returns
    -------
    Whatever ``pc._equal`` returns on the pass-through paths; otherwise a
    chunked array holding one result chunk per chunk of the dictionary
    operand.
    """
    # Operands that carry no ``.type`` attribute map to ``NoneType`` here and
    # therefore end up in the pass-through fallback instead of raising.
    type_x = type(getattr(x, 'type', None))
    type_y = type(getattr(y, 'type', None))

    # fastpath: exact class identity, so dictionary types never match here
    if type_x is pa.DataType and type_y is pa.DataType:
        return pc._equal(x, y, **kwargs)

    if type_x is pa.DictionaryType and type_y is pa.DataType:
        array_arg, scalar_arg = x, y
    elif type_x is pa.DataType and type_y is pa.DictionaryType:
        # mirrored operand order (scalar first, dictionary array second)
        array_arg, scalar_arg = y, x
    else:
        # fallback to default implementation (which will error out...)
        return pc._equal(x, y, **kwargs)

    # have dictionary vs. scalar comparison
    def chunk_generator():
        for chunk in array_arg.iterchunks():
            # for large dictionaries use pyarrow to search index
            if len(chunk.dictionary) > 30:
                index = pc.index_in(scalar_arg, options=pc.SetLookupOptions(
                    value_set=chunk.dictionary))
                if index.is_valid:
                    yield pc.equal(chunk.indices, index)
                    continue

            # for small dictionaries index search in python is faster
            try:
                index = chunk.dictionary.to_pylist().index(scalar_arg.as_py())
                yield pc.equal(
                    chunk.indices, pa.scalar(index, chunk.indices.type))
                continue
            except ValueError:
                pass

            # value not found in dictionary: nothing in this chunk matches
            yield pa.nulls(len(chunk), pa.bool_()).fill_null(False)

    return pa.chunked_array(chunk_generator())
# Keep a handle on the stock implementation so that ``equal`` can delegate to
# it.  The guard makes the patch idempotent: executing this code a second time
# must not overwrite ``pc._equal`` with the wrapper itself, because the
# wrapper would then call itself without end.
if not hasattr(pc, '_equal'):
    pc._equal = pc.equal
pc.equal = equal
@lru_cache(maxsize=None)
def column_names_from_parquet_file(parquet_file):
    """Return a dict associating column names with column numbers.

    The names are the ``path_in_schema`` values of the columns of the first
    row group (index 0) of ``parquet_file``; the numbers are the positions
    those columns are addressed by.  The result is cached per
    ``parquet_file`` argument, which therefore has to be hashable.
    """
    # Fetch the row group once instead of once (or more) per column.
    first_row_group = parquet_file.metadata.row_group(0)
    return {
        first_row_group.column(i).path_in_schema: i
        for i in range(first_row_group.num_columns)
    }
@lru_cache(maxsize=None)
def metadata_from_parquet_file(parquet_file, field_name):
    """Return the per-row-group min/max statistics of one column.

    Parameters
    ----------
    parquet_file
        Object exposing ``metadata``; also used as cache key.
    field_name
        Column name as reported by ``column_names_from_parquet_file``.

    Returns
    -------
    tuple(min_, max_)
        Two lists indexed by row group number, holding the ``statistics.min``
        and ``statistics.max`` values respectively.

    Raises
    ------
    KeyError
        If ``field_name`` is not among the file's column names.

    The result is cached.
    """
    column_id = column_names_from_parquet_file(parquet_file)[field_name]
    pq_metadata = parquet_file.metadata

    # A single pass collects both bounds, so every row group's statistics
    # object is fetched once rather than twice.
    # NOTE(review): assumes every row group carries statistics for this
    # column; ``.min``/``.max`` are read unconditionally.
    min_ = []
    max_ = []
    for i in range(pq_metadata.num_row_groups):
        statistics = pq_metadata.row_group(i).column(column_id).statistics
        min_.append(statistics.min)
        max_.append(statistics.max)
    return min_, max_
class Node:
    """Base class for a node in a computation graph.

    A node has up to two operands, ``left`` and ``right``.  The operators
    ``==``, ``&`` and ``|`` do not compute a value; they build a new
    ``Node_Equal``, ``Node_And`` or ``Node_Or`` with ``self`` as left operand.
    Because ``__eq__`` is overridden without a matching ``__hash__``,
    instances are not hashable.

    Subclasses provide ``evaluate_on_metadata`` and ``evaluate_on_table``.
    """

    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right

    def __eq__(self, other):
        return Node_Equal(self, other)

    def __and__(self, other):
        return Node_And(self, other)

    def __or__(self, other):
        return Node_Or(self, other)

    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        """Return the row group numbers that may match; subclass hook."""
        raise NotImplementedError(
            f'{type(self).__name__} does not implement evaluate_on_metadata')

    def evaluate_on_table(self, table):
        """Return a mask over the rows of ``table``; subclass hook."""
        raise NotImplementedError(
            f'{type(self).__name__} does not implement evaluate_on_table')

    def filter_table(self, table):
        """Applies the computation graph as a filter on a table."""
        mask = self.evaluate_on_table(table)
        return table.filter(mask)
class Node_Equal(Node):
    """Equality predicate between a ``Field`` and a literal value.

    The operands may be given in either order; ``field`` and ``literal``
    resolve which is which on first access and cache the answer.
    """

    def _identify_field_and_literal(self):
        """Work out which operand is the field and which is the literal.

        Returns the pair ``(field, literal)`` and stores both in the instance
        ``__dict__`` so that neither cached property has to repeat the work.

        Raises ``TypeError`` if neither operand is a ``Field`` or if the
        literal is not a ``str``, ``float`` or ``int``.
        """
        if isinstance(self.left, Field):
            field, literal = self.left, self.right
        elif isinstance(self.right, Field):
            field, literal = self.right, self.left
        else:
            raise TypeError('one operand of an equality node must be a Field')
        if not isinstance(literal, (str, float, int)):
            raise TypeError(
                'literal must be a str, float or int, '
                f'not {type(literal).__name__}')
        # Same keys ``cached_property`` uses, so both attributes are resolved
        # after a single call.
        self.__dict__['field'] = field
        self.__dict__['literal'] = literal
        return field, literal

    @cached_property
    def field(self):
        """The ``Field`` operand."""
        return self._identify_field_and_literal()[0]

    @cached_property
    def literal(self):
        """The non-field operand."""
        return self._identify_field_and_literal()[1]

    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        """Return the row groups whose min/max range contains the literal.

        ``row_groups`` restricts the candidates; ``None`` means every row
        group of the file.  An empty selection stays empty.
        """
        field = self.field
        literal = self.literal
        min_, max_ = metadata_from_parquet_file(parquet_file, field.name)
        # ``is None`` rather than truthiness: an empty list is a valid,
        # already fully filtered selection and must not expand to "all".
        if row_groups is None:
            row_groups = range(len(min_))
        return [
            i
            for i in row_groups
            if min_[i] <= literal <= max_[i]
        ]

    def evaluate_on_table(self, table):
        """Return the mask ``table[field.name] == literal``."""
        field = self.field
        literal = self.literal
        column = table[field.name]
        # For a dictionary column the scalar is built with the dictionary's
        # value type rather than with the dictionary type itself.
        if isinstance(column.type, pa.DictionaryType):
            pa_scalar = pa.scalar(literal, column.type.value_type)
        else:
            pa_scalar = pa.scalar(literal, column.type)
        return pc.equal(column, pa_scalar)
class Node_And(Node):
    """Conjunction of the ``left`` and ``right`` sub-expressions."""

    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        """Return the row groups accepted by both operands.

        The left operand is evaluated first and its result is the candidate
        set handed to the right operand.
        """
        filtered_row_groups = self.left.evaluate_on_metadata(
            parquet_file, row_groups)
        # Nothing survived the left operand, so the conjunction is empty.
        # Returning here also keeps an empty (falsy) candidate list from being
        # passed on to the right operand.
        if not filtered_row_groups:
            return []
        return self.right.evaluate_on_metadata(
            parquet_file, filtered_row_groups)

    def evaluate_on_table(self, table):
        """Return the element-wise AND of both operands' masks."""
        mask1 = self.left.evaluate_on_table(table)
        mask2 = self.right.evaluate_on_table(table)
        return pc.and_(mask1, mask2)
class Node_Or(Node):
    """Disjunction of the ``left`` and ``right`` sub-expressions."""

    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        """Return, in ascending order, the row groups either operand accepts.

        Both operands receive the same ``row_groups`` candidates.
        """
        left_hits = self.left.evaluate_on_metadata(parquet_file, row_groups)
        right_hits = self.right.evaluate_on_metadata(parquet_file, row_groups)
        combined = set(left_hits)
        combined.update(right_hits)
        return sorted(combined)

    def evaluate_on_table(self, table):
        """Return the element-wise OR of both operands' masks."""
        left_mask = self.left.evaluate_on_table(table)
        right_mask = self.right.evaluate_on_table(table)
        return pc.or_(left_mask, right_mask)
class Field:
    """Reference to a table column by name, for use in filter expressions."""

    def __init__(self, name):
        # Column name; the equality node reads it back through ``.name``.
        self.name = name

    def __eq__(self, other):
        """Build an equality node with this field as its left operand."""
        comparison = Node_Equal(self, other)
        return comparison
def read_table_filtered(parquet_file, filter=None):
    """Fast filtered read from a parquet file.

    Parameters
    ----------
    parquet_file
        Either a path (``str``), which is opened with ``pq.ParquetFile``, or
        an already opened file object.  Passing an opened file avoids paying
        the open cost on every call.
    filter
        Computation graph providing ``evaluate_on_metadata`` and
        ``filter_table``.  ``None`` (the default) reads the whole file.

    Returns
    -------
    The rows selected by ``filter``, or the complete table when ``filter`` is
    ``None``.
    """
    if isinstance(parquet_file, str):
        parquet_file = pq.ParquetFile(parquet_file)
    # No filter: there is nothing to prune, so read everything.
    if filter is None:
        return parquet_file.read()
    # First narrow down the row groups using metadata only, then apply the
    # filter to the rows actually read.
    row_groups = filter.evaluate_on_metadata(parquet_file)
    table = parquet_file.read_row_groups(row_groups)
    return filter.filter_table(table)
if __name__ == '__main__':
    # Example: open the file once, then run a filtered read against it.
    pq_file = pq.ParquetFile('test.parquet')
    # Named ``row_filter`` so the builtin ``filter`` is not shadowed at
    # module scope.
    row_filter = (
        (Field('code') == 12345)
        & (Field('foo') == 'foo value')
        & (Field('bar') == 'bar value')
    )
    table = read_table_filtered(pq_file, row_filter)
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)