You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/07/24 08:40:40 UTC

[arrow] branch master updated: ARROW-2656: [Python] Improve creation time of ParquetManifest for partitioned datasets using thread pool

This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 078b806  ARROW-2656: [Python] Improve creation time of ParquetManifest for partitioned datasets using thread pool
078b806 is described below

commit 078b8068f1c7176ff4a56aa95c9b57800aacce82
Author: Robert Gruener <ro...@uber.com>
AuthorDate: Tue Jul 24 10:40:12 2018 +0200

    ARROW-2656: [Python] Improve creation time of ParquetManifest for partitioned datasets using thread pool
    
    https://issues.apache.org/jira/browse/ARROW-2656 I probably should still write a benchmark for this, but figured I can get some quick feedback on this while working on that.
    
    Author: Robert Gruener <ro...@uber.com>
    
    Closes #2185 from rgruener/thread-pooling and squashes the following commits:
    
    4af3cdc3 <Robert Gruener> ARROW-2656:  Improve the creation time of ParquetManifest for partitioned datasets using a thread pool
---
 ci/travis_script_python.sh           |  5 +---
 python/benchmarks/parquet.py         | 54 ++++++++++++++++++++++++++++++++++++
 python/pyarrow/parquet.py            | 40 ++++++++++++++++++++++----
 python/pyarrow/tests/test_parquet.py | 19 +++++++++++++
 python/requirements.txt              |  1 +
 5 files changed, 109 insertions(+), 10 deletions(-)

diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index 4513fcd..4eeb103 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -102,9 +102,6 @@ pushd $ARROW_PYTHON_DIR
 
 # Other stuff pip install
 pip install -q -r requirements.txt
-if [ "$PYTHON_VERSION" == "2.7" ]; then
-  pip install -q futures
-fi
 if [ "$ARROW_TRAVIS_COVERAGE" == "1" ]; then
     export PYARROW_GENERATE_COVERAGE=1
     pip install -q coverage
@@ -192,7 +189,7 @@ if [ "$ARROW_TRAVIS_PYTHON_BENCHMARKS" == "1" ] && [ "$PYTHON_VERSION" == "3.6"
   source activate pyarrow_asv
   pip install -q git+https://github.com/pitrou/asv.git@customize_commands
 
-  export PYARROW_WITH_PARQUET=0
+  export PYARROW_WITH_PARQUET=1
   export PYARROW_WITH_PLASMA=1
   export PYARROW_WITH_ORC=0
 
diff --git a/python/benchmarks/parquet.py b/python/benchmarks/parquet.py
new file mode 100644
index 0000000..39b59ad
--- /dev/null
+++ b/python/benchmarks/parquet.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from concurrent.futures import ThreadPoolExecutor
+import pandas as pd
+import random
+import shutil
+import tempfile
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+
+class ParquetManifestCreation(object):
+    """Benchmark creating a parquet manifest for a partitioned dataset."""
+
+    size = 10 ** 6
+    tmpdir = None
+
+    param_names = ('num_partitions', 'num_threads')
+    params = [(10, 100, 1000), (1, 8, 'default')]
+
+    def setup(self, num_partitions, num_threads):
+        """Write a dataset partitioned on 'num1' into a fresh temp dir."""
+        self.tmpdir = tempfile.mkdtemp('benchmark_parquet')
+        num1 = [random.randrange(num_partitions) for _ in range(self.size)]
+        num2 = [random.randrange(1000) for _ in range(self.size)]
+        output_df = pd.DataFrame({'num1': num1, 'num2': num2})
+        output_table = pa.Table.from_pandas(output_df)
+        pq.write_to_dataset(output_table, self.tmpdir, ['num1'])
+
+    def teardown(self, num_partitions, num_threads):
+        """Remove the temporary dataset directory created by setup()."""
+        shutil.rmtree(self.tmpdir)
+
+    def time_manifest_creation(self, num_partitions, num_threads):
+        """Time manifest creation; 'default' keeps pq's own thread count."""
+        kwargs = ({} if num_threads == 'default'
+                  else {'metadata_nthreads': num_threads})
+        pq.ParquetManifest(self.tmpdir, **kwargs)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index bd97678..9c92737 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -16,6 +16,7 @@
 # under the License.
 
 from collections import defaultdict
+from concurrent import futures
 import os
 import inspect
 import json
@@ -634,23 +635,32 @@ class ParquetManifest(object):
 
     """
     def __init__(self, dirpath, filesystem=None, pathsep='/',
-                 partition_scheme='hive'):
+                 partition_scheme='hive', metadata_nthreads=1):
         self.filesystem = filesystem or _get_fs_from_path(dirpath)
         self.pathsep = pathsep
         self.dirpath = dirpath
         self.partition_scheme = partition_scheme
         self.partitions = ParquetPartitions()
         self.pieces = []
+        self._metadata_nthreads = metadata_nthreads
+        self._thread_pool = futures.ThreadPoolExecutor(
+            max_workers=metadata_nthreads)
 
         self.common_metadata_path = None
         self.metadata_path = None
 
         self._visit_level(0, self.dirpath, [])
 
+        # Due to concurrency, pieces will potentially be out of order if the
+        # dataset is partitioned so we sort them to yield stable results
+        self.pieces.sort(key=lambda piece: piece.path)
+
         if self.common_metadata_path is None:
             # _common_metadata is a subset of _metadata
             self.common_metadata_path = self.metadata_path
 
+        self._thread_pool.shutdown()
+
     def _visit_level(self, level, base_path, part_keys):
         fs = self.filesystem
 
@@ -690,13 +700,25 @@ class ParquetManifest(object):
                 file_name in EXCLUDED_PARQUET_PATHS)
 
     def _visit_directories(self, level, directories, part_keys):
+        futures_list = []
         for path in directories:
             head, tail = _path_split(path, self.pathsep)
             name, key = _parse_hive_partition(tail)
 
             index = self.partitions.get_index(level, name, key)
             dir_part_keys = part_keys + [(name, index)]
-            self._visit_level(level + 1, path, dir_part_keys)
+            # With fewer threads than levels, nested waits inside the workers
+            # would block indefinitely, so deeper levels are visited inline.
+            if level < self._metadata_nthreads:
+                future = self._thread_pool.submit(
+                    self._visit_level, level + 1, path, dir_part_keys)
+                futures_list.append(future)
+            else:
+                self._visit_level(level + 1, path, dir_part_keys)
+        # wait() alone hides worker errors; result() re-raises the first one.
+        futures.wait(futures_list)
+        for future in futures_list:
+            future.result()
 
     def _parse_partition(self, dirname):
         if self.partition_scheme == 'hive':
@@ -759,10 +781,14 @@ class ParquetDataset(object):
         List of filters to apply, like ``[('x', '=', 0), ...]``. This
         implements partition-level (hive) filtering only, i.e., to prevent the
         loading of some files of the dataset.
+    metadata_nthreads: int, default 1
+        How many threads to allow the thread pool which is used to read the
+        dataset metadata. Increasing this is helpful to read partitioned
+        datasets.
     """
     def __init__(self, path_or_paths, filesystem=None, schema=None,
                  metadata=None, split_row_groups=False, validate_schema=True,
-                 filters=None):
+                 filters=None, metadata_nthreads=1):
         if filesystem is None:
             a_path = path_or_paths
             if isinstance(a_path, list):
