You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/27 04:38:11 UTC

[arrow] branch master updated: ARROW-3324: [Python] Destroy temporary metadata builder classes more eagerly when building files to reduce memory usage

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5904eea  ARROW-3324: [Python] Destroy temporary metadata builder classes more eagerly when building files to reduce memory usage
5904eea is described below

commit 5904eea4cc2f422c14c8ef9d1ac323718ff765ea
Author: Wes McKinney <we...@apache.org>
AuthorDate: Wed Dec 26 22:38:00 2018 -0600

    ARROW-3324: [Python] Destroy temporary metadata builder classes more eagerly when building files to reduce memory usage
    
    Destroy RowGroupMetadataBuilder after each row group is completed
    
    Author: Wes McKinney <we...@apache.org>
    
    Closes #3261 from tanyaschlusser/ARROW-3324 and squashes the following commits:
    
    5f3876706 <Wes McKinney> Refine case a bit
    4f2bdcdce <Wes McKinney> Destroy RowGroupMetadataBuilder object after completing a row group to reduce memory usage
---
 cpp/src/parquet/metadata-test.cc |  2 +-
 cpp/src/parquet/metadata.cc      | 67 ++++++++++++++++------------------------
 cpp/src/parquet/metadata.h       | 25 ++++++++-------
 python/scripts/test_leak.py      | 66 +++++++++++++++++++++++++++++++--------
 4 files changed, 93 insertions(+), 67 deletions(-)

diff --git a/cpp/src/parquet/metadata-test.cc b/cpp/src/parquet/metadata-test.cc
index bcf911e..826ac4d 100644
--- a/cpp/src/parquet/metadata-test.cc
+++ b/cpp/src/parquet/metadata-test.cc
@@ -59,7 +59,6 @@ TEST(Metadata, TestBuildAccess) {
 
   auto f_builder = FileMetaDataBuilder::Make(&schema, props);
   auto rg1_builder = f_builder->AppendRowGroup();
-  auto rg2_builder = f_builder->AppendRowGroup();
 
   // Write the metadata
   // rowgroup1 metadata
@@ -75,6 +74,7 @@ TEST(Metadata, TestBuildAccess) {
   rg1_builder->Finish(1024);
 
   // rowgroup2 metadata
+  auto rg2_builder = f_builder->AppendRowGroup();
   col1_builder = rg2_builder->NextColumnChunk();
   col2_builder = rg2_builder->NextColumnChunk();
   // column metadata
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 22cfbdb..6ac53c5 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -115,7 +115,6 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
     }
     possible_stats_ = nullptr;
   }
-  ~ColumnChunkMetaDataImpl() {}
 
   // column chunk
   inline int64_t file_offset() const { return column_->file_offset; }
@@ -197,13 +196,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
 };
 
 std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
-    const uint8_t* metadata, const ColumnDescriptor* descr,
+    const void* metadata, const ColumnDescriptor* descr,
     const ApplicationVersion* writer_version) {
   return std::unique_ptr<ColumnChunkMetaData>(
       new ColumnChunkMetaData(metadata, descr, writer_version));
 }
 
-ColumnChunkMetaData::ColumnChunkMetaData(const uint8_t* metadata,
+ColumnChunkMetaData::ColumnChunkMetaData(const void* metadata,
                                          const ColumnDescriptor* descr,
                                          const ApplicationVersion* writer_version)
     : impl_{std::unique_ptr<ColumnChunkMetaDataImpl>(new ColumnChunkMetaDataImpl(
@@ -272,7 +271,6 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
                                 const SchemaDescriptor* schema,
                                 const ApplicationVersion* writer_version)
       : row_group_(row_group), schema_(schema), writer_version_(writer_version) {}
-  ~RowGroupMetaDataImpl() {}
 
   inline int num_columns() const { return static_cast<int>(row_group_->columns.size()); }
 
@@ -289,9 +287,8 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
          << " columns, requested metadata for column: " << i;
       throw ParquetException(ss.str());
     }
-    return ColumnChunkMetaData::Make(
-        reinterpret_cast<const uint8_t*>(&row_group_->columns[i]), schema_->Column(i),
-        writer_version_);
+    return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i),
+                                     writer_version_);
   }
 
  private:
@@ -301,14 +298,13 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
 };
 
 std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
-    const uint8_t* metadata, const SchemaDescriptor* schema,
+    const void* metadata, const SchemaDescriptor* schema,
     const ApplicationVersion* writer_version) {
   return std::unique_ptr<RowGroupMetaData>(
       new RowGroupMetaData(metadata, schema, writer_version));
 }
 
