You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/05/06 15:12:26 UTC

[arrow] branch master updated: ARROW-5258: [C++/Python] Collect file metadata of dataset pieces

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 790c142  ARROW-5258: [C++/Python] Collect file metadata of dataset pieces
790c142 is described below

commit 790c142f360e40d405fe71056ecdccfa707aef27
Author: Pearu Peterson <pe...@gmail.com>
AuthorDate: Mon May 6 10:10:17 2019 -0500

    ARROW-5258: [C++/Python] Collect file metadata of dataset pieces
    
    This PR supersedes PR #4166 and provides:
    - [x] file metadata API (C++)
    - [x] ParquetWriter `metadata` property, available only after calling the `close` method (Python)
    - [x] FileMetaData `to_dict` method for collecting file metadata information into a dictionary (Python)
    - [x] support for the `metadata_collector` keyword argument to `ParquetWriter` and `write_to_dataset`, used to collect the file metadata instances of dataset pieces
    - [x] unit test
    
    Author: Pearu Peterson <pe...@gmail.com>
    
    Closes #4236 from pearu/pearu/arrow-1983-2 and squashes the following commits:
    
    11419a8de <Pearu Peterson> Remove unnecessary file_metadata_.reset calls.
    d6c9f600b <Pearu Peterson> Fix lint error
    6375d9cbe <Pearu Peterson> Expose file metadata to Python. Introduce metadata_collector kw argument to ParquetWriter, used to collect metadata instances of dataset pieces.
    940b93505 <Pearu Peterson> Introduce ParquetFileWriter::metadata.
---
 cpp/src/parquet/arrow/writer.cc      |  6 +++
 cpp/src/parquet/arrow/writer.h       |  2 +
 cpp/src/parquet/file_writer.cc       |  9 ++++-
 cpp/src/parquet/file_writer.h        |  7 ++++
 python/pyarrow/_parquet.pxd          |  2 +
 python/pyarrow/_parquet.pyx          | 72 ++++++++++++++++++++++++++++++++++++
 python/pyarrow/parquet.py            | 18 +++++++--
 python/pyarrow/tests/test_parquet.py | 31 ++++++++++++++++
 8 files changed, 141 insertions(+), 6 deletions(-)

diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index f9092a3..67a71fb 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -1058,6 +1058,8 @@ class FileWriter::Impl {
 
   virtual ~Impl() {}
 
+  const std::shared_ptr<FileMetaData> metadata() const { return writer_->metadata(); }
+
  private:
   friend class FileWriter;
 
@@ -1089,6 +1091,10 @@ Status FileWriter::Close() { return impl_->Close(); }
 
 MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); }
 
+const std::shared_ptr<FileMetaData> FileWriter::metadata() const {
+  return impl_->metadata();
+}
+
 FileWriter::~FileWriter() {}
 
 FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 56cd895..20e38ac 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -181,6 +181,8 @@ class PARQUET_EXPORT FileWriter {
 
   ::arrow::MemoryPool* memory_pool() const;
 
+  const std::shared_ptr<FileMetaData> metadata() const;
+
  private:
   class PARQUET_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index 51f0cb4..6d50e8b 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -258,8 +258,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
       row_group_writer_.reset();
 
       // Write magic bytes and metadata
-      auto metadata = metadata_->Finish();
-      WriteFileMetaData(*metadata, sink_.get());
+      file_metadata_ = metadata_->Finish();
+      WriteFileMetaData(*file_metadata_, sink_.get());
 
       sink_->Close();
     }
@@ -389,6 +389,10 @@ const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_meta
   return contents_->key_value_metadata();
 }
 
+const std::shared_ptr<FileMetaData> ParquetFileWriter::metadata() const {
+  return file_metadata_;
+}
+
 void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
   contents_ = std::move(contents);
 }
