Posted to commits@arrow.apache.org by ko...@apache.org on 2021/11/10 02:06:57 UTC
[arrow] 12/12: ARROW-14620: [Python] Missing bindings for existing_data_behavior makes it impossible to maintain old behavior
This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch maint-6.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit aa9a7a698e33e278abe053f4634170b3b026e48e
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon Nov 8 12:36:23 2021 -1000
ARROW-14620: [Python] Missing bindings for existing_data_behavior makes it impossible to maintain old behavior
Closes #11632 from westonpace/bugfix/ARROW-14620--existing-data-behavior-missing-in-python
Authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
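
[Editor's note] For context, a minimal usage sketch of the new keyword; this is not part of the commit, and the temporary destination directory is illustrative:

    import tempfile

    import pyarrow as pa
    import pyarrow.dataset as ds

    base_dir = tempfile.mkdtemp()  # hypothetical destination
    table = pa.table({'a': [1, 2, 3]})

    # Default ('error'): the write raises if data already exists at the
    # destination.
    ds.write_dataset(table, base_dir, format='ipc')

    # Pre-6.0 behavior: proceed, overwriting only name-colliding files.
    ds.write_dataset(table, base_dir, format='ipc',
                     existing_data_behavior='overwrite_or_ignore')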
---
python/pyarrow/_dataset.pyx | 16 ++++++++-
python/pyarrow/dataset.py | 21 ++++++++++--
python/pyarrow/includes/libarrow_dataset.pxd | 9 +++++
python/pyarrow/tests/test_dataset.py | 49 ++++++++++++++++++++++++++++
4 files changed, 92 insertions(+), 3 deletions(-)
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 42d7020..459c3b8 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -3364,7 +3364,8 @@ def _filesystemdataset_write(
Partitioning partitioning not None,
FileWriteOptions file_options not None,
int max_partitions,
- object file_visitor
+ object file_visitor,
+ str existing_data_behavior not None
):
"""
CFileSystemDataset.Write wrapper
@@ -3381,6 +3382,19 @@ def _filesystemdataset_write(
c_options.partitioning = partitioning.unwrap()
c_options.max_partitions = max_partitions
c_options.basename_template = tobytes(basename_template)
+ if existing_data_behavior == 'error':
+ c_options.existing_data_behavior = ExistingDataBehavior_ERROR
+ elif existing_data_behavior == 'overwrite_or_ignore':
+ c_options.existing_data_behavior =\
+ ExistingDataBehavior_OVERWRITE_OR_IGNORE
+ elif existing_data_behavior == 'delete_matching':
+ c_options.existing_data_behavior = ExistingDataBehavior_DELETE_MATCHING
+ else:
+ raise ValueError(
+ "existing_data_behavior must be one of 'error', "
+ "'overwrite_or_ignore' or 'delete_matching'"
+ )
+
if file_visitor is not None:
visit_args = {'base_dir': c_options.base_dir,
'file_visitor': file_visitor}
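[Editor's note] For illustration (not part of the commit), an unrecognized value surfaces the ValueError added above; the destination path is hypothetical:

    import tempfile

    import pyarrow as pa
    import pyarrow.dataset as ds

    table = pa.table({'a': [1]})
    try:
        ds.write_dataset(table, tempfile.mkdtemp(), format='ipc',
                         existing_data_behavior='replace')  # invalid option
    except ValueError as exc:
        print(exc)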
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 70aeb15..42515a9 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -736,7 +736,8 @@ def _ensure_write_partitioning(part, schema, flavor):
def write_dataset(data, base_dir, basename_template=None, format=None,
partitioning=None, partitioning_flavor=None, schema=None,
filesystem=None, file_options=None, use_threads=True,
- max_partitions=None, file_visitor=None):
+ max_partitions=None, file_visitor=None,
+ existing_data_behavior='error'):
"""
Write a dataset to a given format and partitioning.
@@ -798,6 +799,22 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
def file_visitor(written_file):
visited_paths.append(written_file.path)
+ existing_data_behavior : 'error' | 'overwrite_or_ignore' | \
+'delete_matching'
+ Controls how the dataset will handle data that already exists in
+ the destination. The default behavior ('error') is to raise an error
+ if any data exists in the destination.
+
+ 'overwrite_or_ignore' ignores any existing data. Files with the
+ same name as an output file are overwritten; all other existing
+ files are left untouched. Combined with a unique
+ basename_template for each write, this behavior enables an
+ append workflow.
+
+ 'delete_matching' is useful when writing a partitioned dataset.
+ The first time each partition directory is encountered, the
+ entire directory is deleted. This allows you to overwrite old
+ partitions completely.
"""
from pyarrow.fs import _resolve_filesystem_and_path
@@ -860,5 +877,5 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
_filesystemdataset_write(
scanner, base_dir, basename_template, filesystem, partitioning,
- file_options, max_partitions, file_visitor
+ file_options, max_partitions, file_visitor, existing_data_behavior
)
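
[Editor's note] A sketch of the append workflow mentioned in the docstring above; this is not part of the commit, and the helper name and destination are illustrative:

    import tempfile
    import uuid

    import pyarrow as pa
    import pyarrow.dataset as ds

    base_dir = tempfile.mkdtemp()  # hypothetical destination

    def append_batch(table):
        # A unique basename_template per write avoids name collisions,
        # so 'overwrite_or_ignore' leaves earlier files intact.
        template = 'part-{}-{{i}}.arrow'.format(uuid.uuid4())
        ds.write_dataset(table, base_dir, format='ipc',
                         basename_template=template,
                         existing_data_behavior='overwrite_or_ignore')

    append_batch(pa.table({'a': [1, 2]}))
    append_batch(pa.table({'a': [3, 4]}))
    print(ds.dataset(base_dir, format='ipc').to_table().num_rows)  # 4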
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 2527cde..abc79fe 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -86,6 +86,14 @@ ctypedef void cb_writer_finish(dict, CFileWriter*)
cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
+ cdef enum ExistingDataBehavior" arrow::dataset::ExistingDataBehavior":
+ ExistingDataBehavior_DELETE_MATCHING" \
+ arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions"
+ ExistingDataBehavior_OVERWRITE_OR_IGNORE" \
+ arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore"
+ ExistingDataBehavior_ERROR" \
+ arrow::dataset::ExistingDataBehavior::kError"
+
cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
@staticmethod
shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema)
@@ -278,6 +286,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
c_string basename_template
function[cb_writer_finish_internal] writer_pre_finish
function[cb_writer_finish_internal] writer_post_finish
+ ExistingDataBehavior existing_data_behavior
cdef cppclass CFileSystemDataset \
"arrow::dataset::FileSystemDataset"(CDataset):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index e5590c4..20b1231 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -3480,6 +3480,55 @@ def test_write_dataset_with_dataset(tempdir):
assert dict(load_back_table.to_pydict()) == table.to_pydict()
+@pytest.mark.pandas
+def test_write_dataset_existing_data(tempdir):
+ directory = tempdir / 'ds'
+ table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]})
+ partitioning = ds.partitioning(schema=pa.schema(
+ [pa.field('c', pa.int64())]), flavor='hive')
+
+ def compare_tables_ignoring_order(t1, t2):
+ df1 = t1.to_pandas().sort_values('b').reset_index(drop=True)
+ df2 = t2.to_pandas().sort_values('b').reset_index(drop=True)
+ assert df1.equals(df2)
+
+ # First write is ok
+ ds.write_dataset(table, directory, partitioning=partitioning, format='ipc')
+
+ table = pa.table({'b': ['a', 'b', 'c'], 'c': [2, 3, 4]})
+
+ # Second write should fail
+ with pytest.raises(pa.ArrowInvalid):
+ ds.write_dataset(table, directory,
+ partitioning=partitioning, format='ipc')
+
+ extra_table = pa.table({'b': ['e']})
+ extra_file = directory / 'c=2' / 'foo.arrow'
+ pyarrow.feather.write_feather(extra_table, extra_file)
+
+ # Should succeed, overwriting clashing files with 'overwrite_or_ignore'
+ ds.write_dataset(table, directory, partitioning=partitioning,
+ format='ipc',
+ existing_data_behavior='overwrite_or_ignore')
+
+ overwritten = pa.table(
+ {'b': ['e', 'x', 'a', 'b', 'c'], 'c': [2, 1, 2, 3, 4]})
+ readback = ds.dataset(tempdir, format='ipc',
+ partitioning=partitioning).to_table()
+ compare_tables_ignoring_order(readback, overwritten)
+ assert extra_file.exists()
+
+ # Should succeed, deleting each matching partition with 'delete_matching'
+ ds.write_dataset(table, directory, partitioning=partitioning,
+ format='ipc', existing_data_behavior='delete_matching')
+
+ overwritten = pa.table({'b': ['x', 'a', 'b', 'c'], 'c': [1, 2, 3, 4]})
+ readback = ds.dataset(tempdir, format='ipc',
+ partitioning=partitioning).to_table()
+ compare_tables_ignoring_order(readback, overwritten)
+ assert not extra_file.exists()
+
+
@pytest.mark.parquet
@pytest.mark.pandas
def test_write_dataset_partitioned_dict(tempdir):
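
[Editor's note] A sketch of the partition-overwrite pattern the test above exercises; this is not part of the commit, and the destination is illustrative:

    import tempfile

    import pyarrow as pa
    import pyarrow.dataset as ds

    base_dir = tempfile.mkdtemp()  # hypothetical destination
    part = ds.partitioning(pa.schema([('c', pa.int64())]), flavor='hive')

    ds.write_dataset(pa.table({'b': ['x', 'y'], 'c': [1, 2]}), base_dir,
                     format='ipc', partitioning=part)

    # Rewriting partition c=2: its directory is deleted before the new
    # files land; the untouched c=1 partition is left alone.
    ds.write_dataset(pa.table({'b': ['z'], 'c': [2]}), base_dir,
                     format='ipc', partitioning=part,
                     existing_data_behavior='delete_matching')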