-RowGroupMetaData::RowGroupMetaData(const uint8_t* metadata,
-                                   const SchemaDescriptor* schema,
+RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema,
                                    const ApplicationVersion* writer_version)
     : impl_{std::unique_ptr<RowGroupMetaDataImpl>(new RowGroupMetaDataImpl(
           reinterpret_cast<const format::RowGroup*>(metadata), schema, writer_version))} {
@@ -332,10 +328,11 @@ class FileMetaData::FileMetaDataImpl {
  public:
   FileMetaDataImpl() : metadata_len_(0) {}
 
-  explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len)
+  explicit FileMetaDataImpl(const void* metadata, uint32_t* metadata_len)
       : metadata_len_(0) {
     metadata_.reset(new format::FileMetaData);
-    DeserializeThriftMsg(metadata, metadata_len, metadata_.get());
+    DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(metadata), metadata_len,
+                         metadata_.get());
     metadata_len_ = *metadata_len;
 
     if (metadata_->__isset.created_by) {
@@ -348,7 +345,6 @@ class FileMetaData::FileMetaDataImpl {
     InitColumnOrders();
     InitKeyValueMetadata();
   }
-  ~FileMetaDataImpl() {}
 
   inline uint32_t size() const { return metadata_len_; }
   inline int num_columns() const { return schema_.num_columns(); }
@@ -375,9 +371,7 @@ class FileMetaData::FileMetaDataImpl {
          << " row groups, requested metadata for row group: " << i;
       throw ParquetException(ss.str());
     }
-    return RowGroupMetaData::Make(
-        reinterpret_cast<const uint8_t*>(&metadata_->row_groups[i]), &schema_,
-        &writer_version_);
+    return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, &writer_version_);
   }
 
   const SchemaDescriptor* schema() const { return &schema_; }
@@ -429,13 +423,13 @@ class FileMetaData::FileMetaDataImpl {
   std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
 };
 
-std::shared_ptr<FileMetaData> FileMetaData::Make(const uint8_t* metadata,
+std::shared_ptr<FileMetaData> FileMetaData::Make(const void* metadata,
                                                  uint32_t* metadata_len) {
   // This FileMetaData ctor is private, not compatible with std::make_shared
   return std::shared_ptr<FileMetaData>(new FileMetaData(metadata, metadata_len));
 }
 
-FileMetaData::FileMetaData(const uint8_t* metadata, uint32_t* metadata_len)
+FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len)
     : impl_{std::unique_ptr<FileMetaDataImpl>(
           new FileMetaDataImpl(metadata, metadata_len))} {}
 
@@ -606,11 +600,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     Init(column_chunk);
   }
 
-  ~ColumnChunkMetaDataBuilderImpl() {}
-
-  const uint8_t* contents() const {
-    return reinterpret_cast<const uint8_t*>(column_chunk_);
-  }
+  const void* contents() const { return column_chunk_; }
 
   // column chunk
   void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); }
@@ -699,7 +689,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
 
 std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
     const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column,
-    uint8_t* contents) {
+    void* contents) {
   return std::unique_ptr<ColumnChunkMetaDataBuilder>(
       new ColumnChunkMetaDataBuilder(props, column, contents));
 }
@@ -717,14 +707,14 @@ ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
 
 ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
     const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column,
-    uint8_t* contents)
+    void* contents)
     : impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>(
           new ColumnChunkMetaDataBuilderImpl(
               props, column, reinterpret_cast<format::ColumnChunk*>(contents)))} {}
 
 ColumnChunkMetaDataBuilder::~ColumnChunkMetaDataBuilder() {}
 
-const uint8_t* ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); }
+const void* ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); }
 
 void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) {
   impl_->set_file_path(path);
@@ -754,12 +744,11 @@ void ColumnChunkMetaDataBuilder::SetStatistics(bool is_signed,
 class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
  public:
   explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
-                                       const SchemaDescriptor* schema, uint8_t* contents)
+                                       const SchemaDescriptor* schema, void* contents)
       : properties_(props), schema_(schema), current_column_(0) {
     row_group_ = reinterpret_cast<format::RowGroup*>(contents);
     InitializeColumns(schema->num_columns());
   }
-  ~RowGroupMetaDataBuilderImpl() {}
 
   ColumnChunkMetaDataBuilder* NextColumnChunk() {
     if (!(current_column_ < num_columns())) {
@@ -770,8 +759,7 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
     }
     auto column = schema_->Column(current_column_);
     auto column_builder = ColumnChunkMetaDataBuilder::Make(
-        properties_, column,
-        reinterpret_cast<uint8_t*>(&row_group_->columns[current_column_++]));
+        properties_, column, &row_group_->columns[current_column_++]);
     auto column_builder_ptr = column_builder.get();
     column_builders_.push_back(std::move(column_builder));
     return column_builder_ptr;
@@ -820,14 +808,14 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
 
 std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
     const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