@@ -396,6 +400,7 @@ void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> conten
 void ParquetFileWriter::Close() {
   if (contents_) {
     contents_->Close();
+    file_metadata_ = contents_->metadata();
     contents_.reset();
   }
 }
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index 860500f..a24bdc5 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -148,6 +148,9 @@ class PARQUET_EXPORT ParquetFileWriter {
 
     /// This should be the only place this is stored. Everything else is a const reference
     std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
+
+    const std::shared_ptr<FileMetaData> metadata() const { return file_metadata_; }
+    std::shared_ptr<FileMetaData> file_metadata_;
   };
 
   ParquetFileWriter();
@@ -216,9 +219,13 @@ class PARQUET_EXPORT ParquetFileWriter {
   /// Returns the file custom metadata
   const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;
 
+  /// Returns the file metadata, only available after calling Close().
+  const std::shared_ptr<FileMetaData> metadata() const;
+
  private:
   // Holds a pointer to an instance of Contents implementation
   std::unique_ptr<Contents> contents_;
+  std::shared_ptr<FileMetaData> file_metadata_;
 };
 
 }  // namespace parquet
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index eedddff..f1b44b0 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -323,6 +323,8 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
         CStatus NewRowGroup(int64_t chunk_size)
         CStatus Close()
 
+        const shared_ptr[CFileMetaData] metadata() const
+
     cdef cppclass ArrowWriterProperties:
         cppclass Builder:
             Builder()
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 3ee174e..f074852 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -68,6 +68,18 @@ cdef class RowGroupStatistics:
                                self.num_values,
                                self.physical_type)
 
+    def to_dict(self):
+        d = dict(
+            has_min_max=self.has_min_max,
+            min=self.min,
+            max=self.max,
+            null_count=self.null_count,
+            distinct_count=self.distinct_count,
+            num_values=self.num_values,
+            physical_type=self.physical_type
+        )
+        return d
+
     def __eq__(self, other):
         try:
             return self.equals(other)
@@ -192,6 +204,25 @@ cdef class ColumnChunkMetaData:
                                           self.total_compressed_size,
                                           self.total_uncompressed_size)
 
+    def to_dict(self):
+        d = dict(
+            file_offset=self.file_offset,
+            file_path=self.file_path,
+            physical_type=self.physical_type,
+            num_values=self.num_values,
+            path_in_schema=self.path_in_schema,
+            is_stats_set=self.is_stats_set,
+            statistics=self.statistics.to_dict(),
+            compression=self.compression,
+            encodings=self.encodings,
+            has_dictionary_page=self.has_dictionary_page,
+            dictionary_page_offset=self.dictionary_page_offset,
+            data_page_offset=self.data_page_offset,
+            total_compressed_size=self.total_compressed_size,
+            total_uncompressed_size=self.total_uncompressed_size
+        )
+        return d
+
     def __eq__(self, other):
         try:
             return self.equals(other)
@@ -338,6 +369,18 @@ cdef class RowGroupMetaData:
                                  self.num_rows,
                                  self.total_byte_size)
 
+    def to_dict(self):
+        columns = []
+        d = dict(
+            num_columns=self.num_columns,
+            num_rows=self.num_rows,
+            total_byte_size=self.total_byte_size,
+            columns=columns,
+        )
+        for i in range(self.num_columns):
+            columns.append(self.column(i).to_dict())
+        return d
+
     @property
     def num_columns(self):
         return self.metadata.num_columns()
@@ -396,6 +439,21 @@ cdef class FileMetaData:
                                  self.format_version,
                                  self.serialized_size)
 
+    def to_dict(self):
+        row_groups = []
+        d = dict(
+            created_by=self.created_by,
+            num_columns=self.num_columns,
+            num_rows=self.num_rows,
+            num_row_groups=self.num_row_groups,
+            row_groups=row_groups,
+            format_version=self.format_version,
+            serialized_size=self.serialized_size
+        )
+        for i in range(self.num_row_groups):
+            row_groups.append(self.row_group(i).to_dict())
+        return d
+
     def __eq__(self, other):
         try:
             return self.equals(other)
