You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/01/10 22:04:23 UTC
[arrow] branch master updated: ARROW-1980: [Python] Fix race
condition in write_to_dataset
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d0a93ba ARROW-1980: [Python] Fix race condition in write_to_dataset
d0a93ba is described below
commit d0a93ba3fac91fe12b83d2ef7fccf97435c4f4ec
Author: Jim Crist <ji...@gmail.com>
AuthorDate: Wed Jan 10 17:04:15 2018 -0500
ARROW-1980: [Python] Fix race condition in write_to_dataset
One race condition was already fixed, but another one exists when
writing by partition.
Author: Jim Crist <ji...@gmail.com>
Closes #1468 from jcrist/fix-race-condition-parquet-partition and squashes the following commits:
180c4a24 [Jim Crist] Fix race condition in write_to_dataset
---
python/pyarrow/parquet.py | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index d9f1bd2..151e0df 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -966,6 +966,14 @@ where: string or pyarrow.io.NativeFile
""".format(_parquet_writer_arg_docs)
+def _mkdir_if_not_exists(fs, path):
+ if fs._isfilestore() and not fs.exists(path):
+ try:
+ fs.mkdir(path)
+ except OSError:
+ assert fs.exists(path)
+
+
def write_to_dataset(table, root_path, partition_cols=None,
filesystem=None, preserve_index=True, **kwargs):
"""
@@ -1012,11 +1020,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
else:
fs = _ensure_filesystem(filesystem)
- if fs._isfilestore() and not fs.exists(root_path):
- try:
- fs.mkdir(root_path)
- except OSError:
- assert fs.exists(root_path)
+ _mkdir_if_not_exists(fs, root_path)
if partition_cols is not None and len(partition_cols) > 0:
df = table.to_pandas()
@@ -1034,8 +1038,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
subtable = Table.from_pandas(subgroup,
preserve_index=preserve_index)
prefix = "/".join([root_path, subdir])
- if fs._isfilestore() and not fs.exists(prefix):
- fs.mkdir(prefix)
+ _mkdir_if_not_exists(fs, prefix)
outfile = compat.guid() + ".parquet"
full_path = "/".join([prefix, outfile])
with fs.open(full_path, 'wb') as f:
--
To stop receiving notification emails like this one, please contact
"commits@arrow.apache.org" <co...@arrow.apache.org>.