-    uint8_t* contents) {
+    void* contents) {
   return std::unique_ptr<RowGroupMetaDataBuilder>(
       new RowGroupMetaDataBuilder(props, schema_, contents));
 }
 
 RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(
     const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
-    uint8_t* contents)
+    void* contents)
     : impl_{std::unique_ptr<RowGroupMetaDataBuilderImpl>(
           new RowGroupMetaDataBuilderImpl(props, schema_, contents))} {}
 
@@ -861,16 +849,12 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
       : properties_(props), schema_(schema), key_value_metadata_(key_value_metadata) {
     metadata_.reset(new format::FileMetaData());
   }
-  ~FileMetaDataBuilderImpl() {}
 
   RowGroupMetaDataBuilder* AppendRowGroup() {
-    auto row_group = std::unique_ptr<format::RowGroup>(new format::RowGroup());
-    auto row_group_builder = RowGroupMetaDataBuilder::Make(
-        properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
-    RowGroupMetaDataBuilder* row_group_ptr = row_group_builder.get();
-    row_group_builders_.push_back(std::move(row_group_builder));
-    row_groups_.push_back(std::move(row_group));
-    return row_group_ptr;
+    row_groups_.emplace_back(new format::RowGroup);
+    current_row_group_builder_ =
+        RowGroupMetaDataBuilder::Make(properties_, schema_, row_groups_.back().get());
+    return current_row_group_builder_.get();
   }
 
   std::unique_ptr<FileMetaData> Finish() {
@@ -939,7 +923,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
  private:
   const std::shared_ptr<WriterProperties> properties_;
   std::vector<std::unique_ptr<format::RowGroup>> row_groups_;
-  std::vector<std::unique_ptr<RowGroupMetaDataBuilder>> row_group_builders_;
+
+  std::unique_ptr<RowGroupMetaDataBuilder> current_row_group_builder_;
   const SchemaDescriptor* schema_;
   std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
 };
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 25f4d4c..209c75a 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -93,7 +93,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
  public:
   // API convenience to get a MetaData accessor
   static std::unique_ptr<ColumnChunkMetaData> Make(
-      const uint8_t* metadata, const ColumnDescriptor* descr,
+      const void* metadata, const ColumnDescriptor* descr,
       const ApplicationVersion* writer_version = NULLPTR);
 
   ~ColumnChunkMetaData();
@@ -119,7 +119,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   int64_t total_uncompressed_size() const;
 
  private:
-  explicit ColumnChunkMetaData(const uint8_t* metadata, const ColumnDescriptor* descr,
+  explicit ColumnChunkMetaData(const void* metadata, const ColumnDescriptor* descr,
                                const ApplicationVersion* writer_version = NULLPTR);
   // PIMPL Idiom
   class ColumnChunkMetaDataImpl;
@@ -130,7 +130,7 @@ class PARQUET_EXPORT RowGroupMetaData {
  public:
   // API convenience to get a MetaData accessor
   static std::unique_ptr<RowGroupMetaData> Make(
-      const uint8_t* metadata, const SchemaDescriptor* schema,
+      const void* metadata, const SchemaDescriptor* schema,
       const ApplicationVersion* writer_version = NULLPTR);
 
   ~RowGroupMetaData();
@@ -144,7 +144,7 @@ class PARQUET_EXPORT RowGroupMetaData {
   std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) const;
 
  private:
-  explicit RowGroupMetaData(const uint8_t* metadata, const SchemaDescriptor* schema,
+  explicit RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema,
                             const ApplicationVersion* writer_version = NULLPTR);
   // PIMPL Idiom
   class RowGroupMetaDataImpl;
@@ -156,7 +156,7 @@ class FileMetaDataBuilder;
 class PARQUET_EXPORT FileMetaData {
  public:
   // API convenience to get a MetaData accessor
-  static std::shared_ptr<FileMetaData> Make(const uint8_t* serialized_metadata,
+  static std::shared_ptr<FileMetaData> Make(const void* serialized_metadata,
                                             uint32_t* metadata_len);
 
   ~FileMetaData();
@@ -182,7 +182,7 @@ class PARQUET_EXPORT FileMetaData {
 
  private:
   friend FileMetaDataBuilder;
-  explicit FileMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len);
+  explicit FileMetaData(const void* serialized_metadata, uint32_t* metadata_len);
 
   // PIMPL Idiom
   FileMetaData();
@@ -199,7 +199,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
 
   static std::unique_ptr<ColumnChunkMetaDataBuilder> Make(
       const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column,
-      uint8_t* contents);
+      void* contents);
 
   ~ColumnChunkMetaDataBuilder();
 
@@ -217,7 +217,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
               bool dictionary_fallback);
 
   // The metadata contents, suitable for passing to ColumnChunkMetaData::Make
-  const uint8_t* contents() const;
+  const void* contents() const;
 
   // For writing metadata at end of column chunk
   void WriteTo(OutputStream* sink);
@@ -226,7 +226,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
   explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
                                       const ColumnDescriptor* column);
   explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
-                                      const ColumnDescriptor* column, uint8_t* contents);
+                                      const ColumnDescriptor* column, void* contents);
   // PIMPL Idiom
   class ColumnChunkMetaDataBuilderImpl;
   std::unique_ptr<ColumnChunkMetaDataBuilderImpl> impl_;
