You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/07/24 08:40:40 UTC
[arrow] branch master updated: ARROW-2656: [Python] Improve
creation time of ParquetManifest for partitioned datasets using thread pool
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 078b806 ARROW-2656: [Python] Improve creation time of ParquetManifest for partitioned datasets using thread pool
078b806 is described below
commit 078b8068f1c7176ff4a56aa95c9b57800aacce82
Author: Robert Gruener <ro...@uber.com>
AuthorDate: Tue Jul 24 10:40:12 2018 +0200
ARROW-2656: [Python] Improve creation time of ParquetManifest for partitioned datasets using thread pool
https://issues.apache.org/jira/browse/ARROW-2656 I probably should still write a benchmark for this, but figured I can get some quick feedback on this while working on that.
Author: Robert Gruener <ro...@uber.com>
Closes #2185 from rgruener/thread-pooling and squashes the following commits:
4af3cdc3 <Robert Gruener> ARROW-2656: Improve the creation time of ParquetManifest for partitioned datasets using a thread pool
---
ci/travis_script_python.sh | 5 +---
python/benchmarks/parquet.py | 54 ++++++++++++++++++++++++++++++++++++
python/pyarrow/parquet.py | 40 ++++++++++++++++++++++----
python/pyarrow/tests/test_parquet.py | 19 +++++++++++++
python/requirements.txt | 1 +
5 files changed, 109 insertions(+), 10 deletions(-)
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index 4513fcd..4eeb103 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -102,9 +102,6 @@ pushd $ARROW_PYTHON_DIR
# Other stuff pip install
pip install -q -r requirements.txt
-if [ "$PYTHON_VERSION" == "2.7" ]; then
- pip install -q futures
-fi
if [ "$ARROW_TRAVIS_COVERAGE" == "1" ]; then
export PYARROW_GENERATE_COVERAGE=1
pip install -q coverage
@@ -192,7 +189,7 @@ if [ "$ARROW_TRAVIS_PYTHON_BENCHMARKS" == "1" ] && [ "$PYTHON_VERSION" == "3.6"
source activate pyarrow_asv
pip install -q git+https://github.com/pitrou/asv.git@customize_commands
- export PYARROW_WITH_PARQUET=0
+ export PYARROW_WITH_PARQUET=1
export PYARROW_WITH_PLASMA=1
export PYARROW_WITH_ORC=0
diff --git a/python/benchmarks/parquet.py b/python/benchmarks/parquet.py
new file mode 100644
index 0000000..39b59ad
--- /dev/null
+++ b/python/benchmarks/parquet.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from concurrent.futures import ThreadPoolExecutor
+import pandas as pd
+import random
+import shutil
+import tempfile
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+
+class ParquetManifestCreation(object):
+    """Benchmark building a ParquetManifest over a partitioned dataset.
+
+    ``setup`` writes ``size`` rows partitioned on column ``num1`` into
+    ``num_partitions`` directories; the timed call only scans that tree.
+    """
+
+    size = 10 ** 6
+    tmpdir = None
+
+    param_names = ('num_partitions', 'num_threads')
+    params = [(10, 100, 1000), (1, 8, 'default')]
+
+    def setup(self, num_partitions, num_threads):
+        self.tmpdir = tempfile.mkdtemp('benchmark_parquet')
+        num1 = [random.randrange(num_partitions) for _ in range(self.size)]
+        num2 = [random.randrange(1000) for _ in range(self.size)]
+        output_df = pd.DataFrame({'num1': num1, 'num2': num2})
+        output_table = pa.Table.from_pandas(output_df)
+        pq.write_to_dataset(output_table, self.tmpdir, ['num1'])
+
+    def teardown(self, num_partitions, num_threads):
+        shutil.rmtree(self.tmpdir)
+
+    def time_manifest_creation(self, num_partitions, num_threads):
+        # ParquetManifest owns its thread pool and only exposes its size via
+        # metadata_nthreads (it has no thread_pool argument). 'default'
+        # leaves the argument unset so the library default is measured.
+        if num_threads == 'default':
+            pq.ParquetManifest(self.tmpdir)
+        else:
+            pq.ParquetManifest(self.tmpdir, metadata_nthreads=num_threads)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index bd97678..9c92737 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -16,6 +16,7 @@
# under the License.
from collections import defaultdict
+from concurrent import futures
import os
import inspect
import json
@@ -634,23 +635,32 @@ class ParquetManifest(object):
"""
def __init__(self, dirpath, filesystem=None, pathsep='/',
- partition_scheme='hive'):
+ partition_scheme='hive', metadata_nthreads=1):
self.filesystem = filesystem or _get_fs_from_path(dirpath)
self.pathsep = pathsep
self.dirpath = dirpath
self.partition_scheme = partition_scheme
self.partitions = ParquetPartitions()
self.pieces = []
+ self._metadata_nthreads = metadata_nthreads
+ self._thread_pool = futures.ThreadPoolExecutor(
+ max_workers=metadata_nthreads)
self.common_metadata_path = None
self.metadata_path = None
self._visit_level(0, self.dirpath, [])
+ # Due to concurrency, pieces will potentially by out of order if the
+ # dataset is partitioned so we sort them to yield stable results
+ self.pieces.sort(key=lambda piece: piece.path)
+
if self.common_metadata_path is None:
# _common_metadata is a subset of _metadata
self.common_metadata_path = self.metadata_path
+ self._thread_pool.shutdown()
+
def _visit_level(self, level, base_path, part_keys):
fs = self.filesystem
@@ -690,13 +700,25 @@ class ParquetManifest(object):
file_name in EXCLUDED_PARQUET_PATHS)
def _visit_directories(self, level, directories, part_keys):
+ futures_list = []
for path in directories:
head, tail = _path_split(path, self.pathsep)
name, key = _parse_hive_partition(tail)
index = self.partitions.get_index(level, name, key)
dir_part_keys = part_keys + [(name, index)]
- self._visit_level(level + 1, path, dir_part_keys)
+ # If you have less threads than levels, the wait call will block
+ # indefinitely due to multiple waits within a thread.
+ if level < self._metadata_nthreads:
+ future = self._thread_pool.submit(self._visit_level,
+ level + 1,
+ path,
+ dir_part_keys)
+ futures_list.append(future)
+ else:
+ self._visit_level(level + 1, path, dir_part_keys)
+ if futures_list:
+ futures.wait(futures_list)
def _parse_partition(self, dirname):
if self.partition_scheme == 'hive':
@@ -759,10 +781,14 @@ class ParquetDataset(object):
List of filters to apply, like ``[('x', '=', 0), ...]``. This
implements partition-level (hive) filtering only, i.e., to prevent the
loading of some files of the dataset.
+ metadata_nthreads: int, default 1
+ Number of threads in the pool used to read the dataset metadata when
+ the dataset is a directory. Increasing this can speed up opening
+ partitioned datasets with many partition directories.
"""
def __init__(self, path_or_paths, filesystem=None, schema=None,
metadata=None, split_row_groups=False, validate_schema=True,
- filters=None):
+ filters=None, metadata_nthreads=1):
if filesystem is None:
a_path = path_or_paths
if isinstance(a_path, list):
@@ -776,7 +802,8 @@ class ParquetDataset(object):
(self.pieces,
self.partitions,
self.common_metadata_path,
- self.metadata_path) = _make_manifest(path_or_paths, self.fs)
+ self.metadata_path) = _make_manifest(
+ path_or_paths, self.fs, metadata_nthreads=metadata_nthreads)
if self.common_metadata_path is not None:
with self.fs.open(self.common_metadata_path) as f:
@@ -940,7 +967,7 @@ def _ensure_filesystem(fs):
return fs
-def _make_manifest(path_or_paths, fs, pathsep='/'):
+def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1):
partitions = None
common_metadata_path = None
metadata_path = None
@@ -951,7 +978,8 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
if is_path(path_or_paths) and fs.isdir(path_or_paths):
manifest = ParquetManifest(path_or_paths, filesystem=fs,
- pathsep=fs.pathsep)
+ pathsep=fs.pathsep,
+ metadata_nthreads=metadata_nthreads)
common_metadata_path = manifest.common_metadata_path
metadata_path = manifest.metadata_path
pieces = manifest.pieces
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 0e51045..d7473e9 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1033,6 +1033,25 @@ def test_read_partitioned_directory(tmpdir):
@parquet
+def test_create_parquet_dataset_multi_threaded(tmpdir):
+    # A multi-threaded scan must discover the same dataset as a
+    # single-threaded one.
+    fs = LocalFileSystem.get_instance()
+    base_path = str(tmpdir)
+
+    _partition_test_for_filesystem(fs, base_path)
+
+    import pyarrow.parquet as pq
+
+    manifest = pq.ParquetManifest(base_path, filesystem=fs,
+                                  metadata_nthreads=1)
+    dataset = pq.ParquetDataset(base_path, filesystem=fs,
+                                metadata_nthreads=16)
+    assert len(dataset.pieces) > 0
+    # Pieces are sorted by path after the scan, so the order is comparable.
+    assert ([piece.path for piece in dataset.pieces] ==
+            [piece.path for piece in manifest.pieces])
+    partitions = dataset.partitions
+    assert len(partitions.partition_names) > 0
+    assert partitions.partition_names == manifest.partitions.partition_names
+    assert len(partitions.levels) == len(manifest.partitions.levels)
+
+
+@parquet
def test_equivalency(tmpdir):
fs = LocalFileSystem.get_instance()
base_path = str(tmpdir)
diff --git a/python/requirements.txt b/python/requirements.txt
index 8d0c33a..7a435ac 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -2,3 +2,4 @@ pytest
cloudpickle>=0.4.0
numpy>=1.10.0
six
+futures ; python_version < "3"