Posted to commits@arrow.apache.org by we...@apache.org on 2019/05/06 15:12:26 UTC
[arrow] branch master updated: ARROW-5258: [C++/Python] Collect file metadata of dataset pieces
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 790c142 ARROW-5258: [C++/Python] Collect file metadata of dataset pieces
790c142 is described below
commit 790c142f360e40d405fe71056ecdccfa707aef27
Author: Pearu Peterson <pe...@gmail.com>
AuthorDate: Mon May 6 10:10:17 2019 -0500
ARROW-5258: [C++/Python] Collect file metadata of dataset pieces
This PR supersedes PR #4166 and provides:
- [x] a file metadata API (C++)
- [x] a ParquetWriter `metadata` property, available only after calling the `close` method (Python)
- [x] a FileMetaData `to_dict` method for collecting file metadata information into a dictionary (Python)
- [x] support for a `metadata_collector` keyword argument to `ParquetWriter` and `write_to_dataset` for collecting the file metadata instances of dataset pieces (see the usage sketch after this list)
- [x] a unit test
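For reference, the collector pattern on a single file can be sketched as
follows (a minimal sketch against a pyarrow build that includes this
patch; the output path and data are illustrative, not taken from the PR):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.Table.from_arrays([pa.array([1, 2, 3])], names=['x'])

    collected = []  # any object with an .append method works
    writer = pq.ParquetWriter('/tmp/example.parquet', table.schema,
                              metadata_collector=collected)
    writer.write_table(table)
    writer.close()  # the FileMetaData instance is appended here

    assert len(collected) == 1
    print(collected[0].num_rows)  # 3

Note that the metadata is captured during close(): reading the
underlying writer's `metadata` property before closing raises a
RuntimeError, as implemented in the _parquet.pyx hunk below.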
Author: Pearu Peterson <pe...@gmail.com>
Closes #4236 from pearu/pearu/arrow-1983-2 and squashes the following commits:
11419a8de <Pearu Peterson> Remove unnecessary file_metadata_.reset calls.
d6c9f600b <Pearu Peterson> Fix lint error
6375d9cbe <Pearu Peterson> Expose file metadata to Python. Introduce metadata_collector kw argument to ParquetWriter, used to collect metadata instances of dataset pieces.
940b93505 <Pearu Peterson> Introduce ParquetFileWriter::metadata.
---
cpp/src/parquet/arrow/writer.cc | 6 +++
cpp/src/parquet/arrow/writer.h | 2 +
cpp/src/parquet/file_writer.cc | 9 ++++-
cpp/src/parquet/file_writer.h | 7 ++++
python/pyarrow/_parquet.pxd | 2 +
python/pyarrow/_parquet.pyx | 72 ++++++++++++++++++++++++++++++++++++
python/pyarrow/parquet.py | 18 +++++++--
python/pyarrow/tests/test_parquet.py | 31 ++++++++++++++++
8 files changed, 141 insertions(+), 6 deletions(-)
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index f9092a3..67a71fb 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -1058,6 +1058,8 @@ class FileWriter::Impl {
virtual ~Impl() {}
+ const std::shared_ptr<FileMetaData> metadata() const { return writer_->metadata(); }
+
private:
friend class FileWriter;
@@ -1089,6 +1091,10 @@ Status FileWriter::Close() { return impl_->Close(); }
MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); }
+const std::shared_ptr<FileMetaData> FileWriter::metadata() const {
+ return impl_->metadata();
+}
+
FileWriter::~FileWriter() {}
FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 56cd895..20e38ac 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -181,6 +181,8 @@ class PARQUET_EXPORT FileWriter {
::arrow::MemoryPool* memory_pool() const;
+ const std::shared_ptr<FileMetaData> metadata() const;
+
private:
class PARQUET_NO_EXPORT Impl;
std::unique_ptr<Impl> impl_;
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index 51f0cb4..6d50e8b 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -258,8 +258,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
row_group_writer_.reset();
// Write magic bytes and metadata
- auto metadata = metadata_->Finish();
- WriteFileMetaData(*metadata, sink_.get());
+ file_metadata_ = metadata_->Finish();
+ WriteFileMetaData(*file_metadata_, sink_.get());
sink_->Close();
}
@@ -389,6 +389,10 @@ const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_meta
return contents_->key_value_metadata();
}
+const std::shared_ptr<FileMetaData> ParquetFileWriter::metadata() const {
+ return file_metadata_;
+}
+
void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
contents_ = std::move(contents);
}
@@ -396,6 +400,7 @@ void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> conten
void ParquetFileWriter::Close() {
if (contents_) {
contents_->Close();
+ file_metadata_ = contents_->metadata();
contents_.reset();
}
}
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index 860500f..a24bdc5 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -148,6 +148,9 @@ class PARQUET_EXPORT ParquetFileWriter {
/// This should be the only place this is stored. Everything else is a const reference
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
+
+ const std::shared_ptr<FileMetaData> metadata() const { return file_metadata_; }
+ std::shared_ptr<FileMetaData> file_metadata_;
};
ParquetFileWriter();
@@ -216,9 +219,13 @@ class PARQUET_EXPORT ParquetFileWriter {
/// Returns the file custom metadata
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;
+ /// Returns the file metadata, only available after calling Close().
+ const std::shared_ptr<FileMetaData> metadata() const;
+
private:
// Holds a pointer to an instance of Contents implementation
std::unique_ptr<Contents> contents_;
+ std::shared_ptr<FileMetaData> file_metadata_;
};
} // namespace parquet
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index eedddff..f1b44b0 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -323,6 +323,8 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
CStatus NewRowGroup(int64_t chunk_size)
CStatus Close()
+ const shared_ptr[CFileMetaData] metadata() const
+
cdef cppclass ArrowWriterProperties:
cppclass Builder:
Builder()
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 3ee174e..f074852 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -68,6 +68,18 @@ cdef class RowGroupStatistics:
self.num_values,
self.physical_type)
+ def to_dict(self):
+ d = dict(
+ has_min_max=self.has_min_max,
+ min=self.min,
+ max=self.max,
+ null_count=self.null_count,
+ distinct_count=self.distinct_count,
+ num_values=self.num_values,
+ physical_type=self.physical_type
+ )
+ return d
+
def __eq__(self, other):
try:
return self.equals(other)
@@ -192,6 +204,25 @@ cdef class ColumnChunkMetaData:
self.total_compressed_size,
self.total_uncompressed_size)
+ def to_dict(self):
+ d = dict(
+ file_offset=self.file_offset,
+ file_path=self.file_path,
+ physical_type=self.physical_type,
+ num_values=self.num_values,
+ path_in_schema=self.path_in_schema,
+ is_stats_set=self.is_stats_set,
+ statistics=self.statistics.to_dict(),
+ compression=self.compression,
+ encodings=self.encodings,
+ has_dictionary_page=self.has_dictionary_page,
+ dictionary_page_offset=self.dictionary_page_offset,
+ data_page_offset=self.data_page_offset,
+ total_compressed_size=self.total_compressed_size,
+ total_uncompressed_size=self.total_uncompressed_size
+ )
+ return d
+
def __eq__(self, other):
try:
return self.equals(other)
@@ -338,6 +369,18 @@ cdef class RowGroupMetaData:
self.num_rows,
self.total_byte_size)
+ def to_dict(self):
+ columns = []
+ d = dict(
+ num_columns=self.num_columns,
+ num_rows=self.num_rows,
+ total_byte_size=self.total_byte_size,
+ columns=columns,
+ )
+ for i in range(self.num_columns):
+ columns.append(self.column(i).to_dict())
+ return d
+
@property
def num_columns(self):
return self.metadata.num_columns()
@@ -396,6 +439,21 @@ cdef class FileMetaData:
self.format_version,
self.serialized_size)
+ def to_dict(self):
+ row_groups = []
+ d = dict(
+ created_by=self.created_by,
+ num_columns=self.num_columns,
+ num_rows=self.num_rows,
+ num_row_groups=self.num_row_groups,
+ row_groups=row_groups,
+ format_version=self.format_version,
+ serialized_size=self.serialized_size
+ )
+ for i in range(self.num_row_groups):
+ row_groups.append(self.row_group(i).to_dict())
+ return d
+
def __eq__(self, other):
try:
return self.equals(other)
@@ -1029,3 +1087,17 @@ cdef class ParquetWriter:
with nogil:
check_status(self.writer.get()
.WriteTable(deref(ctable), c_row_group_size))
+
+ @property
+ def metadata(self):
+ cdef:
+ shared_ptr[CFileMetaData] metadata
+ FileMetaData result
+ with nogil:
+ metadata = self.writer.get().metadata()
+ if metadata:
+ result = FileMetaData()
+ result.init(metadata)
+ return result
+ raise RuntimeError(
+ 'file metadata is only available after writer close')
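The four `to_dict` methods added above nest: `FileMetaData.to_dict`
embeds a `row_groups` list, each row group embeds a `columns` list, and
each column embeds its `statistics` dict. A sketch of walking that
structure (the file path is illustrative; `pq.read_metadata` is
pre-existing API that also returns a FileMetaData):

    import pyarrow.parquet as pq

    md = pq.read_metadata('/tmp/example.parquet')
    d = md.to_dict()
    print(d['num_row_groups'], d['format_version'])
    for rg in d['row_groups']:
        for col in rg['columns']:
            print(col['path_in_schema'],
                  col['statistics']['null_count'])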
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 69187bc..78f7c0f 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -340,6 +340,11 @@ Parameters
where : path or file-like object
schema : arrow Schema
{0}
+**options : dict
+ If options contains the key `metadata_collector`, the
+ corresponding value is assumed to be a list (or any object with
+ an `.append` method) that will be filled with the file metadata
+ instances of dataset pieces.
""".format(_parquet_writer_arg_docs)
def __init__(self, where, schema, filesystem=None,
@@ -373,7 +378,7 @@ schema : arrow Schema
sink = self.file_handle = filesystem.open(path, 'wb')
else:
sink = where
-
+ self._metadata_collector = options.pop('metadata_collector', None)
self.writer = _parquet.ParquetWriter(
sink, schema,
version=version,
@@ -412,6 +417,8 @@ schema : arrow Schema
if self.is_open:
self.writer.close()
self.is_open = False
+ if self._metadata_collector is not None:
+ self._metadata_collector.append(self.writer.metadata)
if self.file_handle is not None:
self.file_handle.close()
@@ -1270,8 +1277,7 @@ def _mkdir_if_not_exists(fs, path):
def write_to_dataset(table, root_path, partition_cols=None, filesystem=None,
preserve_index=None, **kwargs):
- """
- Wrapper around parquet.write_table for writing a Table to
+ """Wrapper around parquet.write_table for writing a Table to
Parquet format by partitions.
For each combination of partition columns and values,
subdirectories are created in the following
@@ -1300,7 +1306,11 @@ def write_to_dataset(table, root_path, partition_cols=None, filesystem=None,
partition_cols : list,
Column names by which to partition the dataset
Columns are partitioned in the order they are given
- **kwargs : dict, kwargs for write_table function.
+ **kwargs : dict,
+ kwargs for write_table function. Using `metadata_collector` in
+ kwargs allows one to collect the file metadata instances of
+ dataset pieces. See `ParquetWriter.__doc__` for more
+ information.
"""
if preserve_index is not None:
warnings.warn('preserve_index argument is deprecated as of 0.13.0 and '
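In condensed form, this is the pattern the new unit test below
exercises: `metadata_collector` is forwarded through `write_to_dataset`
to each piece's writer, so the list ends up with one FileMetaData per
piece (a sketch; the path and column names are illustrative):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.Table.from_arrays(
        [pa.array([1, 2, 3]), pa.array(['a', 'a', 'b'])],
        names=['value', 'key'])

    collected = []
    pq.write_to_dataset(table, root_path='/tmp/partitioned',
                        partition_cols=['key'],
                        metadata_collector=collected)
    # one FileMetaData instance per dataset piece:
    total_rows = sum(md.num_rows for md in collected)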
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index f480215..8e31a1a 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -2620,3 +2620,34 @@ def test_read_column_invalid_index():
for index in (-1, 2):
with pytest.raises((ValueError, IndexError)):
f.reader.read_column(index)
+
+
+def test_dataset_metadata(tempdir):
+ path = tempdir / "ARROW-1983-dataset"
+
+ # create and write a test dataset
+ df = pd.DataFrame({
+ 'one': [1, 2, 3],
+ 'two': [-1, -2, -3],
+ 'three': [[1, 2], [2, 3], [3, 4]],
+ })
+ table = pa.Table.from_pandas(df)
+
+ metadata_list = []
+ pq.write_to_dataset(table, root_path=str(path),
+ partition_cols=['one', 'two'],
+ metadata_collector=metadata_list)
+
+ # open the dataset and collect metadata from pieces:
+ dataset = pq.ParquetDataset(path)
+ metadata_list2 = [p.get_metadata() for p in dataset.pieces]
+
+ # compare metadata list content:
+ assert len(metadata_list) == len(metadata_list2)
+ for md, md2 in zip(metadata_list, metadata_list2):
+ d = md.to_dict()
+ d2 = md2.to_dict()
+ # serialized_size is initialized in the reader:
+ assert d.pop('serialized_size') == 0
+ assert d2.pop('serialized_size') > 0
+ assert d == d2