You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/06/09 08:57:50 UTC

[arrow] branch master updated: ARROW-16761: [C++][Python] Track bytes written in dataset (#13338)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b8067151db ARROW-16761: [C++][Python] Track bytes written in dataset (#13338)
b8067151db is described below

commit b8067151db9bfc04860285fdd8b5e73703346037
Author: Will Jones <wi...@gmail.com>
AuthorDate: Thu Jun 9 01:57:34 2022 -0700

    ARROW-16761: [C++][Python] Track bytes written in dataset (#13338)
    
    Adds a `size` field to `WrittenFile`, so that Python users can know how many bytes were written. As part of this, was exposed in C++ as `FileWriter::GetBytesWritten()`.
    
    The reason `GetBytesWriten()` is filled in during `Finish()` is that calling `Tell()` on the output stream afterwards throws an error.
    
    Authored-by: Will Jones <wi...@gmail.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/dataset/file_base.cc           | 13 ++++++++++++-
 cpp/src/arrow/dataset/file_base.h            |  4 ++++
 docs/source/python/api/dataset.rst           |  1 +
 docs/source/python/dataset.rst               |  3 ++-
 python/pyarrow/_dataset.pxd                  |  2 ++
 python/pyarrow/_dataset.pyx                  | 15 +++++++++++++--
 python/pyarrow/_dataset_parquet.pyx          |  4 +++-
 python/pyarrow/dataset.py                    |  1 +
 python/pyarrow/includes/libarrow_dataset.pxd |  1 +
 python/pyarrow/tests/test_dataset.py         |  4 ++++
 10 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 2e05706bbb..568ab41451 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -265,7 +265,18 @@ Status FileWriter::Write(RecordBatchReader* batches) {
 }
 
 Future<> FileWriter::Finish() {
-  return FinishInternal().Then([this]() { return destination_->CloseAsync(); });
+  return FinishInternal().Then([this]() -> Future<> {
+    ARROW_ASSIGN_OR_RAISE(bytes_written_, destination_->Tell());
+    return destination_->CloseAsync();
+  });
+}
+
+Result<int64_t> FileWriter::GetBytesWritten() const {
+  if (bytes_written_.has_value()) {
+    return bytes_written_.value();
+  } else {
+    return Status::Invalid("Cannot retrieve bytes written before calling Finish()");
+  }
 }
 
 namespace {
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index debb26fd4d..9eb3182c2f 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -320,6 +320,9 @@ class ARROW_DS_EXPORT FileWriter {
   const std::shared_ptr<FileWriteOptions>& options() const { return options_; }
   const fs::FileLocator& destination() const { return destination_locator_; }
 
+  /// \brief After Finish() is called, provides number of bytes written to file.
+  Result<int64_t> GetBytesWritten() const;
+
  protected:
   FileWriter(std::shared_ptr<Schema> schema, std::shared_ptr<FileWriteOptions> options,
              std::shared_ptr<io::OutputStream> destination,
@@ -335,6 +338,7 @@ class ARROW_DS_EXPORT FileWriter {
   std::shared_ptr<FileWriteOptions> options_;
   std::shared_ptr<io::OutputStream> destination_;
   fs::FileLocator destination_locator_;
+  util::optional<int64_t> bytes_written_;
 };
 
 /// \brief Options for writing a dataset.
diff --git a/docs/source/python/api/dataset.rst b/docs/source/python/api/dataset.rst
index fdf41b15f9..866c67440c 100644
--- a/docs/source/python/api/dataset.rst
+++ b/docs/source/python/api/dataset.rst
@@ -70,3 +70,4 @@ Classes
    Scanner
    Expression
    InMemoryDataset
+   WrittenFile
diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst
index 23491629ce..4b66809bb4 100644
--- a/docs/source/python/dataset.rst
+++ b/docs/source/python/dataset.rst
@@ -733,6 +733,7 @@ to supply a visitor that will be called as each file is created:
 
     def file_visitor(written_file):
         print(f"path={written_file.path}")
+        print(f"size={written_file.size} bytes")
         print(f"metadata={written_file.metadata}")
 
 .. ipython:: python
@@ -743,7 +744,7 @@ to supply a visitor that will be called as each file is created:
 This will allow you to collect the filenames that belong to the dataset and store them elsewhere
 which can be useful when you want to avoid scanning directories the next time you need to read
 the data.  It can also be used to generate the _metadata index file used by other tools such as
-dask or spark to create an index of the dataset.
+Dask or Spark to create an index of the dataset.
 
 Configuring format-specific parameters during a write
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd
index 8e1da07d62..8e5501fa16 100644
--- a/python/pyarrow/_dataset.pxd
+++ b/python/pyarrow/_dataset.pxd
@@ -160,3 +160,5 @@ cdef class WrittenFile(_Weakrefable):
     # This metadata will have the file path attribute set to the path of
     # the written file.
     cdef public object metadata
+    # The size of the file in bytes
+    cdef public int size
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 9ac2376aa7..68833a5350 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -766,7 +766,8 @@ cdef class FileFormat(_Weakrefable):
     cdef WrittenFile _finish_write(self, path, base_dir,
                                    CFileWriter* file_writer):
         parquet_metadata = None
-        return WrittenFile(path, parquet_metadata)
+        size = GetResultValue(file_writer.GetBytesWritten())
+        return WrittenFile(path, parquet_metadata, size)
 
     cdef inline shared_ptr[CFileFormat] unwrap(self):
         return self.wrapped
@@ -2677,11 +2678,21 @@ cdef class WrittenFile(_Weakrefable):
     """
     Metadata information about files written as
     part of a dataset write operation
+
+    Parameters
+    ----------
+    path : str
+        Path to the file.
+    metadata : pyarrow.parquet.FileMetaData, optional
+        For Parquet files, the Parquet file metadata.
+    size : int
+        The size of the file in bytes.
     """
 
-    def __init__(self, path, metadata):
+    def __init__(self, path, metadata, size):
         self.path = path
         self.metadata = metadata
+        self.size = size
 
 
 cdef void _filesystemdataset_write_visitor(
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 684253fff3..744bfac6bf 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -160,7 +160,9 @@ cdef class ParquetFileFormat(FileFormat):
             parquet_metadata.init(metadata)
             parquet_metadata.set_file_path(os.path.relpath(path, base_dir))
 
-        return WrittenFile(path, parquet_metadata)
+        size = GetResultValue(file_writer.GetBytesWritten())
+
+        return WrittenFile(path, parquet_metadata, size)
 
     @property
     def read_options(self):
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 8ef3e2f7aa..2518e37ec6 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -45,6 +45,7 @@ from pyarrow._dataset import (  # noqa
     TaggedRecordBatch,
     UnionDataset,
     UnionDatasetFactory,
+    WrittenFile,
     _get_partition_keys,
     _filesystemdataset_write,
 )
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 64e59e0561..bd8fbd1b56 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -197,6 +197,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         const shared_ptr[CSchema]& schema() const
         const shared_ptr[CFileWriteOptions]& options() const
         const CFileLocator& destination() const
+        CResult[int64_t] GetBytesWritten()
 
     cdef cppclass CFileFormat "arrow::dataset::FileFormat":
         shared_ptr[CFragmentScanOptions] default_fragment_scan_options
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 81fa7d1245..ad98c83e66 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -4158,9 +4158,11 @@ def test_write_table(tempdir):
     ]
 
     visited_paths = []
+    visited_sizes = []
 
     def file_visitor(written_file):
         visited_paths.append(written_file.path)
+        visited_sizes.append(written_file.size)
 
     partitioning = ds.partitioning(
         pa.schema([("part", pa.string())]), flavor="hive")
@@ -4169,6 +4171,8 @@ def test_write_table(tempdir):
                      partitioning=partitioning, file_visitor=file_visitor)
     file_paths = list(base_dir.rglob("*"))
     assert set(file_paths) == set(expected_paths)
+    actual_sizes = [os.path.getsize(path) for path in visited_paths]
+    assert visited_sizes == actual_sizes
     result = ds.dataset(base_dir, format="ipc", partitioning=partitioning)
     assert result.to_table().equals(table)
     assert len(visited_paths) == 2