Posted to commits@arrow.apache.org by uw...@apache.org on 2018/04/06 13:26:29 UTC
[arrow] branch master updated: ARROW-2401 Support filters on Hive partitioned Parquet files
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new e3f7edc ARROW-2401 Support filters on Hive partitioned Parquet files
e3f7edc is described below
commit e3f7edc3ee47c53413ffb05c90a1f4dcbd7fed40
Author: Julius Neuffer <ju...@blue-yonder.com>
AuthorDate: Fri Apr 6 15:26:22 2018 +0200
ARROW-2401 Support filters on Hive partitioned Parquet files
This PR enables filtering a Parquet dataset based on its Hive-partitioned directory structure. Only `=` and `!=` are supported as filter operators.
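As a minimal usage sketch (the path and column name here are hypothetical; only the `filters` keyword itself is introduced by this commit):

    import pyarrow.parquet as pq

    # Pieces whose partition value fails any filter are dropped up front,
    # so their files are never opened.
    dataset = pq.ParquetDataset(
        '/data/events',  # Hive layout, e.g. /data/events/year=2018/part-0.parquet
        filters=[('year', '=', 2018)]  # only '=' and '!=' are supported
    )
    table = dataset.read()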
Author: Julius Neuffer <ju...@blue-yonder.com>
Closes #1840 from jneuff/partition-filter-prototype and squashes the following commits:
1f73868 <Julius Neuffer> Adhere to flake8
fb27566 <Julius Neuffer> Accept non-string filter values
b3258d5 <Julius Neuffer> Add docstring
b7f0d0e <Julius Neuffer> Fix flake8 errors
8466dbe <Julius Neuffer> Refactor partition filtering
ab5d4b9 <Julius Neuffer> Implement filter prototype on partitions
---
python/pyarrow/parquet.py | 35 +++++++++++++++++++++++++++++++++-
python/pyarrow/tests/test_parquet.py | 37 ++++++++++++++++++++++++++++++++++++
2 files changed, 71 insertions(+), 1 deletion(-)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 0929a15..beeedca 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -711,9 +711,14 @@ class ParquetDataset(object):
Divide files into pieces for each row group in the file
validate_schema : boolean, default True
Check that individual file schemas are all the same / compatible
+ filters : List[Tuple] or None (default)
+ List of filters to apply, like ``[('x', '=', 0), ...]``. This
+ implements partition-level (Hive) filtering only, i.e., it prevents
+ loading some files of the dataset.
"""
def __init__(self, path_or_paths, filesystem=None, schema=None,
- metadata=None, split_row_groups=False, validate_schema=True):
+ metadata=None, split_row_groups=False, validate_schema=True,
+ filters=None):
if filesystem is None:
a_path = path_or_paths
if isinstance(a_path, list):
@@ -744,6 +749,9 @@ class ParquetDataset(object):
if validate_schema:
self.validate_schemas()
+ if filters:
+ self._filter(filters)
+
def validate_schemas(self):
open_file = self._get_open_file_func()
@@ -849,6 +857,31 @@ class ParquetDataset(object):
common_metadata=self.common_metadata)
return open_file
+ def _filter(self, filters):
+ def filter_accepts_partition(part_key, filter, level):
+ p_column, p_value_index = part_key
+ f_column, op, f_value = filter
+ if p_column != f_column:
+ return True
+
+ f_value_index = self.partitions.get_index(level, p_column,
+ str(f_value))
+ if op == "=":
+ return f_value_index == p_value_index
+ elif op == "!=":
+ return f_value_index != p_value_index
+ else:
+ return True
+
+ def one_filter_accepts(piece, filter):
+ return all(filter_accepts_partition(part_key, filter, level)
+ for level, part_key in enumerate(piece.partition_keys))
+
+ def all_filters_accept(piece):
+ return all(one_filter_accepts(piece, f) for f in filters)
+
+ self.pieces = [p for p in self.pieces if all_filters_accept(p)]
+
def _ensure_filesystem(fs):
fs_type = type(fs)
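The core of the new `_filter` method above is a per-piece predicate. As a self-contained sketch of that logic (`Piece` and `lookup` below are simplified, hypothetical stand-ins for `ParquetDatasetPiece.partition_keys` and `self.partitions.get_index`):

    from collections import namedtuple

    # Each piece carries (column, value_index) pairs, one per partition level.
    Piece = namedtuple('Piece', ['partition_keys'])

    def accepts(piece, filters, value_index):
        # A piece survives only if every filter accepts every partition level;
        # filters on other columns and unknown operators always pass.
        def check(part_key, flt, level):
            p_column, p_value_index = part_key
            f_column, op, f_value = flt
            if p_column != f_column:
                return True
            f_value_index = value_index(level, p_column, str(f_value))
            if op == '=':
                return f_value_index == p_value_index
            elif op == '!=':
                return f_value_index != p_value_index
            return True
        return all(check(pk, flt, lvl)
                   for flt in filters
                   for lvl, pk in enumerate(piece.partition_keys))

    # foo has values [0, 1], bar has ['a', 'b']; the piece below is foo=1/bar=b.
    lookup = lambda level, col, val: {'foo': {'0': 0, '1': 1},
                                      'bar': {'a': 0, 'b': 1}}[col][val]
    piece = Piece(partition_keys=[('foo', 1), ('bar', 1)])
    assert accepts(piece, [('foo', '=', 1)], lookup)
    assert not accepts(piece, [('bar', '!=', 'b')], lookup)

Note that values are compared by their index in the dataset's set of partition values, which is why the filter value is converted with `str` before the lookup.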
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index b301de6..27d6bc7 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -996,6 +996,43 @@ def test_read_partitioned_directory(tmpdir):
_partition_test_for_filesystem(fs, base_path)
+@parquet
+def test_read_partitioned_directory_filtered(tmpdir):
+ fs = LocalFileSystem.get_instance()
+ base_path = str(tmpdir)
+
+ import pyarrow.parquet as pq
+
+ foo_keys = [0, 1]
+ bar_keys = ['a', 'b', 'c']
+ partition_spec = [
+ ['foo', foo_keys],
+ ['bar', bar_keys]
+ ]
+ N = 30
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'foo': np.array(foo_keys, dtype='i4').repeat(15),
+ 'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
+ 'values': np.random.randn(N)
+ }, columns=['index', 'foo', 'bar', 'values'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs,
+ filters=[('foo', '=', 1), ('bar', '!=', 'b')]
+ )
+ table = dataset.read()
+ result_df = (table.to_pandas()
+ .sort_values(by='index')
+ .reset_index(drop=True))
+
+ assert 0 not in result_df['foo'].values
+ assert 'b' not in result_df['bar'].values
+
+
@pytest.yield_fixture
def s3_example():
access_key = os.environ['PYARROW_TEST_S3_ACCESS_KEY']
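For context, the directory tree exercised by the new test follows the standard Hive layout (base_path/foo=0/bar=a/..., one leaf directory per key combination), and the filters `('foo', '=', 1)` and `('bar', '!=', 'b')` prune every leaf except foo=1/bar=a and foo=1/bar=c. A minimal way to reproduce such a layout outside the test helpers, assuming `pq.write_to_dataset` (which is not part of this commit) and a hypothetical path:

    import numpy as np
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    df = pd.DataFrame({'foo': [0, 0, 1, 1],
                       'bar': ['a', 'b', 'a', 'b'],
                       'values': np.random.randn(4)})

    # Writes /tmp/base_path/foo=0/bar=a/<uuid>.parquet and so on.
    pq.write_to_dataset(pa.Table.from_pandas(df), '/tmp/base_path',
                        partition_cols=['foo', 'bar'])

    # With the new keyword, only the foo=1/bar=a leaf is opened.
    dataset = pq.ParquetDataset('/tmp/base_path',
                                filters=[('foo', '=', 1), ('bar', '!=', 'b')])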