You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/07/24 08:34:27 UTC

[arrow] branch master updated: ARROW-2575: [Python] Exclude hidden files starting with . in ParquetManifest

This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new aaf8d5d  ARROW-2575: [Python] Exclude hidden files starting with . in ParquetManifest
aaf8d5d is described below

commit aaf8d5defe22c173be00bd23ec252c08ddd6cac8
Author: Wes McKinney <we...@apache.org>
AuthorDate: Tue Jul 24 10:34:21 2018 +0200

    ARROW-2575: [Python] Exclude hidden files starting with . in ParquetManifest
    
    I also moved a large nested helper function to a slightly more appropriate place (we need to develop abstract interfaces for different partition schemes).
    
    Author: Wes McKinney <we...@apache.org>
    
    Closes #2312 from wesm/ARROW-2575 and squashes the following commits:
    
    92dec69d <Wes McKinney> Fix py3 issue
    a8aa3e1c <Wes McKinney> Exclude hidden files starting with . in ParquetManifest
---
 python/pyarrow/parquet.py            | 93 ++++++++++++++++++------------------
 python/pyarrow/tests/test_parquet.py | 45 ++++++++++++-----
 2 files changed, 81 insertions(+), 57 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 70c70b6..bd97678 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -580,6 +580,49 @@ class ParquetPartitions(object):
 
         return self.levels[level].get_index(key)
 
+    def filter_accepts_partition(self, part_key, filter, level):
+        p_column, p_value_index = part_key
+        f_column, op, f_value = filter
+        if p_column != f_column:
+            return True
+
+        f_type = type(f_value)
+
+        if isinstance(f_value, set):
+            if not f_value:
+                raise ValueError("Cannot use empty set as filter value")
+            if op not in {'in', 'not in'}:
+                raise ValueError("Op '%s' not supported with set value",
+                                 op)
+            if len(set([type(item) for item in f_value])) != 1:
+                raise ValueError("All elements of set '%s' must be of"
+                                 " same type", f_value)
+            f_type = type(next(iter(f_value)))
+
+        p_value = f_type((self.levels[level]
+                          .dictionary[p_value_index]
+                          .as_py()))
+
+        if op == "=" or op == "==":
+            return p_value == f_value
+        elif op == "!=":
+            return p_value != f_value
+        elif op == '<':
+            return p_value < f_value
+        elif op == '>':
+            return p_value > f_value
+        elif op == '<=':
+            return p_value <= f_value
+        elif op == '>=':
+            return p_value >= f_value
+        elif op == 'in':
+            return p_value in f_value
+        elif op == 'not in':
+            return p_value not in f_value
+        else:
+            raise ValueError("'%s' is not a valid operator in predicates.",
+                             filter[1])
+
 
 def is_path(x):
     return (isinstance(x, six.string_types)
@@ -642,7 +685,8 @@ class ParquetManifest(object):
             self._push_pieces(filtered_files, part_keys)
 
     def _should_silently_exclude(self, file_name):
-        return (file_name.endswith('.crc') or
+        return (file_name.endswith('.crc') or  # Checksums
+                file_name.startswith('.') or  # Hidden files
                 file_name in EXCLUDED_PARQUET_PATHS)
 
     def _visit_directories(self, level, directories, part_keys):
@@ -865,53 +909,10 @@ class ParquetDataset(object):
         return open_file
 
     def _filter(self, filters):
-        def filter_accepts_partition(part_key, filter, level):
-
-            p_column, p_value_index = part_key
-            f_column, op, f_value = filter
-            if p_column != f_column:
-                return True
-
-            f_type = type(f_value)
-
-            if isinstance(f_value, set):
-                if not f_value:
-                    raise ValueError("Cannot use empty set as filter value")
-                if op not in {'in', 'not in'}:
-                    raise ValueError("Op '%s' not supported with set value",
-                                     op)
-                if len(set([type(item) for item in f_value])) != 1:
-                    raise ValueError("All elements of set '%s' must be of"
-                                     " same type", f_value)
-                f_type = type(next(iter(f_value)))
-
-            p_value = f_type((self.partitions
-                                  .levels[level]
-                                  .dictionary[p_value_index]
-                                  .as_py()))
-
-            if op == "=" or op == "==":
-                return p_value == f_value
-            elif op == "!=":
-                return p_value != f_value
-            elif op == '<':
-                return p_value < f_value
-            elif op == '>':
-                return p_value > f_value
-            elif op == '<=':
-                return p_value <= f_value
-            elif op == '>=':
-                return p_value >= f_value
-            elif op == 'in':
-                return p_value in f_value
-            elif op == 'not in':
-                return p_value not in f_value
-            else:
-                raise ValueError("'%s' is not a valid operator in predicates.",
-                                 filter[1])
+        accepts_filter = self.partitions.filter_accepts_partition
 
         def one_filter_accepts(piece, filter):
-            return all(filter_accepts_partition(part_key, filter, level)
+            return all(accepts_filter(part_key, filter, level)
                        for level, part_key in enumerate(piece.partition_keys))
 
         def all_filters_accept(piece):
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 21610d9..0e51045 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1643,24 +1643,27 @@ def test_dataset_read_pandas_common_metadata(tmpdir, preserve_index):
     tm.assert_frame_equal(result, expected)
 
 
+def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
+    test_data = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(file_nrows, seed=i)
+        path = pjoin(base_path, '{0}.parquet'.format(i))
+
+        test_data.append(_write_table(df, path))
+        paths.append(path)
+    return paths
+
+
 @parquet
 def test_ignore_private_directories(tmpdir):
     import pyarrow.parquet as pq
 
-    nfiles = 10
-    size = 5
-
     dirpath = tmpdir.join(guid()).strpath
     os.mkdir(dirpath)
 
-    test_data = []
-    paths = []
-    for i in range(nfiles):
-        df = _test_dataframe(size, seed=i)
-        path = pjoin(dirpath, '{0}.parquet'.format(i))
-
-        test_data.append(_write_table(df, path))
-        paths.append(path)
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
 
     # private directory
     os.mkdir(pjoin(dirpath, '_impala_staging'))
@@ -1670,6 +1673,26 @@ def test_ignore_private_directories(tmpdir):
 
 
 @parquet
+def test_ignore_hidden_files(tmpdir):
+    import pyarrow.parquet as pq
+
+    dirpath = tmpdir.join(guid()).strpath
+    os.mkdir(dirpath)
+
+    paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+                                            file_nrows=5)
+
+    with open(pjoin(dirpath, '.DS_Store'), 'wb') as f:
+        f.write(b'gibberish')
+
+    with open(pjoin(dirpath, '.private'), 'wb') as f:
+        f.write(b'gibberish')
+
+    dataset = pq.ParquetDataset(dirpath)
+    assert set(paths) == set(x.path for x in dataset.pieces)
+
+
+@parquet
 def test_multiindex_duplicate_values(tmpdir):
     num_rows = 3
     numbers = list(range(num_rows))