You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/05/01 10:20:57 UTC
[arrow] branch master updated: ARROW-2422: Support more operators
for partition filtering
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 138c491 ARROW-2422: Support more operators for partition filtering
138c491 is described below
commit 138c4919a1b31adba205ffa38ac16f2459059c0a
Author: Markus Klein <ma...@blue-yonder.com>
AuthorDate: Tue May 1 12:20:18 2018 +0200
ARROW-2422: Support more operators for partition filtering
This extends the functionality of https://github.com/apache/arrow/pull/1840, by adding support for '<', '>', '<=', '>=' comparison operators in filters.
Author: Markus Klein <ma...@blue-yonder.com>
Author: Julius Neuffer <ju...@blue-yonder.com>
Closes #1861 from jneuff/extend-partition-filters and squashes the following commits:
8c8cca91 <Markus Klein> test invalid predicate operator
849a5bbf <Markus Klein> formatting
f99554c9 <Markus Klein> remove nested TestParquetFilter
942fd330 <Julius Neuffer> Merge branch 'master' of github.com:jneuff/arrow into extend-partition-filters
b5313a61 <Markus Klein> Merge branch 'master' into extend-partition-filters
7887d64b <Markus Klein> pep8 formatting
ed3a1760 <Markus Klein> fix tests
e2f4ad43 <Markus Klein> replace foo -> integers
af6c13a6 <Julius Neuffer> Extend filter tests to boolean and datetime
7efa79c6 <Markus Klein> extra test for boolean, failing test for dates
36bd4f05 <Julius Neuffer> Support more operators for partition filtering
---
python/pyarrow/parquet.py | 26 +++--
python/pyarrow/tests/test_parquet.py | 179 ++++++++++++++++++++++++++++++++---
2 files changed, 184 insertions(+), 21 deletions(-)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 34aa55a..18f329e 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -858,19 +858,33 @@ class ParquetDataset(object):
def _filter(self, filters):
def filter_accepts_partition(part_key, filter, level):
+
p_column, p_value_index = part_key
f_column, op, f_value = filter
if p_column != f_column:
return True
- f_value_index = self.partitions.get_index(level, p_column,
- str(f_value))
- if op == "=":
- return f_value_index == p_value_index
+ f_type = type(f_value)
+ p_value = f_type((self.partitions
+ .levels[level]
+ .dictionary[p_value_index]
+ .as_py()))
+
+ if op == "=" or op == "==":
+ return p_value == f_value
elif op == "!=":
- return f_value_index != p_value_index
+ return p_value != f_value
+ elif op == '<':
+ return p_value < f_value
+ elif op == '>':
+ return p_value > f_value
+ elif op == '<=':
+ return p_value <= f_value
+ elif op == '>=':
+ return p_value >= f_value
else:
- return True
+ raise ValueError("'%s' is not a valid operator in predicates.",
+ filter[1])
def one_filter_accepts(piece, filter):
return all(filter_accepts_partition(part_key, filter, level)
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 3fec0f7..86cf417 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1022,40 +1022,189 @@ def test_read_partitioned_directory(tmpdir):
@parquet
-def test_read_partitioned_directory_filtered(tmpdir):
+def test_equivalency(tmpdir):
fs = LocalFileSystem.get_instance()
base_path = str(tmpdir)
import pyarrow.parquet as pq
- foo_keys = [0, 1]
- bar_keys = ['a', 'b', 'c']
+ integer_keys = [0, 1]
+ string_keys = ['a', 'b', 'c']
+ boolean_keys = [True, False]
partition_spec = [
- ['foo', foo_keys],
- ['bar', bar_keys]
+ ['integer', integer_keys],
+ ['string', string_keys],
+ ['boolean', boolean_keys]
]
- N = 30
+
+ df = pd.DataFrame({
+ 'integer': np.array(integer_keys, dtype='i4').repeat(15),
+ 'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
+ 'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
+ 3),
+ }, columns=['integer', 'string', 'boolean'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs,
+ filters=[('integer', '=', 1), ('string', '!=', 'b'),
+ ('boolean', '==', True)]
+ )
+ table = dataset.read()
+ result_df = (table.to_pandas().reset_index(drop=True))
+
+ assert 0 not in result_df['integer'].values
+ assert 'b' not in result_df['string'].values
+ assert False not in result_df['boolean'].values
+
+ @parquet
+ def test_cutoff_exclusive_integer(tmpdir):
+ fs = LocalFileSystem.get_instance()
+ base_path = str(tmpdir)
+
+ import pyarrow.parquet as pq
+
+ integer_keys = [0, 1, 2, 3, 4]
+ partition_spec = [
+ ['integers', integer_keys],
+ ]
+ N = 5
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'integers': np.array(integer_keys, dtype='i4'),
+ }, columns=['index', 'integers'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs,
+ filters=[
+ ('integers', '<', 4),
+ ('integers', '>', 1),
+ ]
+ )
+ table = dataset.read()
+ result_df = (table.to_pandas()
+ .sort_values(by='index')
+ .reset_index(drop=True))
+
+ result_list = [x for x in map(int, result_df['integers'].values)]
+ assert result_list == [2, 3]
+
+
+@parquet
+@pytest.mark.xfail(
+ raises=TypeError,
+ reason='Loss of type information in creation of categoricals.'
+)
+def test_cutoff_exclusive_datetime(tmpdir):
+ fs = LocalFileSystem.get_instance()
+ base_path = str(tmpdir)
+
+ import pyarrow.parquet as pq
+
+ date_keys = [
+ datetime.date(2018, 4, 9),
+ datetime.date(2018, 4, 10),
+ datetime.date(2018, 4, 11),
+ datetime.date(2018, 4, 12),
+ datetime.date(2018, 4, 13)
+ ]
+ partition_spec = [
+ ['dates', date_keys]
+ ]
+ N = 5
df = pd.DataFrame({
'index': np.arange(N),
- 'foo': np.array(foo_keys, dtype='i4').repeat(15),
- 'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
- 'values': np.random.randn(N)
- }, columns=['index', 'foo', 'bar', 'values'])
+ 'dates': np.array(date_keys, dtype='datetime64'),
+ }, columns=['index', 'dates'])
_generate_partition_directories(fs, base_path, partition_spec, df)
dataset = pq.ParquetDataset(
base_path, filesystem=fs,
- filters=[('foo', '=', 1), ('bar', '!=', 'b')]
+ filters=[
+ ('dates', '<', "2018-04-12"),
+ ('dates', '>', "2018-04-10")
+ ]
)
table = dataset.read()
result_df = (table.to_pandas()
- .sort_values(by='index')
- .reset_index(drop=True))
+ .sort_values(by='index')
+ .reset_index(drop=True))
- assert 0 not in result_df['foo'].values
- assert 'b' not in result_df['bar'].values
+ expected = pd.Categorical(
+ np.array([datetime.date(2018, 4, 11)], dtype='datetime64'),
+ categories=np.array(date_keys, dtype='datetime64'))
+
+ assert result_df['dates'].values == expected
+
+
+@parquet
+def test_inclusive_integer(tmpdir):
+ fs = LocalFileSystem.get_instance()
+ base_path = str(tmpdir)
+
+ import pyarrow.parquet as pq
+
+ integer_keys = [0, 1, 2, 3, 4]
+ partition_spec = [
+ ['integers', integer_keys],
+ ]
+ N = 5
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'integers': np.array(integer_keys, dtype='i4'),
+ }, columns=['index', 'integers'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs,
+ filters=[
+ ('integers', '<=', 3),
+ ('integers', '>=', 2),
+ ]
+ )
+ table = dataset.read()
+ result_df = (table.to_pandas()
+ .sort_values(by='index')
+ .reset_index(drop=True))
+
+ result_list = [int(x) for x in map(int, result_df['integers'].values)]
+ assert result_list == [2, 3]
+
+
+@parquet
+def test_invalid_pred_op(tmpdir):
+ fs = LocalFileSystem.get_instance()
+ base_path = str(tmpdir)
+
+ import pyarrow.parquet as pq
+
+ integer_keys = [0, 1, 2, 3, 4]
+ partition_spec = [
+ ['integers', integer_keys],
+ ]
+ N = 5
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'integers': np.array(integer_keys, dtype='i4'),
+ }, columns=['index', 'integers'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ with pytest.raises(ValueError):
+ pq.ParquetDataset(base_path,
+ filesystem=fs,
+ filters=[
+ ('integers', '=<', 3),
+ ])
@pytest.yield_fixture
--
To stop receiving notification emails like this one, please contact
uwe@apache.org.