@@ -776,7 +802,8 @@ class ParquetDataset(object):
         (self.pieces,
          self.partitions,
          self.common_metadata_path,
-         self.metadata_path) = _make_manifest(path_or_paths, self.fs)
+         self.metadata_path) = _make_manifest(
+            path_or_paths, self.fs, metadata_nthreads=metadata_nthreads)
 
         if self.common_metadata_path is not None:
             with self.fs.open(self.common_metadata_path) as f:
@@ -940,7 +967,7 @@ def _ensure_filesystem(fs):
         return fs
 
 
-def _make_manifest(path_or_paths, fs, pathsep='/'):
+def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1):
     partitions = None
     common_metadata_path = None
     metadata_path = None
@@ -951,7 +978,8 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
 
     if is_path(path_or_paths) and fs.isdir(path_or_paths):
         manifest = ParquetManifest(path_or_paths, filesystem=fs,
-                                   pathsep=fs.pathsep)
+                                   pathsep=fs.pathsep,
+                                   metadata_nthreads=metadata_nthreads)
         common_metadata_path = manifest.common_metadata_path
         metadata_path = manifest.metadata_path
         pieces = manifest.pieces
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 0e51045..d7473e9 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1033,6 +1033,25 @@ def test_read_partitioned_directory(tmpdir):
 
 
 @parquet
+def test_create_parquet_dataset_multi_threaded(tmpdir):
+    fs = LocalFileSystem.get_instance()
+    base_path = str(tmpdir)
+
+    _partition_test_for_filesystem(fs, base_path)
+
+    import pyarrow.parquet as pq
+
+    manifest = pq.ParquetManifest(base_path, filesystem=fs,
+                                  metadata_nthreads=1)
+    dataset = pq.ParquetDataset(base_path, filesystem=fs, metadata_nthreads=16)
+    assert len(dataset.pieces) > 0
+    partitions = dataset.partitions
+    assert len(partitions.partition_names) > 0
+    assert partitions.partition_names == manifest.partitions.partition_names
+    assert len(partitions.levels) == len(manifest.partitions.levels)
+
+
+@parquet
 def test_equivalency(tmpdir):
     fs = LocalFileSystem.get_instance()
     base_path = str(tmpdir)
diff --git a/python/requirements.txt b/python/requirements.txt
index 8d0c33a..7a435ac 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -2,3 +2,4 @@ pytest
 cloudpickle>=0.4.0
 numpy>=1.10.0
 six
+futures ; python_version < "3"