Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/28 02:25:40 UTC

arrow git commit: ARROW-1285: [Python] Delete any incomplete file when attempt to write single Parquet file fails

Repository: arrow
Updated Branches:
  refs/heads/master f72279b2d -> b7639c153


ARROW-1285: [Python] Delete any incomplete file when attempt to write single Parquet file fails

cc @jreback

Author: Wes McKinney <we...@twosigma.com>

Closes #902 from wesm/ARROW-1285 and squashes the following commits:

b8f9ef4d [Wes McKinney] Delete any incomplete file when attempt to write single Parquet file fails


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b7639c15
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b7639c15
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b7639c15

Branch: refs/heads/master
Commit: b7639c1539af9327483ce17c95dab3b19896b002
Parents: f72279b
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Jul 27 22:25:34 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Jul 27 22:25:34 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/_parquet.pyx          |  8 ++++----
 python/pyarrow/parquet.py            | 19 ++++++++++++++++---
 python/pyarrow/tests/test_parquet.py | 25 +++++++++++++++++++++++++
 3 files changed, 45 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/b7639c15/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index bbe5203..20f189a 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -542,6 +542,7 @@ cdef ParquetCompression compression_from_name(str name):
 cdef class ParquetWriter:
     cdef:
         unique_ptr[FileWriter] writer
+        shared_ptr[OutputStream] sink
 
     cdef readonly:
         object use_dictionary
@@ -555,14 +556,13 @@ cdef class ParquetWriter:
                   MemoryPool memory_pool=None, use_deprecated_int96_timestamps=False):
         cdef:
             shared_ptr[FileOutputStream] filestream
-            shared_ptr[OutputStream] sink
             shared_ptr[WriterProperties] properties
 
         if isinstance(where, six.string_types):
             check_status(FileOutputStream.Open(tobytes(where), &filestream))
-            sink = <shared_ptr[OutputStream]> filestream
+            self.sink = <shared_ptr[OutputStream]> filestream
         else:
-            get_writer(where, &sink)
+            get_writer(where, &self.sink)
 
         self.use_dictionary = use_dictionary
         self.compression = compression
@@ -582,7 +582,7 @@ cdef class ParquetWriter:
         check_status(
             FileWriter.Open(deref(schema.schema),
                             maybe_unbox_memory_pool(memory_pool),
-                            sink, properties, arrow_properties,
+                            self.sink, properties, arrow_properties,
                             &self.writer))
 
     cdef void _set_int96_support(self, ArrowWriterProperties.Builder* props):
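
The hunk above promotes sink from a local variable in __cinit__ to an attribute of ParquetWriter, so the output stream stays reachable from the writer object for its whole lifetime rather than only inside the constructor. A rough Python analogue of the pattern (a sketch with illustrative names, not pyarrow's actual classes):

    class Writer:
        def __init__(self, path):
            # Hold the stream as an attribute: later methods (and the
            # object's lifetime) keep a handle on it, whereas a local
            # variable would be dropped when __init__ returns.
            self.sink = open(path, 'wb')

        def close(self):
            self.sink.close()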

http://git-wip-us.apache.org/repos/asf/arrow/blob/b7639c15/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index fea7397..34c1d12 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -769,9 +769,22 @@ def write_table(table, where, row_group_size=None, version='1.0',
         compression=compression,
         version=version,
         use_deprecated_int96_timestamps=use_deprecated_int96_timestamps)
-    writer = ParquetWriter(where, table.schema, **options)
-    writer.write_table(table, row_group_size=row_group_size)
-    writer.close()
+
+    writer = None
+    try:
+        writer = ParquetWriter(where, table.schema, **options)
+        writer.write_table(table, row_group_size=row_group_size)
+    except:
+        if writer is not None:
+            writer.close()
+        if isinstance(where, six.string_types):
+            try:
+                os.remove(where)
+            except os.error:
+                pass
+        raise
+    else:
+        writer.close()
 
 
 def write_metadata(schema, where, version='1.0',
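
In write_table, the write is now wrapped so that any failure removes the partially written file before the exception propagates: writer starts as None so the handler can tell whether construction itself failed, the file is only unlinked when where is a path rather than a file-like object, and the bare raise re-raises the original error. A minimal, generic sketch of the same cleanup-on-failure pattern (write_safely and write_func are illustrative names, not pyarrow APIs):

    import os

    def write_safely(data, path, write_func):
        # Hypothetical helper showing the cleanup-on-failure pattern
        # that write_table now uses.
        try:
            write_func(data, path)
        except Exception:
            try:
                os.remove(path)  # discard any partially written output
            except OSError:
                pass  # the file may never have been created
            raise  # re-raise the original error for the caller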

http://git-wip-us.apache.org/repos/asf/arrow/blob/b7639c15/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 40e44b3..6763fb3 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1006,3 +1006,28 @@ def test_multiindex_duplicate_values(tmpdir):
 
     result_df = result_table.to_pandas()
     tm.assert_frame_equal(result_df, df)
+
+
+@parquet
+def test_write_error_deletes_incomplete_file(tmpdir):
+    # ARROW-1285
+    df = pd.DataFrame({'a': list('abc'),
+                       'b': list(range(1, 4)),
+                       'c': np.arange(3, 6).astype('u1'),
+                       'd': np.arange(4.0, 7.0, dtype='float64'),
+                       'e': [True, False, True],
+                       'f': pd.Categorical(list('abc')),
+                       'g': pd.date_range('20130101', periods=3),
+                       'h': pd.date_range('20130101', periods=3,
+                                          tz='US/Eastern'),
+                       'i': pd.date_range('20130101', periods=3, freq='ns')})
+
+    pdf = pa.Table.from_pandas(df)
+
+    filename = tmpdir.join('tmp_file').strpath
+    try:
+        _write_table(pdf, filename)
+    except pa.ArrowException:
+        pass
+
+    assert not os.path.exists(filename)
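
The new test builds a table with a wide mix of column types, where the nanosecond-resolution datetime column ('i') appears to be the intended failure trigger for the writer of this era; the assertion then confirms no partial file was left behind. A hedged sketch of the same guarantee through the public API (the single-column table and 'out.parquet' are illustrative, and whether nanosecond timestamps still fail depends on the pyarrow version):

    import os
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    # A nanosecond-resolution timestamp column, as in the test above;
    # at the time of this commit the Parquet writer rejected it.
    df = pd.DataFrame({'ts': pd.date_range('20130101', periods=3,
                                           freq='ns')})
    table = pa.Table.from_pandas(df)

    try:
        pq.write_table(table, 'out.parquet')
    except pa.ArrowException:
        # The guarantee this commit adds: a failed write leaves no
        # partial file behind.
        assert not os.path.exists('out.parquet')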