Posted to commits@arrow.apache.org by uw...@apache.org on 2018/07/24 08:34:27 UTC
[arrow] branch master updated: ARROW-2575: [Python] Exclude hidden files starting with . in ParquetManifest
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new aaf8d5d ARROW-2575: [Python] Exclude hidden files starting with . in ParquetManifest
aaf8d5d is described below
commit aaf8d5defe22c173be00bd23ec252c08ddd6cac8
Author: Wes McKinney <we...@apache.org>
AuthorDate: Tue Jul 24 10:34:21 2018 +0200
ARROW-2575: [Python] Exclude hidden files starting with . in ParquetManifest
I also moved a large nested helper function to a slightly more appropriate place (we still need to develop abstract interfaces for different partition schemes).
Author: Wes McKinney <we...@apache.org>
Closes #2312 from wesm/ARROW-2575 and squashes the following commits:
92dec69d <Wes McKinney> Fix py3 issue
a8aa3e1c <Wes McKinney> Exclude hidden files starting with . in ParquetManifest
---
python/pyarrow/parquet.py | 93 ++++++++++++++++++------------------
python/pyarrow/tests/test_parquet.py | 45 ++++++++++++-----
2 files changed, 81 insertions(+), 57 deletions(-)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 70c70b6..bd97678 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -580,6 +580,49 @@ class ParquetPartitions(object):
 
         return self.levels[level].get_index(key)
 
+    def filter_accepts_partition(self, part_key, filter, level):
+        p_column, p_value_index = part_key
+        f_column, op, f_value = filter
+        if p_column != f_column:
+            return True
+
+        f_type = type(f_value)
+
+        if isinstance(f_value, set):
+            if not f_value:
+                raise ValueError("Cannot use empty set as filter value")
+            if op not in {'in', 'not in'}:
+                raise ValueError("Op '%s' not supported with set value"
+                                 % op)
+            if len({type(item) for item in f_value}) != 1:
+                raise ValueError("All elements of set '%s' must be of"
+                                 " same type" % (f_value,))
+            f_type = type(next(iter(f_value)))
+
+        p_value = f_type((self.levels[level]
+                          .dictionary[p_value_index]
+                          .as_py()))
+
+        if op == "=" or op == "==":
+            return p_value == f_value
+        elif op == "!=":
+            return p_value != f_value
+        elif op == '<':
+            return p_value < f_value
+        elif op == '>':
+            return p_value > f_value
+        elif op == '<=':
+            return p_value <= f_value
+        elif op == '>=':
+            return p_value >= f_value
+        elif op == 'in':
+            return p_value in f_value
+        elif op == 'not in':
+            return p_value not in f_value
+        else:
+            raise ValueError("'%s' is not a valid operator in predicates."
+                             % op)
+
 
 def is_path(x):
     return (isinstance(x, six.string_types)
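For context, these (column, op, value) predicates are what the filters argument of ParquetDataset evaluates against partition directory keys. A usage sketch, assuming a hypothetical Hive-style layout partitioned on a 'year' key:

    import pyarrow.parquet as pq

    # Hypothetical layout: base_path/year=2017/..., base_path/year=2018/...
    # Supported ops: =, ==, !=, <, >, <=, >=, plus 'in' / 'not in' with a
    # non-empty set value whose elements all share one type.
    dataset = pq.ParquetDataset('base_path',
                                filters=[('year', '>=', 2017),
                                         ('year', 'not in', {2019})])
    table = dataset.read()

Note that p_value is coerced with f_type, so an integer filter value is compared against the directory key parsed as an integer.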
@@ -642,7 +685,8 @@ class ParquetManifest(object):
         self._push_pieces(filtered_files, part_keys)
 
     def _should_silently_exclude(self, file_name):
-        return (file_name.endswith('.crc') or
+        return (file_name.endswith('.crc') or  # Checksums
+                file_name.startswith('.') or  # Hidden files
                 file_name in EXCLUDED_PARQUET_PATHS)
 
     def _visit_directories(self, level, directories, part_keys):
@@ -865,53 +909,10 @@ class ParquetDataset(object):
         return open_file
 
     def _filter(self, filters):
-        def filter_accepts_partition(part_key, filter, level):
-
-            p_column, p_value_index = part_key
-            f_column, op, f_value = filter
-            if p_column != f_column:
-                return True
-
-            f_type = type(f_value)
-
-            if isinstance(f_value, set):
-                if not f_value:
-                    raise ValueError("Cannot use empty set as filter value")
-                if op not in {'in', 'not in'}:
-                    raise ValueError("Op '%s' not supported with set value",
-                                     op)
-                if len(set([type(item) for item in f_value])) != 1:
-                    raise ValueError("All elements of set '%s' must be of"
-                                     " same type", f_value)
-                f_type = type(next(iter(f_value)))
-
-            p_value = f_type((self.partitions
-                              .levels[level]
-                              .dictionary[p_value_index]
-                              .as_py()))
-
-            if op == "=" or op == "==":
-                return p_value == f_value
-            elif op == "!=":
-                return p_value != f_value
-            elif op == '<':
-                return p_value < f_value
-            elif op == '>':
-                return p_value > f_value
-            elif op == '<=':
-                return p_value <= f_value
-            elif op == '>=':
-                return p_value >= f_value
-            elif op == 'in':
-                return p_value in f_value
-            elif op == 'not in':
-                return p_value not in f_value
-            else:
-                raise ValueError("'%s' is not a valid operator in predicates.",
-                                 filter[1])
+        accepts_filter = self.partitions.filter_accepts_partition
 
         def one_filter_accepts(piece, filter):
-            return all(filter_accepts_partition(part_key, filter, level)
+            return all(accepts_filter(part_key, filter, level)
                        for level, part_key in enumerate(piece.partition_keys))
 
         def all_filters_accept(piece):
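The helpers left in place above make the composition explicit: a filter must accept the partition key at every level, and every filter must accept the piece for it to survive (AND semantics). A standalone sketch of that logic using plain tuples rather than pyarrow objects:

    # (column, value) pairs stand in for pyarrow's partition keys, and
    # `accepts` for ParquetPartitions.filter_accepts_partition.
    def piece_survives(accepts, partition_keys, filters):
        return all(
            all(accepts(part_key, flt, level)
                for level, part_key in enumerate(partition_keys))
            for flt in filters)

    # Trivial equality-only `accepts`, for illustration:
    def accepts(part_key, flt, level):
        column, value = part_key
        f_column, op, f_value = flt
        return column != f_column or value == f_value

    keys = [('year', 2018), ('month', 7)]
    assert piece_survives(accepts, keys, [('year', '=', 2018)])
    assert not piece_survives(accepts, keys, [('month', '=', 1)])

Hoisting the predicate onto ParquetPartitions, per the commit message, is what will let alternate partition schemes supply their own implementation later.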
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 21610d9..0e51045 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1643,24 +1643,27 @@ def test_dataset_read_pandas_common_metadata(tmpdir, preserve_index):
     tm.assert_frame_equal(result, expected)
 
 
+def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
+    test_data = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(file_nrows, seed=i)
+        path = pjoin(base_path, '{0}.parquet'.format(i))
+
+        test_data.append(_write_table(df, path))
+        paths.append(path)
+    return paths
+
+
 @parquet
 def test_ignore_private_directories(tmpdir):
     import pyarrow.parquet as pq
 
-    nfiles = 10
-    size = 5
-
     dirpath = tmpdir.join(guid()).strpath
     os.mkdir(dirpath)
 
-    test_data = []
-    paths = []
-    for i in range(nfiles):
-        df = _test_dataframe(size, seed=i)
-        path = pjoin(dirpath, '{0}.parquet'.format(i))
-
-        test_data.append(_write_table(df, path))
-        paths.append(path)
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
 
     # private directory
     os.mkdir(pjoin(dirpath, '_impala_staging'))
@@ -1670,6 +1673,26 @@ def test_ignore_private_directories(tmpdir):
 
 
 @parquet
+def test_ignore_hidden_files(tmpdir):
+    import pyarrow.parquet as pq
+
+    dirpath = tmpdir.join(guid()).strpath
+    os.mkdir(dirpath)
+
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
+
+    with open(pjoin(dirpath, '.DS_Store'), 'wb') as f:
+        f.write(b'gibberish')
+
+    with open(pjoin(dirpath, '.private'), 'wb') as f:
+        f.write(b'gibberish')
+
+    dataset = pq.ParquetDataset(dirpath)
+    assert set(paths) == set(x.path for x in dataset.pieces)
+
+
+@parquet
 def test_multiindex_duplicate_values(tmpdir):
     num_rows = 3
     numbers = list(range(num_rows))