@@ -1029,3 +1087,17 @@ cdef class ParquetWriter:
         with nogil:
             check_status(self.writer.get()
                          .WriteTable(deref(ctable), c_row_group_size))
+
+    @property
+    def metadata(self):
+        cdef:
+            shared_ptr[CFileMetaData] metadata
+            FileMetaData result
+        with nogil:
+            metadata = self.writer.get().metadata()
+        if metadata:
+            result = FileMetaData()
+            result.init(metadata)
+            return result
+        raise RuntimeError(
+            'file metadata is only available after writer close')
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 69187bc..78f7c0f 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -340,6 +340,11 @@ Parameters
 where : path or file-like object
 schema : arrow Schema
 {0}
+**options : dict
+    If options contains a key `metadata_collector` then the
+    corresponding value is assumed to be a list (or any object with
+    `.append` method) that will be filled with file metadata instances
+    of dataset pieces.
 """.format(_parquet_writer_arg_docs)
 
     def __init__(self, where, schema, filesystem=None,
@@ -373,7 +378,7 @@ schema : arrow Schema
             sink = self.file_handle = filesystem.open(path, 'wb')
         else:
             sink = where
-
+        self._metadata_collector = options.pop('metadata_collector', None)
         self.writer = _parquet.ParquetWriter(
             sink, schema,
             version=version,
@@ -412,6 +417,8 @@ schema : arrow Schema
         if self.is_open:
             self.writer.close()
             self.is_open = False
+            if self._metadata_collector is not None:
+                self._metadata_collector.append(self.writer.metadata)
         if self.file_handle is not None:
             self.file_handle.close()
 
@@ -1270,8 +1277,7 @@ def _mkdir_if_not_exists(fs, path):
 
 def write_to_dataset(table, root_path, partition_cols=None, filesystem=None,
                      preserve_index=None, **kwargs):
-    """
-    Wrapper around parquet.write_table for writing a Table to
+    """Wrapper around parquet.write_table for writing a Table to
     Parquet format by partitions.
     For each combination of partition columns and values,
     a subdirectories are created in the following
@@ -1300,7 +1306,11 @@ def write_to_dataset(table, root_path, partition_cols=None, filesystem=None,
     partition_cols : list,
         Column names by which to partition the dataset
         Columns are partitioned in the order they are given
-    **kwargs : dict, kwargs for write_table function.
+    **kwargs : dict,
+        kwargs for write_table function. Using `metadata_collector` in
+        kwargs allows one to collect the file metadata instances of
+        dataset pieces. See `ParquetWriter.__doc__` for more
+        information.
     """
     if preserve_index is not None:
         warnings.warn('preserve_index argument is deprecated as of 0.13.0 and '
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index f480215..8e31a1a 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -2620,3 +2620,34 @@ def test_read_column_invalid_index():
     for index in (-1, 2):
         with pytest.raises((ValueError, IndexError)):
             f.reader.read_column(index)
+
+
+def test_dataset_metadata(tempdir):
+    path = tempdir / "ARROW-1983-dataset"
+
+    # create and write a test dataset
+    df = pd.DataFrame({
+        'one': [1, 2, 3],
+        'two': [-1, -2, -3],
+        'three': [[1, 2], [2, 3], [3, 4]],
+        })
+    table = pa.Table.from_pandas(df)
+
+    metadata_list = []
+    pq.write_to_dataset(table, root_path=str(path),
+                        partition_cols=['one', 'two'],
+                        metadata_collector=metadata_list)
+
+    # open the dataset and collect metadata from pieces:
+    dataset = pq.ParquetDataset(path)
+    metadata_list2 = [p.get_metadata() for p in dataset.pieces]
+
+    # compare metadata list content:
+    assert len(metadata_list) == len(metadata_list2)
+    for md, md2 in zip(metadata_list, metadata_list2):
+        d = md.to_dict()
+        d2 = md2.to_dict()
+        # serialized_size is initialized in the reader:
+        assert d.pop('serialized_size') == 0
+        assert d2.pop('serialized_size') > 0
+        assert d == d2