@@ -237,7 +237,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   // API convenience to get a MetaData reader
   static std::unique_ptr<RowGroupMetaDataBuilder> Make(
       const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
-      uint8_t* contents);
+      void* contents);
 
   ~RowGroupMetaDataBuilder();
 
@@ -253,7 +253,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
 
  private:
   explicit RowGroupMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
-                                   const SchemaDescriptor* schema_, uint8_t* contents);
+                                   const SchemaDescriptor* schema_, void* contents);
   // PIMPL Idiom
   class RowGroupMetaDataBuilderImpl;
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
@@ -268,9 +268,10 @@ class PARQUET_EXPORT FileMetaDataBuilder {
 
   ~FileMetaDataBuilder();
 
+  // Destroys the prior RowGroupMetaDataBuilder (if any), invalidating pointers to it
   RowGroupMetaDataBuilder* AppendRowGroup();
 
-  // commit the metadata
+  // Complete the Thrift structure
   std::unique_ptr<FileMetaData> Finish();
 
  private:
diff --git a/python/scripts/test_leak.py b/python/scripts/test_leak.py
index e3de56b..d3984a8 100644
--- a/python/scripts/test_leak.py
+++ b/python/scripts/test_leak.py
@@ -19,29 +19,49 @@
 
 import pyarrow as pa
 import numpy as np
+import pandas as pd
+import pandas.util.testing as tm
 import memory_profiler
 import gc
 import io
 
+MEGABYTE = 1 << 20
 
-def leak():
+
+def assert_does_not_leak(f, iterations=10, check_interval=1, tolerance=5):
+    gc.collect()
+    baseline = memory_profiler.memory_usage()[0]
+    for i in range(iterations):
+        f()
+        if i % check_interval == 0:
+            gc.collect()
+            usage = memory_profiler.memory_usage()[0]
+            diff = usage - baseline
+            print("{0}: {1}\r".format(i, diff), end="")
+            if diff > tolerance:
+                raise Exception("Memory increased by {0} megabytes after {1} "
+                                "iterations".format(diff, i + 1))
+    gc.collect()
+    usage = memory_profiler.memory_usage()[0]
+    diff = usage - baseline
+    print("\nMemory increased by {0} megabytes after {1} "
+          "iterations".format(diff, iterations))
+
+
+def test_leak1():
     data = [pa.array(np.concatenate([np.random.randn(100000)] * 1000))]
     table = pa.Table.from_arrays(data, ['foo'])
-    while True:
-        print('calling to_pandas')
-        print('memory_usage: {0}'.format(memory_profiler.memory_usage()))
-        table.to_pandas()
-        gc.collect()
 
-# leak()
+    def func():
+        table.to_pandas()
+    assert_does_not_leak(func)
 
 
-def leak2():
+def test_leak2():
     data = [pa.array(np.concatenate([np.random.randn(100000)] * 10))]
     table = pa.Table.from_arrays(data, ['foo'])
-    while True:
-        print('calling to_pandas')
-        print('memory_usage: {0}'.format(memory_profiler.memory_usage()))
+
+    def func():
         df = table.to_pandas()
 
         batch = pa.RecordBatch.from_pandas(df)
@@ -55,7 +75,27 @@ def leak2():
         reader = pa.open_file(buf_reader)
         reader.read_all()
 
-        gc.collect()
+    assert_does_not_leak(func, iterations=50, tolerance=50)
+
+
+def test_leak3():
+    import pyarrow.parquet as pq
+
+    df = pd.DataFrame({'a{0}'.format(i): [1, 2, 3, 4]
+                       for i in range(50)})
+    table = pa.Table.from_pandas(df, preserve_index=False)
+
+    writer = pq.ParquetWriter('leak_test_' + tm.rands(5) + '.parquet',
+                              table.schema)
+
+    def func():
+        writer.write_table(table, row_group_size=len(table))
+
+    # This does not "leak" per se, but we do want writing row groups to use as
+    # little memory as possible
+    assert_does_not_leak(func, iterations=500,
+                         check_interval=50, tolerance=20)
 
 
-leak2()
+if __name__ == '__main__':
+    test_leak3()