You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2021/11/10 02:06:57 UTC

[arrow] 12/12: ARROW-14620: [Python] Missing bindings for existing_data_behavior makes it impossible to maintain old behavior

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-6.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit aa9a7a698e33e278abe053f4634170b3b026e48e
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon Nov 8 12:36:23 2021 -1000

    ARROW-14620: [Python] Missing bindings for existing_data_behavior makes it impossible to maintain old behavior
    
    Closes #11632 from westonpace/bugfix/ARROW-14620--existing-data-behavior-missing-in-python
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 python/pyarrow/_dataset.pyx                  | 16 ++++++++-
 python/pyarrow/dataset.py                    | 21 ++++++++++--
 python/pyarrow/includes/libarrow_dataset.pxd |  9 +++++
 python/pyarrow/tests/test_dataset.py         | 49 ++++++++++++++++++++++++++++
 4 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 42d7020..459c3b8 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -3364,7 +3364,8 @@ def _filesystemdataset_write(
     Partitioning partitioning not None,
     FileWriteOptions file_options not None,
     int max_partitions,
-    object file_visitor
+    object file_visitor,
+    str existing_data_behavior not None
 ):
     """
     CFileSystemDataset.Write wrapper
@@ -3381,6 +3382,19 @@ def _filesystemdataset_write(
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
+    if existing_data_behavior == 'error':
+        c_options.existing_data_behavior = ExistingDataBehavior_ERROR
+    elif existing_data_behavior == 'overwrite_or_ignore':
+        c_options.existing_data_behavior =\
+            ExistingDataBehavior_OVERWRITE_OR_IGNORE
+    elif existing_data_behavior == 'delete_matching':
+        c_options.existing_data_behavior = ExistingDataBehavior_DELETE_MATCHING
+    else:
+        # No comma between the literals: they must form one message string
+        raise ValueError(
+            "existing_data_behavior must be one of 'error', "
+            "'overwrite_or_ignore' or 'delete_matching'")
+
     if file_visitor is not None:
         visit_args = {'base_dir': c_options.base_dir,
                       'file_visitor': file_visitor}
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 70aeb15..42515a9 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -736,7 +736,8 @@ def _ensure_write_partitioning(part, schema, flavor):
 def write_dataset(data, base_dir, basename_template=None, format=None,
                   partitioning=None, partitioning_flavor=None, schema=None,
                   filesystem=None, file_options=None, use_threads=True,
-                  max_partitions=None, file_visitor=None):
+                  max_partitions=None, file_visitor=None,
+                  existing_data_behavior='error'):
     """
     Write a dataset to a given format and partitioning.
 
@@ -798,6 +799,22 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
 
             def file_visitor(written_file):
                 visited_paths.append(written_file.path)
+    existing_data_behavior : 'error' | 'overwrite_or_ignore' | \
+'delete_matching'
+        Controls how the dataset will handle data that already exists in
+        the destination.  The default behavior ('error') is to raise an error
+        if any data exists in the destination.
+
+        'overwrite_or_ignore' will ignore any existing data and will
+        overwrite files with the same name as an output file.  Other
+        existing files will be ignored.  This behavior, in combination
+        with a unique basename_template for each write, will allow for
+        an append workflow.
+
+        'delete_matching' is useful when you are writing a partitioned
+        dataset.  The first time each partition directory is encountered
+        the entire directory will be deleted.  This allows you to overwrite
+        old partitions completely.
     """
     from pyarrow.fs import _resolve_filesystem_and_path
 
@@ -860,5 +877,5 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
 
     _filesystemdataset_write(
         scanner, base_dir, basename_template, filesystem, partitioning,
-        file_options, max_partitions, file_visitor
+        file_options, max_partitions, file_visitor, existing_data_behavior
     )
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 2527cde..abc79fe 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -86,6 +86,14 @@ ctypedef void cb_writer_finish(dict, CFileWriter*)
 
 cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
+    cdef enum ExistingDataBehavior" arrow::dataset::ExistingDataBehavior":
+        ExistingDataBehavior_DELETE_MATCHING" \
+            arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions"
+        ExistingDataBehavior_OVERWRITE_OR_IGNORE" \
+            arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore"
+        ExistingDataBehavior_ERROR" \
+            arrow::dataset::ExistingDataBehavior::kError"
+
     cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
         @staticmethod
         shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema)
@@ -278,6 +286,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         c_string basename_template
         function[cb_writer_finish_internal] writer_pre_finish
         function[cb_writer_finish_internal] writer_post_finish
+        ExistingDataBehavior existing_data_behavior
 
     cdef cppclass CFileSystemDataset \
             "arrow::dataset::FileSystemDataset"(CDataset):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index e5590c4..20b1231 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -3480,6 +3480,55 @@ def test_write_dataset_with_dataset(tempdir):
         assert dict(load_back_table.to_pydict()) == table.to_pydict()
 
 
+@pytest.mark.pandas
+def test_write_dataset_existing_data(tempdir):
+    directory = tempdir / 'ds'
+    table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]})
+    partitioning = ds.partitioning(schema=pa.schema(
+        [pa.field('c', pa.int64())]), flavor='hive')
+
+    def compare_tables_ignoring_order(t1, t2):
+        df1 = t1.to_pandas().sort_values('b').reset_index(drop=True)
+        df2 = t2.to_pandas().sort_values('b').reset_index(drop=True)
+        assert df1.equals(df2)
+
+    # First write is ok (default existing_data_behavior is 'error')
+    ds.write_dataset(table, directory, partitioning=partitioning, format='ipc')
+
+    table = pa.table({'b': ['a', 'b', 'c'], 'c': [2, 3, 4]})
+
+    # Second write should fail: data now exists and behavior is 'error'
+    with pytest.raises(pa.ArrowInvalid):
+        ds.write_dataset(table, directory,
+                         partitioning=partitioning, format='ipc')
+
+    extra_table = pa.table({'b': ['e']})
+    extra_file = directory / 'c=2' / 'foo.arrow'
+    pyarrow.feather.write_feather(extra_table, extra_file)
+
+    # 'overwrite_or_ignore' should succeed and keep unrelated files
+    ds.write_dataset(table, directory, partitioning=partitioning,
+                     format='ipc',
+                     existing_data_behavior='overwrite_or_ignore')
+
+    overwritten = pa.table(
+        {'b': ['e', 'x', 'a', 'b', 'c'], 'c': [2, 1, 2, 3, 4]})
+    readback = ds.dataset(tempdir, format='ipc',
+                          partitioning=partitioning).to_table()
+    compare_tables_ignoring_order(readback, overwritten)
+    assert extra_file.exists()
+
+    # 'delete_matching' should succeed and empty each written partition
+    ds.write_dataset(table, directory, partitioning=partitioning,
+                     format='ipc', existing_data_behavior='delete_matching')
+
+    overwritten = pa.table({'b': ['x', 'a', 'b', 'c'], 'c': [1, 2, 3, 4]})
+    readback = ds.dataset(tempdir, format='ipc',
+                          partitioning=partitioning).to_table()
+    compare_tables_ignoring_order(readback, overwritten)
+    assert not extra_file.exists()
+
+
 @pytest.mark.parquet
 @pytest.mark.pandas
 def test_write_dataset_partitioned_dict(tempdir):