You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/27 13:51:24 UTC
[arrow] branch master updated: ARROW-3762: [C++] manage
ChunkedArrayBuilder capacity explicitly
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a634f92 ARROW-3762: [C++] manage ChunkedArrayBuilder capacity explicitly
a634f92 is described below
commit a634f925d06e58141261e60dec322186e22b750c
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Thu Jun 27 08:51:09 2019 -0500
ARROW-3762: [C++] manage ChunkedArrayBuilder capacity explicitly
ChunkedArrayBuilder should never request that BinaryBuilder reserve space for more strings than it can hold (`kListMaximumElements`); with this change it will instead begin a new chunk.
Author: Benjamin Kietzman <be...@gmail.com>
Closes #4695 from bkietz/3762-Parquet-arrowTable-reads-error-when-over and squashes the following commits:
95a4e30ff <Benjamin Kietzman> add configurable maximum element count to ChunkedBinaryBuilder
5331a2cb7 <Benjamin Kietzman> add test for ChunkedBinaryBuilder's new reserve behavior
3eb57198b <Benjamin Kietzman> manage ChunkedBinaryBuilder's capacity
d626c073c <Benjamin Kietzman> attempted fix, killed: OOM
cb000ab2e <Benjamin Kietzman> add (failing) test which repros issue in c++
---
cpp/src/arrow/array-binary-test.cc | 36 +++++++++++++++++-
cpp/src/arrow/array.h | 3 +-
cpp/src/arrow/array/builder_binary.cc | 34 +++++++++++++++--
cpp/src/arrow/array/builder_binary.h | 46 +++++++++++++++++------
cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 41 ++++++++++++++++++++
5 files changed, 141 insertions(+), 19 deletions(-)
diff --git a/cpp/src/arrow/array-binary-test.cc b/cpp/src/arrow/array-binary-test.cc
index 227f74b..cb8d6d5 100644
--- a/cpp/src/arrow/array-binary-test.cc
+++ b/cpp/src/arrow/array-binary-test.cc
@@ -665,6 +665,10 @@ class TestChunkedBinaryBuilder : public ::testing::Test {
builder_.reset(new internal::ChunkedBinaryBuilder(chunksize));
}
+ void Init(int32_t chunksize, int32_t chunklength) {
+ builder_.reset(new internal::ChunkedBinaryBuilder(chunksize, chunklength));
+ }
+
protected:
std::unique_ptr<internal::ChunkedBinaryBuilder> builder_;
};
@@ -740,6 +744,36 @@ TEST_F(TestChunkedBinaryBuilder, LargeElements) {
ASSERT_EQ(iterations * bufsize, total_data_size);
}
+TEST_F(TestChunkedBinaryBuilder, LargeElementCount) {
+ int32_t max_chunk_length = 100;
+ Init(100, max_chunk_length);
+
+ auto length = max_chunk_length + 1;
+
+ // ChunkedBinaryBuilder can reserve memory for more than its configured maximum
+ // (per chunk) element count
+ ASSERT_OK(builder_->Reserve(length));
+
+ for (int64_t i = 0; i < 2 * length; ++i) {
+ // Appending more elements than memory has been reserved for simply overflows to the
+ // next chunk
+ ASSERT_OK(builder_->Append(""));
+ }
+
+ ArrayVector chunks;
+ ASSERT_OK(builder_->Finish(&chunks));
+
+ // should have two chunks full of empty strings and another with two more empty strings
+ ASSERT_EQ(chunks.size(), 3);
+ ASSERT_EQ(chunks[0]->length(), max_chunk_length);
+ ASSERT_EQ(chunks[1]->length(), max_chunk_length);
+ ASSERT_EQ(chunks[2]->length(), 2);
+ for (auto&& boxed_chunk : chunks) {
+ const auto& chunk = checked_cast<const BinaryArray&>(*boxed_chunk);
+ ASSERT_EQ(chunk.value_offset(0), chunk.value_offset(chunk.length()));
+ }
+}
+
TEST(TestChunkedStringBuilder, BasicOperation) {
const int chunksize = 100;
internal::ChunkedStringBuilder builder(chunksize);
@@ -758,7 +792,7 @@ TEST(TestChunkedStringBuilder, BasicOperation) {
// Type is correct
for (auto chunk : chunks) {
- ASSERT_TRUE(chunk->type()->Equals(*::arrow::utf8()));
+ ASSERT_TRUE(chunk->type()->Equals(utf8()));
}
}
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 78542c6..5cca9db 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -690,8 +690,7 @@ class ARROW_EXPORT BinaryArray : public FlatArray {
/// Protected method for constructors
void SetData(const std::shared_ptr<ArrayData>& data);
- // Constructor that allows sub-classes/builders to propagate there logical type up the
- // class hierarchy.
+ // Constructor to allow sub-classes/builders to substitute their own logical type
BinaryArray(const std::shared_ptr<DataType>& type, int64_t length,
const std::shared_ptr<Buffer>& value_offsets,
const std::shared_ptr<Buffer>& data,
diff --git a/cpp/src/arrow/array/builder_binary.cc b/cpp/src/arrow/array/builder_binary.cc
index 88c2e86..ccb79a1 100644
--- a/cpp/src/arrow/array/builder_binary.cc
+++ b/cpp/src/arrow/array/builder_binary.cc
@@ -281,9 +281,15 @@ util::string_view FixedSizeBinaryBuilder::GetView(int64_t i) const {
namespace internal {
-ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_size, MemoryPool* pool)
- : max_chunk_size_(max_chunk_size),
- chunk_data_size_(0),
+ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length,
+ MemoryPool* pool)
+ : max_chunk_value_length_(max_chunk_value_length),
+ builder_(new BinaryBuilder(pool)) {}
+
+ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length,
+ int32_t max_chunk_length, MemoryPool* pool)
+ : max_chunk_value_length_(max_chunk_value_length),
+ max_chunk_length_(max_chunk_length),
builder_(new BinaryBuilder(pool)) {}
Status ChunkedBinaryBuilder::Finish(ArrayVector* out) {
@@ -301,7 +307,11 @@ Status ChunkedBinaryBuilder::NextChunk() {
RETURN_NOT_OK(builder_->Finish(&chunk));
chunks_.emplace_back(std::move(chunk));
- chunk_data_size_ = 0;
+ if (auto capacity = extra_capacity_) {
+ extra_capacity_ = 0;
+ return Reserve(capacity);
+ }
+
return Status::OK();
}
@@ -317,6 +327,22 @@ Status ChunkedStringBuilder::Finish(ArrayVector* out) {
return Status::OK();
}
+Status ChunkedBinaryBuilder::Reserve(int64_t values) {
+ if (ARROW_PREDICT_FALSE(extra_capacity_ != 0)) {
+ extra_capacity_ += values;
+ return Status::OK();
+ }
+
+ auto min_capacity = builder_->length() + values;
+ auto new_capacity = BufferBuilder::GrowByFactor(builder_->capacity(), min_capacity);
+ if (ARROW_PREDICT_TRUE(new_capacity <= kListMaximumElements)) {
+ return builder_->Resize(new_capacity);
+ }
+
+ extra_capacity_ = new_capacity - kListMaximumElements;
+ return builder_->Resize(kListMaximumElements);
+}
+
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h
index 91f48da..47d3bae 100644
--- a/cpp/src/arrow/array/builder_binary.h
+++ b/cpp/src/arrow/array/builder_binary.h
@@ -316,22 +316,37 @@ namespace internal {
class ARROW_EXPORT ChunkedBinaryBuilder {
public:
- ChunkedBinaryBuilder(int32_t max_chunk_size,
+ ChunkedBinaryBuilder(int32_t max_chunk_value_length,
+ MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
+
+ ChunkedBinaryBuilder(int32_t max_chunk_value_length, int32_t max_chunk_length,
MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
virtual ~ChunkedBinaryBuilder() = default;
Status Append(const uint8_t* value, int32_t length) {
- if (ARROW_PREDICT_FALSE(length + chunk_data_size_ > max_chunk_size_)) {
- // Move onto next chunk, unless the builder length is currently 0, which
- // means that max_chunk_size_ is less than the item length
- if (builder_->length() > 0) {
- ARROW_RETURN_NOT_OK(NextChunk());
+ if (ARROW_PREDICT_FALSE(length + builder_->value_data_length() >
+ max_chunk_value_length_)) {
+ if (builder_->value_data_length() == 0) {
+ // The current item is larger than max_chunk_value_length_;
+ // this chunk will be oversize and hold *only* this item
+ ARROW_RETURN_NOT_OK(builder_->Append(value, length));
+ return NextChunk();
}
- // else fall through
+ // The current item would cause builder_->value_data_length() to exceed
+ // max_chunk_value_length_, so finish this chunk and append the current item to the
+ // next chunk
+ ARROW_RETURN_NOT_OK(NextChunk());
+ return Append(value, length);
+ }
+
+ if (ARROW_PREDICT_FALSE(builder_->length() == max_chunk_length_)) {
+ // The current chunk already holds max_chunk_length_ elements (the per-chunk
+ // element limit), so finish this chunk and append the current item to the next
+ // chunk
+ ARROW_RETURN_NOT_OK(NextChunk());
}
- chunk_data_size_ += length;
return builder_->Append(value, length);
}
@@ -341,21 +356,28 @@ class ARROW_EXPORT ChunkedBinaryBuilder {
}
Status AppendNull() {
- if (ARROW_PREDICT_FALSE(builder_->length() == std::numeric_limits<int32_t>::max())) {
+ if (ARROW_PREDICT_FALSE(builder_->length() == max_chunk_length_)) {
ARROW_RETURN_NOT_OK(NextChunk());
}
return builder_->AppendNull();
}
- Status Reserve(int64_t values) { return builder_->Reserve(values); }
+ Status Reserve(int64_t values);
virtual Status Finish(ArrayVector* out);
protected:
Status NextChunk();
- int64_t max_chunk_size_;
- int64_t chunk_data_size_;
+ // maximum total character data size per chunk
+ int64_t max_chunk_value_length_;
+
+ // maximum elements allowed per chunk
+ int64_t max_chunk_length_ = kListMaximumElements;
+
+ // when Reserve() would cause builder_ to exceed its max_chunk_length_,
+ // add to extra_capacity_ instead and wait to reserve until the next chunk
+ int64_t extra_capacity_ = 0;
std::unique_ptr<BinaryBuilder> builder_;
std::vector<std::shared_ptr<Array>> chunks_;
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 2c5c5df..792f760 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -2658,6 +2658,47 @@ TEST(TestArrowReaderAdHoc, CorruptedSchema) {
TryReadDataFile(path, ::arrow::StatusCode::IOError);
}
+TEST(TestArrowReaderAdHoc, DISABLED_LargeStringColumn) {
+ // ARROW-3762
+ ::arrow::StringBuilder builder;
+ int64_t length = 1 << 30;
+ ASSERT_OK(builder.Resize(length));
+ ASSERT_OK(builder.ReserveData(length));
+ for (int64_t i = 0; i < length; ++i) {
+ builder.UnsafeAppend("1", 1);
+ }
+ std::shared_ptr<Array> array;
+ ASSERT_OK(builder.Finish(&array));
+ auto table = Table::Make({std::make_shared<Column>("x", array)});
+ std::shared_ptr<SchemaDescriptor> schm;
+ ASSERT_OK_NO_THROW(
+ ToParquetSchema(table->schema().get(), *default_writer_properties(), &schm));
+
+ auto sink = CreateOutputStream();
+
+ auto schm_node = std::static_pointer_cast<GroupNode>(
+ GroupNode::Make("schema", Repetition::REQUIRED, {schm->group_node()->field(0)}));
+
+ auto writer = ParquetFileWriter::Open(sink, schm_node);
+ FileWriter arrow_writer(default_memory_pool(), std::move(writer), table->schema());
+ for (int i : {0, 1}) {
+ ASSERT_OK_NO_THROW(arrow_writer.WriteTable(*table, table->num_rows())) << i;
+ }
+ ASSERT_OK_NO_THROW(arrow_writer.Close());
+
+ std::shared_ptr<Buffer> tables_buffer;
+ ASSERT_OK_NO_THROW(sink->Finish(&tables_buffer));
+
+ // drop to save memory
+ table.reset();
+ array.reset();
+
+ auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(tables_buffer));
+ FileReader arrow_reader(default_memory_pool(), std::move(reader));
+ ASSERT_OK_NO_THROW(arrow_reader.ReadTable(&table));
+ ASSERT_OK(table->Validate());
+}
+
TEST(TestArrowReaderAdHoc, HandleDictPageOffsetZero) {
// PARQUET-1402: parquet-mr writes files this way which tripped up
// some business logic