You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/27 13:51:24 UTC

[arrow] branch master updated: ARROW-3762: [C++] manage ChunkedArrayBuilder capacity explicitly

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a634f92  ARROW-3762: [C++] manage ChunkedArrayBuilder capacity explicitly
a634f92 is described below

commit a634f925d06e58141261e60dec322186e22b750c
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Thu Jun 27 08:51:09 2019 -0500

    ARROW-3762: [C++] manage ChunkedArrayBuilder capacity explicitly
    
    ChunkedArrayBuilder should never request that BinaryBuilder reserve space for more strings than it can hold (`kListMaximumElements`); with this change it will instead begin a new chunk.
    
    Author: Benjamin Kietzman <be...@gmail.com>
    
    Closes #4695 from bkietz/3762-Parquet-arrowTable-reads-error-when-over and squashes the following commits:
    
    95a4e30ff <Benjamin Kietzman> add configurable maximum element count to ChunkedBinaryBuilder
    5331a2cb7 <Benjamin Kietzman> add test for ChunkedBinaryBuilder's new reserve behavior
    3eb57198b <Benjamin Kietzman> manage ChunkedBinaryBuilder's capacity
    d626c073c <Benjamin Kietzman> attempted fix, killed: OOM
    cb000ab2e <Benjamin Kietzman> add (failing) test which repros issue in c++
---
 cpp/src/arrow/array-binary-test.cc                | 36 +++++++++++++++++-
 cpp/src/arrow/array.h                             |  3 +-
 cpp/src/arrow/array/builder_binary.cc             | 34 +++++++++++++++--
 cpp/src/arrow/array/builder_binary.h              | 46 +++++++++++++++++------
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 41 ++++++++++++++++++++
 5 files changed, 141 insertions(+), 19 deletions(-)

diff --git a/cpp/src/arrow/array-binary-test.cc b/cpp/src/arrow/array-binary-test.cc
index 227f74b..cb8d6d5 100644
--- a/cpp/src/arrow/array-binary-test.cc
+++ b/cpp/src/arrow/array-binary-test.cc
@@ -665,6 +665,10 @@ class TestChunkedBinaryBuilder : public ::testing::Test {
     builder_.reset(new internal::ChunkedBinaryBuilder(chunksize));
   }
 
+  void Init(int32_t chunksize, int32_t chunklength) {
+    builder_.reset(new internal::ChunkedBinaryBuilder(chunksize, chunklength));
+  }
+
  protected:
   std::unique_ptr<internal::ChunkedBinaryBuilder> builder_;
 };
@@ -740,6 +744,36 @@ TEST_F(TestChunkedBinaryBuilder, LargeElements) {
   ASSERT_EQ(iterations * bufsize, total_data_size);
 }
 
