Posted to jira@arrow.apache.org by "ARF (Jira)" <ji...@apache.org> on 2021/02/28 17:32:00 UTC
[jira] [Updated] (ARROW-11826) Improve performance of repeated filtered parquet reads
[ https://issues.apache.org/jira/browse/ARROW-11826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ARF updated ARROW-11826:
------------------------
Description:
{{pq.read_table(..., filters=...)}} is slower than it probably should be... In particular, there should be a faster solution for repeatedly reading from the same file.
For a 1.8 GB parquet file with about 5000 row groups (written with row-group metadata statistics), reading with a filter that selects a sub-section of a single row group takes 1.65 s.
Repeated, similarly small reads from the same file each take the same amount of time with {{pq.read_table(..., filters=...)}}.
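For reference, a call of the kind being timed looks like this (the column names and values are just the placeholders used in the example at the bottom):
{code:python}
import pyarrow.parquet as pq

# every call pays the full open-and-metadata cost again, even on the same file
table = pq.read_table(
    'test.parquet',
    filters=[('code', '=', 12345), ('foo', '=', 'foo value'),
             ('bar', '=', 'bar value')],
)
{code}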
I say "slower than it probably should be" because with the rudimentary example code below, the initial filtered read takes about 975 ms. Any subsequent read takes only 14ms.
I have no idea what makes the C++ code slow, but in Python I observed two dominant time-wasters for repeated queries:
# {{pq.ParquetFile('test.parquet')}} takes 170 ms (this might be a Windows issue), so opening the file only once helps.
# iterating through the metadata is surprisingly slow: 80 ms for a single pass, e.g. {{[pq_file.metadata.row_group(i).column(0).statistics.min for i in range(pq_file.metadata.num_row_groups)]}}. Six such passes add up to about 500 ms (one pass each for minimum and maximum on three columns), so caching the statistics really helps here; see the timing sketch below.
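A minimal sketch of how that metadata-pass timing can be reproduced ({{test.parquet}} is the placeholder file name used throughout this issue):
{code:python}
import time

import pyarrow.parquet as pq

pq_file = pq.ParquetFile('test.parquet')

start = time.perf_counter()
# one pass over the min statistics of the first column of every row group
mins = [
    pq_file.metadata.row_group(i).column(0).statistics.min
    for i in range(pq_file.metadata.num_row_groups)
]
print(f'one metadata pass over {len(mins)} row groups: '
      f'{time.perf_counter() - start:.3f} s')
{code}
The rudimentary example code itself: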
{code:python}
from functools import cached_property, lru_cache
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
def equal(x, y, **kwargs):
    """Enable equal comparison for dictionary array vs. scalar."""
    type_x = type(x.type)
    type_y = type(y.type)
    # fast path: plain type vs. plain type uses the original kernel
    if type_x is pa.DataType and type_y is pa.DataType:
        return pc._equal(x, y, **kwargs)
    if type_x is pa.DictionaryType and type_y is pa.DataType:
        array_arg = x
        scalar_arg = y
    elif type_x is pa.DataType and type_y is pa.DictionaryType:
        array_arg = y
        scalar_arg = x
    else:
        # fall back to the default implementation (which will error out...)
        return pc._equal(x, y, **kwargs)

    # dictionary vs. scalar comparison
    def chunk_generator():
        for chunk in array_arg.iterchunks():
            # for large dictionaries use pyarrow to search the index
            if len(chunk.dictionary) > 30:
                index = pc.index_in(scalar_arg, options=pc.SetLookupOptions(
                    value_set=chunk.dictionary))
                if index.is_valid:
                    yield pc.equal(chunk.indices, index)
                    continue
            # for small dictionaries an index search in Python is faster
            try:
                index = chunk.dictionary.to_pylist().index(scalar_arg.as_py())
                yield pc.equal(chunk.indices, pa.scalar(index, chunk.indices.type))
                continue
            except ValueError:
                pass
            # value not found in dictionary: all-False mask
            yield pa.nulls(len(chunk), pa.bool_()).fill_null(False)

    return pa.chunked_array(chunk_generator())


# keep the original kernel around and monkey-patch the dictionary-aware wrapper in
pc._equal = pc.equal
pc.equal = equal
@lru_cache(maxsize=None)
def column_names_from_parquet_file(parquet_file):
    """Returns a dict associating column names with column numbers
    for a given parquet file. The result is cached."""
    return {
        parquet_file.metadata.row_group(0).column(i).path_in_schema: i
        for i in range(parquet_file.metadata.row_group(0).num_columns)
    }
@lru_cache(maxsize=None)
def metadata_from_parquet_file(parquet_file, field_name):
    """Returns a tuple (min_, max_) where min_ and max_ are lists with the
    row group metadata statistics min and max respectively.
    The result is cached."""
    column_id = column_names_from_parquet_file(parquet_file)[field_name]
    pq_metadata = parquet_file.metadata
    min_ = [
        pq_metadata.row_group(i).column(column_id).statistics.min
        for i in range(pq_metadata.num_row_groups)
    ]
    max_ = [
        pq_metadata.row_group(i).column(column_id).statistics.max
        for i in range(pq_metadata.num_row_groups)
    ]
    return min_, max_
class Node:
    """Base class for a node in a computation graph."""

    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right

    def __eq__(self, other):
        return Node_Equal(self, other)

    def __and__(self, other):
        return Node_And(self, other)

    def __or__(self, other):
        return Node_Or(self, other)

    def filter_table(self, table):
        """Applies the computation graph as a filter on a table."""
        mask = self.evaluate_on_table(table)
        return table.filter(mask)
class Node_Equal(Node):
    def _identify_field_and_literal(self):
        # cache both attributes in __dict__ so this runs at most once
        if isinstance(self.left, Field):
            self.__dict__['field'] = field = self.left
            self.__dict__['literal'] = literal = self.right
        else:
            self.__dict__['field'] = field = self.right
            self.__dict__['literal'] = literal = self.left
        assert isinstance(literal, (str, float, int))
        return field, literal

    @cached_property
    def field(self):
        field = self.__dict__.get(
            'field', None) or self._identify_field_and_literal()[0]
        return field

    @cached_property
    def literal(self):
        literal = self.__dict__.get(
            'literal', None) or self._identify_field_and_literal()[1]
        return literal

    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        """Returns the row groups whose min/max statistics admit the literal."""
        field = self.field
        literal = self.literal
        min_, max_ = metadata_from_parquet_file(parquet_file, field.name)
        if row_groups is None:
            # an empty list must stay empty (everything already pruned),
            # so test explicitly against None instead of truthiness
            row_groups = range(len(min_))
        return [
            i
            for i in row_groups
            if min_[i] <= literal <= max_[i]
        ]

    def evaluate_on_table(self, table):
        field = self.field
        literal = self.literal
        column = table[field.name]
        if type(column.type) is pa.DictionaryType:
            pa_scalar = pa.scalar(literal, column.type.value_type)
        else:
            pa_scalar = pa.scalar(literal, column.type)
        return pc.equal(column, pa_scalar)
class Node_And(Node):
    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        # intersection: let the right operand prune what the left one kept
        filtered_row_groups = self.left.evaluate_on_metadata(
            parquet_file, row_groups)
        return self.right.evaluate_on_metadata(parquet_file, filtered_row_groups)

    def evaluate_on_table(self, table):
        mask1 = self.left.evaluate_on_table(table)
        mask2 = self.right.evaluate_on_table(table)
        return pc.and_(mask1, mask2)


class Node_Or(Node):
    def evaluate_on_metadata(self, parquet_file, row_groups=None):
        # union of the row groups admitted by either operand
        row_groups1 = self.left.evaluate_on_metadata(parquet_file, row_groups)
        row_groups2 = self.right.evaluate_on_metadata(parquet_file, row_groups)
        return sorted(set(row_groups1) | set(row_groups2))

    def evaluate_on_table(self, table):
        mask1 = self.left.evaluate_on_table(table)
        mask2 = self.right.evaluate_on_table(table)
        return pc.or_(mask1, mask2)
class Field:
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return Node_Equal(self, other)


def read_table_filtered(parquet_file, filter=None):
    """Fast filtered read from a parquet file."""
    if isinstance(parquet_file, str):
        parquet_file = pq.ParquetFile(parquet_file)
    # 1. prune row groups on metadata, 2. read only those, 3. apply exact filter
    row_groups = filter.evaluate_on_metadata(parquet_file)
    table = parquet_file.read_row_groups(row_groups)
    return filter.filter_table(table)
if __name__ == '__main__':
    pq_file = pq.ParquetFile('test.parquet')
    filter = (Field('code') == 12345) & (
        Field('foo') == 'foo value') & (Field('bar') == 'bar value')
    table = read_table_filtered(pq_file, filter)
{code}
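To illustrate the repeated-read case the timings refer to, a hypothetical driver loop (not part of the example above; the numbers in the comments are those measured earlier):
{code:python}
pq_file = pq.ParquetFile('test.parquet')  # open once, ~170 ms

# the first filtered read pays ~975 ms (the statistics caches get filled);
# each subsequent read takes only ~14 ms
for code in (12345, 12346, 12347):
    filter = (Field('code') == code) & (Field('foo') == 'foo value')
    table = read_table_filtered(pq_file, filter)
{code}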
> Improve performance of repeated filtered parquet reads
> ------------------------------------------------------
>
> Key: ARROW-11826
> URL: https://issues.apache.org/jira/browse/ARROW-11826
> Project: Apache Arrow
> Issue Type: Improvement
> Components: Python
> Affects Versions: 3.0.0
> Reporter: ARF
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)