You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain text version).
Posted to commits@arrow.apache.org by we...@apache.org on 2018/01/10 22:04:23 UTC

[arrow] branch master updated: ARROW-1980: [Python] Fix race condition in write_to_dataset

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d0a93ba  ARROW-1980: [Python] Fix race condition in write_to_dataset
d0a93ba is described below

commit d0a93ba3fac91fe12b83d2ef7fccf97435c4f4ec
Author: Jim Crist <ji...@gmail.com>
AuthorDate: Wed Jan 10 17:04:15 2018 -0500

    ARROW-1980: [Python] Fix race condition in write_to_dataset
    
    One race condition (when creating the dataset root directory) was
    already fixed, but another one exists when creating the partition
    subdirectories while writing by partition.
    
    Author: Jim Crist <ji...@gmail.com>
    
    Closes #1468 from jcrist/fix-race-condition-parquet-partition and squashes the following commits:
    
    180c4a24 [Jim Crist] Fix race condition in write_to_dataset
---
 python/pyarrow/parquet.py | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index d9f1bd2..151e0df 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -966,6 +966,14 @@ where: string or pyarrow.io.NativeFile
 """.format(_parquet_writer_arg_docs)
 
 
+def _mkdir_if_not_exists(fs, path):
+    if fs._isfilestore() and not fs.exists(path):
+        try:
+            fs.mkdir(path)
+        except OSError:
+            assert fs.exists(path)
+
+
 def write_to_dataset(table, root_path, partition_cols=None,
                      filesystem=None, preserve_index=True, **kwargs):
     """
@@ -1012,11 +1020,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
     else:
         fs = _ensure_filesystem(filesystem)
 
-    if fs._isfilestore() and not fs.exists(root_path):
-        try:
-            fs.mkdir(root_path)
-        except OSError:
-            assert fs.exists(root_path)
+    _mkdir_if_not_exists(fs, root_path)
 
     if partition_cols is not None and len(partition_cols) > 0:
         df = table.to_pandas()
@@ -1034,8 +1038,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
             subtable = Table.from_pandas(subgroup,
                                          preserve_index=preserve_index)
             prefix = "/".join([root_path, subdir])
-            if fs._isfilestore() and not fs.exists(prefix):
-                fs.mkdir(prefix)
+            _mkdir_if_not_exists(fs, prefix)
             outfile = compat.guid() + ".parquet"
             full_path = "/".join([prefix, outfile])
             with fs.open(full_path, 'wb') as f:

-- 
To stop receiving notification emails like this one, please contact
commits@arrow.apache.org.