Posted to commits@arrow.apache.org by uw...@apache.org on 2018/05/01 10:20:57 UTC

[arrow] branch master updated: ARROW-2422: Support more operators for partition filtering

This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 138c491  ARROW-2422: Support more operators for partition filtering
138c491 is described below

commit 138c4919a1b31adba205ffa38ac16f2459059c0a
Author: Markus Klein <ma...@blue-yonder.com>
AuthorDate: Tue May 1 12:20:18 2018 +0200

    ARROW-2422: Support more operators for partition filtering
    
    This extends the functionality of https://github.com/apache/arrow/pull/1840 by adding support for the '<', '>', '<=' and '>=' comparison operators in filters.
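    
    With this change a partitioned dataset can be pruned with range
    predicates as well as equality checks. A minimal sketch (the 'year'
    partition column and the dataset path are hypothetical):
    
        import pyarrow.parquet as pq
    
        # Keep only the partitions where 2009 < year <= 2012;
        # '=' and '!=' continue to behave as before.
        dataset = pq.ParquetDataset(
            '/path/to/partitioned/dataset',
            filters=[('year', '>', 2009), ('year', '<=', 2012)]
        )
        table = dataset.read()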
    
    Author: Markus Klein <ma...@blue-yonder.com>
    Author: Julius Neuffer <ju...@blue-yonder.com>
    
    Closes #1861 from jneuff/extend-partition-filters and squashes the following commits:
    
    8c8cca91 <Markus Klein> test invalid predicate operator
    849a5bbf <Markus Klein> formatting
    f99554c9 <Markus Klein> remove nested TestParquetFilter
    942fd330 <Julius Neuffer> Merge branch 'master' of github.com:jneuff/arrow into extend-partition-filters
    b5313a61 <Markus Klein> Merge branch 'master' into extend-partition-filters
    7887d64b <Markus Klein> pep8 formatting
    ed3a1760 <Markus Klein> fix tests
    e2f4ad43 <Markus Klein> replace foo -> integers
    af6c13a6 <Julius Neuffer> Extend filter tests to boolean and datetime
    7efa79c6 <Markus Klein> extra test for boolean, failing test for dates
    36bd4f05 <Julius Neuffer> Support more operators for partition filtering
---
 python/pyarrow/parquet.py            |  26 +++--
 python/pyarrow/tests/test_parquet.py | 179 ++++++++++++++++++++++++++++++++---
 2 files changed, 184 insertions(+), 21 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 34aa55a..18f329e 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -858,19 +858,33 @@ class ParquetDataset(object):
 
     def _filter(self, filters):
         def filter_accepts_partition(part_key, filter, level):
+
             p_column, p_value_index = part_key
             f_column, op, f_value = filter
             if p_column != f_column:
                 return True
 
-            f_value_index = self.partitions.get_index(level, p_column,
-                                                      str(f_value))
-            if op == "=":
-                return f_value_index == p_value_index
+            f_type = type(f_value)
+            p_value = f_type((self.partitions
+                                  .levels[level]
+                                  .dictionary[p_value_index]
+                                  .as_py()))
+
+            if op == "=" or op == "==":
+                return p_value == f_value
             elif op == "!=":
-                return f_value_index != p_value_index
+                return p_value != f_value
+            elif op == '<':
+                return p_value < f_value
+            elif op == '>':
+                return p_value > f_value
+            elif op == '<=':
+                return p_value <= f_value
+            elif op == '>=':
+                return p_value >= f_value
             else:
-                return True
+                raise ValueError("'%s' is not a valid operator in "
+                                 "predicates." % op)
 
         def one_filter_accepts(piece, filter):
             return all(filter_accepts_partition(part_key, filter, level)
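
A note on the dispatch above: the chain of if/elif branches in
filter_accepts_partition could equivalently be written as a lookup into
Python's standard operator module. The following is only an
illustrative sketch of that alternative, not the committed code; the
helper name compare_filter is made up:

    import operator

    # Map each supported predicate string to its comparison function;
    # '=' and '==' are synonyms, matching the committed behavior.
    _COMPARATORS = {
        '=': operator.eq,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
    }

    def compare_filter(op, p_value, f_value):
        try:
            return _COMPARATORS[op](p_value, f_value)
        except KeyError:
            raise ValueError("'%s' is not a valid operator in "
                             "predicates." % op)

    assert compare_filter('<=', 2, 3)  # e.g. 2 <= 3 -> True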
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 3fec0f7..86cf417 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1022,40 +1022,189 @@ def test_read_partitioned_directory(tmpdir):
 
 
 @parquet
-def test_read_partitioned_directory_filtered(tmpdir):
+def test_equivalency(tmpdir):
     fs = LocalFileSystem.get_instance()
     base_path = str(tmpdir)
 
     import pyarrow.parquet as pq
 
-    foo_keys = [0, 1]
-    bar_keys = ['a', 'b', 'c']
+    integer_keys = [0, 1]
+    string_keys = ['a', 'b', 'c']
+    boolean_keys = [True, False]
     partition_spec = [
-        ['foo', foo_keys],
-        ['bar', bar_keys]
+        ['integer', integer_keys],
+        ['string', string_keys],
+        ['boolean', boolean_keys]
     ]
-    N = 30
+
+    df = pd.DataFrame({
+        'integer': np.array(integer_keys, dtype='i4').repeat(15),
+        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
+        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
+                           3),
+    }, columns=['integer', 'string', 'boolean'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[('integer', '=', 1), ('string', '!=', 'b'),
+                 ('boolean', '==', True)]
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas().reset_index(drop=True))
+
+    assert 0 not in result_df['integer'].values
+    assert 'b' not in result_df['string'].values
+    assert False not in result_df['boolean'].values
+
+@parquet
+def test_cutoff_exclusive_integer(tmpdir):
+    fs = LocalFileSystem.get_instance()
+    base_path = str(tmpdir)
+
+    import pyarrow.parquet as pq
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[
+            ('integers', '<', 4),
+            ('integers', '>', 1),
+        ]
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                      .sort_values(by='index')
+                      .reset_index(drop=True))
+
+    result_list = [int(x) for x in result_df['integers'].values]
+    assert result_list == [2, 3]
+
+
+@parquet
+@pytest.mark.xfail(
+    raises=TypeError,
+    reason='Loss of type information in creation of categoricals.'
+)
+def test_cutoff_exclusive_datetime(tmpdir):
+    fs = LocalFileSystem.get_instance()
+    base_path = str(tmpdir)
+
+    import pyarrow.parquet as pq
+
+    date_keys = [
+        datetime.date(2018, 4, 9),
+        datetime.date(2018, 4, 10),
+        datetime.date(2018, 4, 11),
+        datetime.date(2018, 4, 12),
+        datetime.date(2018, 4, 13)
+    ]
+    partition_spec = [
+        ['dates', date_keys]
+    ]
+    N = 5
 
     df = pd.DataFrame({
         'index': np.arange(N),
-        'foo': np.array(foo_keys, dtype='i4').repeat(15),
-        'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
-        'values': np.random.randn(N)
-    }, columns=['index', 'foo', 'bar', 'values'])
+        'dates': np.array(date_keys, dtype='datetime64'),
+    }, columns=['index', 'dates'])
 
     _generate_partition_directories(fs, base_path, partition_spec, df)
 
     dataset = pq.ParquetDataset(
         base_path, filesystem=fs,
-        filters=[('foo', '=', 1), ('bar', '!=', 'b')]
+        filters=[
+            ('dates', '<', "2018-04-12"),
+            ('dates', '>', "2018-04-10")
+        ]
     )
     table = dataset.read()
     result_df = (table.to_pandas()
-                 .sort_values(by='index')
-                 .reset_index(drop=True))
+                      .sort_values(by='index')
+                      .reset_index(drop=True))
 
-    assert 0 not in result_df['foo'].values
-    assert 'b' not in result_df['bar'].values
+    expected = pd.Categorical(
+        np.array([datetime.date(2018, 4, 11)], dtype='datetime64'),
+        categories=np.array(date_keys, dtype='datetime64'))
+
+    assert result_df['dates'].values == expected
+
+
+@parquet
+def test_inclusive_integer(tmpdir):
+    fs = LocalFileSystem.get_instance()
+    base_path = str(tmpdir)
+
+    import pyarrow.parquet as pq
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[
+            ('integers', '<=', 3),
+            ('integers', '>=', 2),
+        ]
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                      .sort_values(by='index')
+                      .reset_index(drop=True))
+
+    result_list = [int(x) for x in result_df['integers'].values]
+    assert result_list == [2, 3]
+
+
+@parquet
+def test_invalid_pred_op(tmpdir):
+    fs = LocalFileSystem.get_instance()
+    base_path = str(tmpdir)
+
+    import pyarrow.parquet as pq
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [
+        ['integers', integer_keys],
+    ]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    with pytest.raises(ValueError):
+        pq.ParquetDataset(base_path,
+                          filesystem=fs,
+                          filters=[
+                            ('integers', '=<', 3),
+                          ])
 
 
 @pytest.yield_fixture
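
For reference, with this change an invalid operator now fails fast when
the dataset is constructed, per the new ValueError branch above. A
hedged example (assuming /path/to/partitioned/dataset is an existing
partitioned dataset; the path is hypothetical):

    import pyarrow.parquet as pq

    try:
        pq.ParquetDataset('/path/to/partitioned/dataset',
                          filters=[('year', '=<', 2012)])
    except ValueError as exc:
        print(exc)  # "'=<' is not a valid operator in predicates."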
