You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/18 00:34:56 UTC

arrow git commit: ARROW-1079: [Python] Filter out private directories when building Parquet dataset manifest

Repository: arrow
Updated Branches:
  refs/heads/master b4d34f8fd -> a1c8b83b4


ARROW-1079: [Python] Filter out private directories when building Parquet dataset manifest

Some systems like Hive and Impala use special files or directories to signal to other readers that a dataset modification is in progress. If such directories (starting with an underscore) exist in a flat Parquet directory, it currently breaks the dataset reader.

Author: Wes McKinney <we...@twosigma.com>

Closes #860 from wesm/ARROW-1079 and squashes the following commits:

c1c445b4 [Wes McKinney] Filter out private directories when building Parquet dataset manifest


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a1c8b83b
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a1c8b83b
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a1c8b83b

Branch: refs/heads/master
Commit: a1c8b83b49192230bd2c91bd009e2ff272d89310
Parents: b4d34f8
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Jul 17 20:34:51 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jul 17 20:34:51 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/parquet.py            |  9 ++++++++
 python/pyarrow/tests/test_parquet.py | 35 +++++++++++++++++++++++++++++--
 2 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/a1c8b83b/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index dc26dab..aa2352c 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
 import json
 
 import six
@@ -414,6 +415,9 @@ class ParquetManifest(object):
             elif fs.isdir(path):
                 directories.append(path)
 
+        # ARROW-1079: Filter out "private" directories starting with underscore
+        directories = [x for x in directories if not _is_private_directory(x)]
+
         if len(files) > 0 and len(directories) > 0:
             raise ValueError('Found files in an intermediate '
                              'directory: {0}'.format(base_path))
@@ -456,6 +460,11 @@ def _parse_hive_partition(value):
     return value.split('=', 1)
 
 
+def _is_private_directory(x):
+    _, tail = os.path.split(x)
+    return tail.startswith('_') and '=' not in tail
+
+
 def _path_split(path, sep):
     i = path.rfind(sep) + 1
     head, tail = path[:i], path[i:]

http://git-wip-us.apache.org/repos/asf/arrow/blob/a1c8b83b/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index f606a7f..0f44d16 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -36,9 +36,14 @@ import pandas.util.testing as tm
 parquet = pytest.mark.parquet
 
 
-def _write_table(*args, **kwargs):
+def _write_table(table, path, **kwargs):
     import pyarrow.parquet as pq
-    return pq.write_table(*args, **kwargs)
+
+    if isinstance(table, pd.DataFrame):
+        table = pa.Table.from_pandas(table)
+
+    pq.write_table(table, path, **kwargs)
+    return table
 
 
 def _read_table(*args, **kwargs):
@@ -852,6 +857,32 @@ def test_read_multiple_files(tmpdir):
 
 
 @parquet
+def test_ignore_private_directories(tmpdir):
+    import pyarrow.parquet as pq
+
+    nfiles = 10
+    size = 5
+
+    dirpath = tmpdir.join(guid()).strpath
+    os.mkdir(dirpath)
+
+    test_data = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(size, seed=i)
+        path = pjoin(dirpath, '{0}.parquet'.format(i))
+
+        test_data.append(_write_table(df, path))
+        paths.append(path)
+
+    # private directory
+    os.mkdir(pjoin(dirpath, '_impala_staging'))
+
+    dataset = pq.ParquetDataset(dirpath)
+    assert set(paths) == set(x.path for x in dataset.pieces)
+
+
+@parquet
 def test_multiindex_duplicate_values(tmpdir):
     num_rows = 3
     numbers = list(range(num_rows))