You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/04/06 13:26:29 UTC

[arrow] branch master updated: ARROW-2401 Support filters on Hive partitioned Parquet files

This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e3f7edc  ARROW-2401 Support filters on Hive partitioned Parquet files
e3f7edc is described below

commit e3f7edc3ee47c53413ffb05c90a1f4dcbd7fed40
Author: Julius Neuffer <ju...@blue-yonder.com>
AuthorDate: Fri Apr 6 15:26:22 2018 +0200

    ARROW-2401 Support filters on Hive partitioned Parquet files
    
    This PR enables filtering of Parquet datasets based on their Hive-partitioned directory structure. Only the `=` and `!=` operators are supported in filters.
    
    Author: Julius Neuffer <ju...@blue-yonder.com>
    
    Closes #1840 from jneuff/partition-filter-prototype and squashes the following commits:
    
    1f73868 <Julius Neuffer> Adhere to flake8
    fb27566 <Julius Neuffer> Accept non-string filter values
    b3258d5 <Julius Neuffer> Add docstring
    b7f0d0e <Julius Neuffer> Fix flake8 errors
    8466dbe <Julius Neuffer> Refactor partition filtering
    ab5d4b9 <Julius Neuffer> Implement filter prototype on partitions
---
 python/pyarrow/parquet.py            | 35 +++++++++++++++++++++++++++++++++-
 python/pyarrow/tests/test_parquet.py | 37 ++++++++++++++++++++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 0929a15..beeedca 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -711,9 +711,14 @@ class ParquetDataset(object):
         Divide files into pieces for each row group in the file
     validate_schema : boolean, default True
         Check that individual file schemas are all the same / compatible
+    filters : List[Tuple] or None (default)
+        List of filters to apply, like ``[('x', '=', 0), ...]``. This
+        implements partition-level (hive) filtering only, i.e., to prevent the
+        loading of some files of the dataset.
     """
     def __init__(self, path_or_paths, filesystem=None, schema=None,
-                 metadata=None, split_row_groups=False, validate_schema=True):
+                 metadata=None, split_row_groups=False, validate_schema=True,
+                 filters=None):
         if filesystem is None:
             a_path = path_or_paths
             if isinstance(a_path, list):
@@ -744,6 +749,9 @@ class ParquetDataset(object):
         if validate_schema:
             self.validate_schemas()
 
+        if filters:
+            self._filter(filters)
+
     def validate_schemas(self):
         open_file = self._get_open_file_func()
 
@@ -849,6 +857,31 @@ class ParquetDataset(object):
                                    common_metadata=self.common_metadata)
         return open_file
 
+    def _filter(self, filters):
+        def filter_accepts_partition(part_key, filter, level):
+            p_column, p_value_index = part_key
+            f_column, op, f_value = filter
+            if p_column != f_column:
+                return True
+
+            f_value_index = self.partitions.get_index(level, p_column,
+                                                      str(f_value))
+            if op == "=":
+                return f_value_index == p_value_index
+            elif op == "!=":
+                return f_value_index != p_value_index
+            else:
+                return True
+
+        def one_filter_accepts(piece, filter):
+            return all(filter_accepts_partition(part_key, filter, level)
+                       for level, part_key in enumerate(piece.partition_keys))
+
+        def all_filters_accept(piece):
+            return all(one_filter_accepts(piece, f) for f in filters)
+
+        self.pieces = [p for p in self.pieces if all_filters_accept(p)]
+
 
 def _ensure_filesystem(fs):
     fs_type = type(fs)
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index b301de6..27d6bc7 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -996,6 +996,43 @@ def test_read_partitioned_directory(tmpdir):
     _partition_test_for_filesystem(fs, base_path)
 
 
+@parquet
+def test_read_partitioned_directory_filtered(tmpdir):
+    fs = LocalFileSystem.get_instance()
+    base_path = str(tmpdir)
+
+    import pyarrow.parquet as pq
+
+    foo_keys = [0, 1]
+    bar_keys = ['a', 'b', 'c']
+    partition_spec = [
+        ['foo', foo_keys],
+        ['bar', bar_keys]
+    ]
+    N = 30
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'foo': np.array(foo_keys, dtype='i4').repeat(15),
+        'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
+        'values': np.random.randn(N)
+    }, columns=['index', 'foo', 'bar', 'values'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[('foo', '=', 1), ('bar', '!=', 'b')]
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                 .sort_values(by='index')
+                 .reset_index(drop=True))
+
+    assert 0 not in result_df['foo'].values
+    assert 'b' not in result_df['bar'].values
+
+
 @pytest.yield_fixture
 def s3_example():
     access_key = os.environ['PYARROW_TEST_S3_ACCESS_KEY']

-- 
To stop receiving notification emails like this one, please contact
uwe@apache.org.