+TEST_F(TestChunkedBinaryBuilder, LargeElementCount) {
+  int32_t max_chunk_length = 100;
+  Init(100, max_chunk_length);
+
+  auto length = max_chunk_length + 1;
+
+  // ChunkedBinaryBuilder can reserve memory for more than its configured maximum
+  // (per chunk) element count
+  ASSERT_OK(builder_->Reserve(length));
+
+  for (int64_t i = 0; i < 2 * length; ++i) {
+    // Appending more elements than memory has been reserved for simply overflows to the
+    // next chunk
+    ASSERT_OK(builder_->Append(""));
+  }
+
+  ArrayVector chunks;
+  ASSERT_OK(builder_->Finish(&chunks));
+
+  // should have two chunks full of empty strings and another with two more empty strings
+  ASSERT_EQ(chunks.size(), 3);
+  ASSERT_EQ(chunks[0]->length(), max_chunk_length);
+  ASSERT_EQ(chunks[1]->length(), max_chunk_length);
+  ASSERT_EQ(chunks[2]->length(), 2);
+  for (auto&& boxed_chunk : chunks) {
+    const auto& chunk = checked_cast<const BinaryArray&>(*boxed_chunk);
+    ASSERT_EQ(chunk.value_offset(0), chunk.value_offset(chunk.length()));
+  }
+}
+
 TEST(TestChunkedStringBuilder, BasicOperation) {
   const int chunksize = 100;
   internal::ChunkedStringBuilder builder(chunksize);
@@ -758,7 +792,7 @@ TEST(TestChunkedStringBuilder, BasicOperation) {
 
   // Type is correct
   for (auto chunk : chunks) {
-    ASSERT_TRUE(chunk->type()->Equals(*::arrow::utf8()));
+    ASSERT_TRUE(chunk->type()->Equals(utf8()));
   }
 }
 
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 78542c6..5cca9db 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -690,8 +690,7 @@ class ARROW_EXPORT BinaryArray : public FlatArray {
   /// Protected method for constructors
   void SetData(const std::shared_ptr<ArrayData>& data);
 
-  // Constructor that allows sub-classes/builders to propagate there logical type up the
-  // class hierarchy.
+  // Constructor to allow sub-classes/builders to substitute their own logical type
   BinaryArray(const std::shared_ptr<DataType>& type, int64_t length,
               const std::shared_ptr<Buffer>& value_offsets,
               const std::shared_ptr<Buffer>& data,
diff --git a/cpp/src/arrow/array/builder_binary.cc b/cpp/src/arrow/array/builder_binary.cc
index 88c2e86..ccb79a1 100644
--- a/cpp/src/arrow/array/builder_binary.cc
+++ b/cpp/src/arrow/array/builder_binary.cc
@@ -281,9 +281,15 @@ util::string_view FixedSizeBinaryBuilder::GetView(int64_t i) const {
 
 namespace internal {
 
-ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_size, MemoryPool* pool)
-    : max_chunk_size_(max_chunk_size),
-      chunk_data_size_(0),
+ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length,
+                                           MemoryPool* pool)
+    : max_chunk_value_length_(max_chunk_value_length),
+      builder_(new BinaryBuilder(pool)) {}
+
+ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length,
+                                           int32_t max_chunk_length, MemoryPool* pool)
+    : max_chunk_value_length_(max_chunk_value_length),
+      max_chunk_length_(max_chunk_length),
       builder_(new BinaryBuilder(pool)) {}
 
 Status ChunkedBinaryBuilder::Finish(ArrayVector* out) {
@@ -301,7 +307,11 @@ Status ChunkedBinaryBuilder::NextChunk() {
   RETURN_NOT_OK(builder_->Finish(&chunk));
   chunks_.emplace_back(std::move(chunk));
 
-  chunk_data_size_ = 0;
+  if (auto capacity = extra_capacity_) {
+    extra_capacity_ = 0;
+    return Reserve(capacity);
+  }
+
   return Status::OK();
 }
 
@@ -317,6 +327,22 @@ Status ChunkedStringBuilder::Finish(ArrayVector* out) {
   return Status::OK();
 }
 
+Status ChunkedBinaryBuilder::Reserve(int64_t values) {
+  if (ARROW_PREDICT_FALSE(extra_capacity_ != 0)) {
+    extra_capacity_ += values;
+    return Status::OK();
+  }
+
+  auto min_capacity = builder_->length() + values;
+  auto new_capacity = BufferBuilder::GrowByFactor(builder_->capacity(), min_capacity);
+  if (ARROW_PREDICT_TRUE(new_capacity <= kListMaximumElements)) {
+    return builder_->Resize(new_capacity);
+  }
+
+  extra_capacity_ = new_capacity - kListMaximumElements;
+  return builder_->Resize(kListMaximumElements);
+}
+
 }  // namespace internal
 
 }  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h
index 91f48da..47d3bae 100644
--- a/cpp/src/arrow/array/builder_binary.h
+++ b/cpp/src/arrow/array/builder_binary.h
@@ -316,22 +316,37 @@ namespace internal {
 
 class ARROW_EXPORT ChunkedBinaryBuilder {
  public:
-  ChunkedBinaryBuilder(int32_t max_chunk_size,
+  ChunkedBinaryBuilder(int32_t max_chunk_value_length,
+                       MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
+
+  ChunkedBinaryBuilder(int32_t max_chunk_value_length, int32_t max_chunk_length,
                        MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
 
   virtual ~ChunkedBinaryBuilder() = default;
 
   Status Append(const uint8_t* value, int32_t length) {
-    if (ARROW_PREDICT_FALSE(length + chunk_data_size_ > max_chunk_size_)) {
-      // Move onto next chunk, unless the builder length is currently 0, which
-      // means that max_chunk_size_ is less than the item length
-      if (builder_->length() > 0) {
-        ARROW_RETURN_NOT_OK(NextChunk());
+    if (ARROW_PREDICT_FALSE(length + builder_->value_data_length() >
+                            max_chunk_value_length_)) {
+      if (builder_->value_data_length() == 0) {
+        // The current item is larger than max_chunk_value_length_;
+        // this chunk will be oversize and hold no other character data
+        ARROW_RETURN_NOT_OK(builder_->Append(value, length));
+        return NextChunk();
       }
-      // else fall through
+      // The current item would cause builder_->value_data_length() to exceed
+      // max_chunk_value_length_, so finish this chunk and append the current item to
+      // the next chunk
+      ARROW_RETURN_NOT_OK(NextChunk());
+      return Append(value, length);
+    }
+
+    if (ARROW_PREDICT_FALSE(builder_->length() == max_chunk_length_)) {
+      // The current chunk already holds its maximum element count
+      // (max_chunk_length_), so finish this chunk and append the current item to the
+      // next chunk
+      ARROW_RETURN_NOT_OK(NextChunk());
     }
 
-    chunk_data_size_ += length;
     return builder_->Append(value, length);
   }
 
@@ -341,21 +356,28 @@ class ARROW_EXPORT ChunkedBinaryBuilder {
   }
 
   Status AppendNull() {
-    if (ARROW_PREDICT_FALSE(builder_->length() == std::numeric_limits<int32_t>::max())) {
+    if (ARROW_PREDICT_FALSE(builder_->length() == max_chunk_length_)) {
       ARROW_RETURN_NOT_OK(NextChunk());
     }
     return builder_->AppendNull();
   }
 
-  Status Reserve(int64_t values) { return builder_->Reserve(values); }
+  Status Reserve(int64_t values);
 
   virtual Status Finish(ArrayVector* out);
 
  protected:
   Status NextChunk();
 
-  int64_t max_chunk_size_;
-  int64_t chunk_data_size_;
+  // maximum total character data size per chunk
+  int64_t max_chunk_value_length_;
+
+  // maximum elements allowed per chunk
+  int64_t max_chunk_length_ = kListMaximumElements;
+
+  // when Reserve() would cause builder_ to exceed its max_chunk_length_,
+  // add to extra_capacity_ instead and wait to reserve until the next chunk
+  int64_t extra_capacity_ = 0;
 
   std::unique_ptr<BinaryBuilder> builder_;
   std::vector<std::shared_ptr<Array>> chunks_;
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 2c5c5df..792f760 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -2658,6 +2658,47 @@ TEST(TestArrowReaderAdHoc, CorruptedSchema) {
   TryReadDataFile(path, ::arrow::StatusCode::IOError);
 }
 
+TEST(TestArrowReaderAdHoc, DISABLED_LargeStringColumn) {
+  // ARROW-3762
+  ::arrow::StringBuilder builder;
+  int64_t length = 1 << 30;
+  ASSERT_OK(builder.Resize(length));
+  ASSERT_OK(builder.ReserveData(length));
+  for (int64_t i = 0; i < length; ++i) {
+    builder.UnsafeAppend("1", 1);
+  }
+  std::shared_ptr<Array> array;
+  ASSERT_OK(builder.Finish(&array));
+  auto table = Table::Make({std::make_shared<Column>("x", array)});
+  std::shared_ptr<SchemaDescriptor> schm;
+  ASSERT_OK_NO_THROW(
+      ToParquetSchema(table->schema().get(), *default_writer_properties(), &schm));
+
+  auto sink = CreateOutputStream();
+
+  auto schm_node = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, {schm->group_node()->field(0)}));
+
+  auto writer = ParquetFileWriter::Open(sink, schm_node);
+  FileWriter arrow_writer(default_memory_pool(), std::move(writer), table->schema());
+  for (int i : {0, 1}) {
+    ASSERT_OK_NO_THROW(arrow_writer.WriteTable(*table, table->num_rows())) << i;
+  }
+  ASSERT_OK_NO_THROW(arrow_writer.Close());
+
+  std::shared_ptr<Buffer> tables_buffer;
+  ASSERT_OK_NO_THROW(sink->Finish(&tables_buffer));
+
+  // drop to save memory
+  table.reset();
+  array.reset();
+
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(tables_buffer));
+  FileReader arrow_reader(default_memory_pool(), std::move(reader));
+  ASSERT_OK_NO_THROW(arrow_reader.ReadTable(&table));
+  ASSERT_OK(table->Validate());
+}
+
 TEST(TestArrowReaderAdHoc, HandleDictPageOffsetZero) {
   // PARQUET-1402: parquet-mr writes files this way which tripped up
   // some business logic