You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/27 04:38:11 UTC
[arrow] branch master updated: ARROW-3324: [Python] Destroy temporary metadata builder classes more eagerly when building files to reduce memory usage
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5904eea ARROW-3324: [Python] Destroy temporary metadata builder classes more eagerly when building files to reduce memory usage
5904eea is described below
commit 5904eea4cc2f422c14c8ef9d1ac323718ff765ea
Author: Wes McKinney <we...@apache.org>
AuthorDate: Wed Dec 26 22:38:00 2018 -0600
ARROW-3324: [Python] Destroy temporary metadata builder classes more eagerly when building files to reduce memory usage
Destroy RowGroupMetadataBuilder after each row group is completed
Author: Wes McKinney <we...@apache.org>
Closes #3261 from tanyaschlusser/ARROW-3324 and squashes the following commits:
5f3876706 <Wes McKinney> Refine case a bit
4f2bdcdce <Wes McKinney> Destroy RowGroupMetadataBuilder object after completing a row group to reduce memory usage
---
cpp/src/parquet/metadata-test.cc | 2 +-
cpp/src/parquet/metadata.cc | 67 ++++++++++++++++------------------------
cpp/src/parquet/metadata.h | 25 ++++++++-------
python/scripts/test_leak.py | 66 +++++++++++++++++++++++++++++++--------
4 files changed, 93 insertions(+), 67 deletions(-)
diff --git a/cpp/src/parquet/metadata-test.cc b/cpp/src/parquet/metadata-test.cc
index bcf911e..826ac4d 100644
--- a/cpp/src/parquet/metadata-test.cc
+++ b/cpp/src/parquet/metadata-test.cc
@@ -59,7 +59,6 @@ TEST(Metadata, TestBuildAccess) {
auto f_builder = FileMetaDataBuilder::Make(&schema, props);
auto rg1_builder = f_builder->AppendRowGroup();
- auto rg2_builder = f_builder->AppendRowGroup();
// Write the metadata
// rowgroup1 metadata
@@ -75,6 +74,7 @@ TEST(Metadata, TestBuildAccess) {
rg1_builder->Finish(1024);
// rowgroup2 metadata
+ auto rg2_builder = f_builder->AppendRowGroup();
col1_builder = rg2_builder->NextColumnChunk();
col2_builder = rg2_builder->NextColumnChunk();
// column metadata
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 22cfbdb..6ac53c5 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -115,7 +115,6 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
}
possible_stats_ = nullptr;
}
- ~ColumnChunkMetaDataImpl() {}
// column chunk
inline int64_t file_offset() const { return column_->file_offset; }
@@ -197,13 +196,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
};
std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
- const uint8_t* metadata, const ColumnDescriptor* descr,
+ const void* metadata, const ColumnDescriptor* descr,
const ApplicationVersion* writer_version) {
return std::unique_ptr<ColumnChunkMetaData>(
new ColumnChunkMetaData(metadata, descr, writer_version));
}
-ColumnChunkMetaData::ColumnChunkMetaData(const uint8_t* metadata,
+ColumnChunkMetaData::ColumnChunkMetaData(const void* metadata,
const ColumnDescriptor* descr,
const ApplicationVersion* writer_version)
: impl_{std::unique_ptr<ColumnChunkMetaDataImpl>(new ColumnChunkMetaDataImpl(
@@ -272,7 +271,6 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
const SchemaDescriptor* schema,
const ApplicationVersion* writer_version)
: row_group_(row_group), schema_(schema), writer_version_(writer_version) {}
- ~RowGroupMetaDataImpl() {}
inline int num_columns() const { return static_cast<int>(row_group_->columns.size()); }
@@ -289,9 +287,8 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
<< " columns, requested metadata for column: " << i;
throw ParquetException(ss.str());
}
- return ColumnChunkMetaData::Make(
- reinterpret_cast<const uint8_t*>(&row_group_->columns[i]), schema_->Column(i),
- writer_version_);
+ return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i),
+ writer_version_);
}
private:
@@ -301,14 +298,13 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
};
std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
- const uint8_t* metadata, const SchemaDescriptor* schema,
+ const void* metadata, const SchemaDescriptor* schema,
const ApplicationVersion* writer_version) {
return std::unique_ptr<RowGroupMetaData>(
new RowGroupMetaData(metadata, schema, writer_version));
}
-RowGroupMetaData::RowGroupMetaData(const uint8_t* metadata,
- const SchemaDescriptor* schema,
+RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema,
const ApplicationVersion* writer_version)
: impl_{std::unique_ptr<RowGroupMetaDataImpl>(new RowGroupMetaDataImpl(
reinterpret_cast<const format::RowGroup*>(metadata), schema, writer_version))} {
@@ -332,10 +328,11 @@ class FileMetaData::FileMetaDataImpl {
public:
FileMetaDataImpl() : metadata_len_(0) {}
- explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len)
+ explicit FileMetaDataImpl(const void* metadata, uint32_t* metadata_len)
: metadata_len_(0) {
metadata_.reset(new format::FileMetaData);
- DeserializeThriftMsg(metadata, metadata_len, metadata_.get());
+ DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(metadata), metadata_len,
+ metadata_.get());
metadata_len_ = *metadata_len;
if (metadata_->__isset.created_by) {
@@ -348,7 +345,6 @@ class FileMetaData::FileMetaDataImpl {
InitColumnOrders();
InitKeyValueMetadata();
}
- ~FileMetaDataImpl() {}
inline uint32_t size() const { return metadata_len_; }
inline int num_columns() const { return schema_.num_columns(); }
@@ -375,9 +371,7 @@ class FileMetaData::FileMetaDataImpl {
<< " row groups, requested metadata for row group: " << i;
throw ParquetException(ss.str());
}
- return RowGroupMetaData::Make(
- reinterpret_cast<const uint8_t*>(&metadata_->row_groups[i]), &schema_,
- &writer_version_);
+ return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, &writer_version_);
}
const SchemaDescriptor* schema() const { return &schema_; }
@@ -429,13 +423,13 @@ class FileMetaData::FileMetaDataImpl {
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};
-std::shared_ptr<FileMetaData> FileMetaData::Make(const uint8_t* metadata,
+std::shared_ptr<FileMetaData> FileMetaData::Make(const void* metadata,
uint32_t* metadata_len) {
// This FileMetaData ctor is private, not compatible with std::make_shared
return std::shared_ptr<FileMetaData>(new FileMetaData(metadata, metadata_len));
}
-FileMetaData::FileMetaData(const uint8_t* metadata, uint32_t* metadata_len)
+FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len)
: impl_{std::unique_ptr<FileMetaDataImpl>(
new FileMetaDataImpl(metadata, metadata_len))} {}
@@ -606,11 +600,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
Init(column_chunk);
}
- ~ColumnChunkMetaDataBuilderImpl() {}
-
- const uint8_t* contents() const {
- return reinterpret_cast<const uint8_t*>(column_chunk_);
- }
+ const void* contents() const { return column_chunk_; }
// column chunk
void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); }
@@ -699,7 +689,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column,
- uint8_t* contents) {
+ void* contents) {
return std::unique_ptr<ColumnChunkMetaDataBuilder>(
new ColumnChunkMetaDataBuilder(props, column, contents));
}
@@ -717,14 +707,14 @@ ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column,
- uint8_t* contents)
+ void* contents)
: impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>(
new ColumnChunkMetaDataBuilderImpl(
props, column, reinterpret_cast<format::ColumnChunk*>(contents)))} {}
ColumnChunkMetaDataBuilder::~ColumnChunkMetaDataBuilder() {}
-const uint8_t* ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); }
+const void* ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); }
void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) {
impl_->set_file_path(path);
@@ -754,12 +744,11 @@ void ColumnChunkMetaDataBuilder::SetStatistics(bool is_signed,
class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
public:
explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
- const SchemaDescriptor* schema, uint8_t* contents)
+ const SchemaDescriptor* schema, void* contents)
: properties_(props), schema_(schema), current_column_(0) {
row_group_ = reinterpret_cast<format::RowGroup*>(contents);
InitializeColumns(schema->num_columns());
}
- ~RowGroupMetaDataBuilderImpl() {}
ColumnChunkMetaDataBuilder* NextColumnChunk() {
if (!(current_column_ < num_columns())) {
@@ -770,8 +759,7 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
}
auto column = schema_->Column(current_column_);
auto column_builder = ColumnChunkMetaDataBuilder::Make(
- properties_, column,
- reinterpret_cast<uint8_t*>(&row_group_->columns[current_column_++]));
+ properties_, column, &row_group_->columns[current_column_++]);
auto column_builder_ptr = column_builder.get();
column_builders_.push_back(std::move(column_builder));
return column_builder_ptr;
@@ -820,14 +808,14 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
- uint8_t* contents) {
+ void* contents) {
return std::unique_ptr<RowGroupMetaDataBuilder>(
new RowGroupMetaDataBuilder(props, schema_, contents));
}
RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(
const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
- uint8_t* contents)
+ void* contents)
: impl_{std::unique_ptr<RowGroupMetaDataBuilderImpl>(
new RowGroupMetaDataBuilderImpl(props, schema_, contents))} {}
@@ -861,16 +849,12 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
: properties_(props), schema_(schema), key_value_metadata_(key_value_metadata) {
metadata_.reset(new format::FileMetaData());
}
- ~FileMetaDataBuilderImpl() {}
RowGroupMetaDataBuilder* AppendRowGroup() {
- auto row_group = std::unique_ptr<format::RowGroup>(new format::RowGroup());
- auto row_group_builder = RowGroupMetaDataBuilder::Make(
- properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
- RowGroupMetaDataBuilder* row_group_ptr = row_group_builder.get();
- row_group_builders_.push_back(std::move(row_group_builder));
- row_groups_.push_back(std::move(row_group));
- return row_group_ptr;
+ row_groups_.emplace_back(new format::RowGroup);
+ current_row_group_builder_ =
+ RowGroupMetaDataBuilder::Make(properties_, schema_, row_groups_.back().get());
+ return current_row_group_builder_.get();
}
std::unique_ptr<FileMetaData> Finish() {
@@ -939,7 +923,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
private:
const std::shared_ptr<WriterProperties> properties_;
std::vector<std::unique_ptr<format::RowGroup>> row_groups_;
- std::vector<std::unique_ptr<RowGroupMetaDataBuilder>> row_group_builders_;
+
+ std::unique_ptr<RowGroupMetaDataBuilder> current_row_group_builder_;
const SchemaDescriptor* schema_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 25f4d4c..209c75a 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -93,7 +93,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
public:
// API convenience to get a MetaData accessor
static std::unique_ptr<ColumnChunkMetaData> Make(
- const uint8_t* metadata, const ColumnDescriptor* descr,
+ const void* metadata, const ColumnDescriptor* descr,
const ApplicationVersion* writer_version = NULLPTR);
~ColumnChunkMetaData();
@@ -119,7 +119,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
int64_t total_uncompressed_size() const;
private:
- explicit ColumnChunkMetaData(const uint8_t* metadata, const ColumnDescriptor* descr,
+ explicit ColumnChunkMetaData(const void* metadata, const ColumnDescriptor* descr,
const ApplicationVersion* writer_version = NULLPTR);
// PIMPL Idiom
class ColumnChunkMetaDataImpl;
@@ -130,7 +130,7 @@ class PARQUET_EXPORT RowGroupMetaData {
public:
// API convenience to get a MetaData accessor
static std::unique_ptr<RowGroupMetaData> Make(
- const uint8_t* metadata, const SchemaDescriptor* schema,
+ const void* metadata, const SchemaDescriptor* schema,
const ApplicationVersion* writer_version = NULLPTR);
~RowGroupMetaData();
@@ -144,7 +144,7 @@ class PARQUET_EXPORT RowGroupMetaData {
std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) const;
private:
- explicit RowGroupMetaData(const uint8_t* metadata, const SchemaDescriptor* schema,
+ explicit RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema,
const ApplicationVersion* writer_version = NULLPTR);
// PIMPL Idiom
class RowGroupMetaDataImpl;
@@ -156,7 +156,7 @@ class FileMetaDataBuilder;
class PARQUET_EXPORT FileMetaData {
public:
// API convenience to get a MetaData accessor
- static std::shared_ptr<FileMetaData> Make(const uint8_t* serialized_metadata,
+ static std::shared_ptr<FileMetaData> Make(const void* serialized_metadata,
uint32_t* metadata_len);
~FileMetaData();
@@ -182,7 +182,7 @@ class PARQUET_EXPORT FileMetaData {
private:
friend FileMetaDataBuilder;
- explicit FileMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len);
+ explicit FileMetaData(const void* serialized_metadata, uint32_t* metadata_len);
// PIMPL Idiom
FileMetaData();
@@ -199,7 +199,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
static std::unique_ptr<ColumnChunkMetaDataBuilder> Make(
const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column,
- uint8_t* contents);
+ void* contents);
~ColumnChunkMetaDataBuilder();
@@ -217,7 +217,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
bool dictionary_fallback);
// The metadata contents, suitable for passing to ColumnChunkMetaData::Make
- const uint8_t* contents() const;
+ const void* contents() const;
// For writing metadata at end of column chunk
void WriteTo(OutputStream* sink);
@@ -226,7 +226,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
const ColumnDescriptor* column);
explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
- const ColumnDescriptor* column, uint8_t* contents);
+ const ColumnDescriptor* column, void* contents);
// PIMPL Idiom
class ColumnChunkMetaDataBuilderImpl;
std::unique_ptr<ColumnChunkMetaDataBuilderImpl> impl_;
@@ -237,7 +237,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
// API convenience to get a MetaData reader
static std::unique_ptr<RowGroupMetaDataBuilder> Make(
const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
- uint8_t* contents);
+ void* contents);
~RowGroupMetaDataBuilder();
@@ -253,7 +253,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
private:
explicit RowGroupMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
- const SchemaDescriptor* schema_, uint8_t* contents);
+ const SchemaDescriptor* schema_, void* contents);
// PIMPL Idiom
class RowGroupMetaDataBuilderImpl;
std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
@@ -268,9 +268,10 @@ class PARQUET_EXPORT FileMetaDataBuilder {
~FileMetaDataBuilder();
+ // The prior RowGroupMetaDataBuilder (if any) is destroyed
RowGroupMetaDataBuilder* AppendRowGroup();
- // commit the metadata
+ // Complete the Thrift structure
std::unique_ptr<FileMetaData> Finish();
private:
diff --git a/python/scripts/test_leak.py b/python/scripts/test_leak.py
index e3de56b..d3984a8 100644
--- a/python/scripts/test_leak.py
+++ b/python/scripts/test_leak.py
@@ -19,29 +19,49 @@
import pyarrow as pa
import numpy as np
+import pandas as pd
+import pandas.util.testing as tm
import memory_profiler
import gc
import io
+MEGABYTE = 1 << 20
-def leak():
+
+def assert_does_not_leak(f, iterations=10, check_interval=1, tolerance=5):
+ gc.collect()
+ baseline = memory_profiler.memory_usage()[0]
+ for i in range(iterations):
+ f()
+ if i % check_interval == 0:
+ gc.collect()
+ usage = memory_profiler.memory_usage()[0]
+ diff = usage - baseline
+ print("{0}: {1}\r".format(i, diff), end="")
+ if diff > tolerance:
+ raise Exception("Memory increased by {0} megabytes after {1} "
+ "iterations".format(diff, i + 1))
+ gc.collect()
+ usage = memory_profiler.memory_usage()[0]
+ diff = usage - baseline
+ print("\nMemory increased by {0} megabytes after {1} "
+ "iterations".format(diff, iterations))
+
+
+def test_leak1():
data = [pa.array(np.concatenate([np.random.randn(100000)] * 1000))]
table = pa.Table.from_arrays(data, ['foo'])
- while True:
- print('calling to_pandas')
- print('memory_usage: {0}'.format(memory_profiler.memory_usage()))
- table.to_pandas()
- gc.collect()
-# leak()
+ def func():
+ table.to_pandas()
+ assert_does_not_leak(func)
-def leak2():
+def test_leak2():
data = [pa.array(np.concatenate([np.random.randn(100000)] * 10))]
table = pa.Table.from_arrays(data, ['foo'])
- while True:
- print('calling to_pandas')
- print('memory_usage: {0}'.format(memory_profiler.memory_usage()))
+
+ def func():
df = table.to_pandas()
batch = pa.RecordBatch.from_pandas(df)
@@ -55,7 +75,27 @@ def leak2():
reader = pa.open_file(buf_reader)
reader.read_all()
- gc.collect()
+ assert_does_not_leak(func, iterations=50, tolerance=50)
+
+
+def test_leak3():
+ import pyarrow.parquet as pq
+
+ df = pd.DataFrame({'a{0}'.format(i): [1, 2, 3, 4]
+ for i in range(50)})
+ table = pa.Table.from_pandas(df, preserve_index=False)
+
+ writer = pq.ParquetWriter('leak_test_' + tm.rands(5) + '.parquet',
+ table.schema)
+
+ def func():
+ writer.write_table(table, row_group_size=len(table))
+
+ # This does not "leak" per se but we do want to have this use as little
+ # memory as possible
+ assert_does_not_leak(func, iterations=500,
+ check_interval=50, tolerance=20)
-leak2()
+if __name__ == '__main__':
+ test_leak3()