You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/03 03:44:48 UTC
[arrow] branch master updated: PARQUET-1467: [C++] Remove defunct ChunkedAllocator code
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 98bdde8 PARQUET-1467: [C++] Remove defunct ChunkedAllocator code
98bdde8 is described below
commit 98bdde873a9f7ff3854a52f096dabe3c52be6e3a
Author: Wes McKinney <we...@apache.org>
AuthorDate: Sun Dec 2 21:44:36 2018 -0600
PARQUET-1467: [C++] Remove defunct ChunkedAllocator code
It does not seem that memory allocation on the dictionary encoding path requires anything as elaborate as ChunkedAllocator right now.
Author: Wes McKinney <we...@apache.org>
Closes #3069 from wesm/PARQUET-1467 and squashes the following commits:
f37ed0756 <Wes McKinney> Remove defunct memory allocator code
---
cpp/src/parquet/column_writer.cc | 6 +-
cpp/src/parquet/column_writer.h | 1 -
cpp/src/parquet/encoding-benchmark.cc | 3 +-
cpp/src/parquet/encoding-internal.h | 9 +-
cpp/src/parquet/encoding-test.cc | 8 +-
cpp/src/parquet/test-util.h | 5 +-
cpp/src/parquet/util/memory-test.cc | 216 -------------------------------
cpp/src/parquet/util/memory.cc | 232 ----------------------------------
cpp/src/parquet/util/memory.h | 143 ---------------------
9 files changed, 8 insertions(+), 615 deletions(-)
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index a45613f..857673d 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -353,7 +353,6 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
encoding_(encoding),
properties_(properties),
allocator_(properties->memory_pool()),
- pool_(properties->memory_pool()),
num_buffered_values_(0),
num_buffered_encoded_values_(0),
rows_written_(0),
@@ -546,8 +545,7 @@ TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
break;
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE_DICTIONARY:
- current_encoder_.reset(
- new DictEncoder<Type>(descr_, &pool_, properties->memory_pool()));
+ current_encoder_.reset(new DictEncoder<Type>(descr_, properties->memory_pool()));
break;
default:
ParquetException::NYI("Selected encoding is not supported");
@@ -582,8 +580,6 @@ void TypedColumnWriter<Type>::WriteDictionaryPage() {
std::shared_ptr<ResizableBuffer> buffer =
AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
dict_encoder->WriteDict(buffer->mutable_data());
- // TODO Get rid of this deep call
- dict_encoder->mem_pool()->FreeAll();
DictionaryPage page(buffer, dict_encoder->num_entries(),
properties_->dictionary_index_encoding());
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 41bc7bd..3c69dd3 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -186,7 +186,6 @@ class PARQUET_EXPORT ColumnWriter {
LevelEncoder level_encoder_;
::arrow::MemoryPool* allocator_;
- ChunkedAllocator pool_;
// The total number of values stored in the data page. This is the maximum of
// the number of encoded definition levels or encoded values. For
diff --git a/cpp/src/parquet/encoding-benchmark.cc b/cpp/src/parquet/encoding-benchmark.cc
index 364cdba..f8d2839 100644
--- a/cpp/src/parquet/encoding-benchmark.cc
+++ b/cpp/src/parquet/encoding-benchmark.cc
@@ -104,11 +104,10 @@ static void DecodeDict(std::vector<typename Type::c_type>& values,
typedef typename Type::c_type T;
int num_values = static_cast<int>(values.size());
- ChunkedAllocator pool;
MemoryPool* allocator = default_memory_pool();
std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED);
- DictEncoder<Type> encoder(descr.get(), &pool, allocator);
+ DictEncoder<Type> encoder(descr.get(), allocator);
for (int i = 0; i < num_values; ++i) {
encoder.Put(values[i]);
}
diff --git a/cpp/src/parquet/encoding-internal.h b/cpp/src/parquet/encoding-internal.h
index b06ad41..e2dfc23 100644
--- a/cpp/src/parquet/encoding-internal.h
+++ b/cpp/src/parquet/encoding-internal.h
@@ -465,12 +465,10 @@ class DictEncoder : public Encoder<DType> {
public:
typedef typename DType::c_type T;
- // XXX pool is unused
- explicit DictEncoder(const ColumnDescriptor* desc, ChunkedAllocator* pool = nullptr,
+ explicit DictEncoder(const ColumnDescriptor* desc,
::arrow::MemoryPool* allocator = ::arrow::default_memory_pool())
: Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator),
allocator_(allocator),
- pool_(pool),
dict_encoded_size_(0),
type_length_(desc->type_length()),
memo_table_(INITIAL_HASH_TABLE_SIZE) {}
@@ -538,8 +536,6 @@ class DictEncoder : public Encoder<DType> {
/// dict_encoded_size() bytes.
void WriteDict(uint8_t* buffer);
- ChunkedAllocator* mem_pool() { return pool_; }
-
/// The number of entries in the dictionary.
int num_entries() const { return memo_table_.size(); }
@@ -549,9 +545,6 @@ class DictEncoder : public Encoder<DType> {
::arrow::MemoryPool* allocator_;
- // For ByteArray / FixedLenByteArray data. Not owned
- ChunkedAllocator* pool_;
-
/// Indices that have not yet be written out by WriteIndices().
std::vector<int> buffered_indices_;
diff --git a/cpp/src/parquet/encoding-test.cc b/cpp/src/parquet/encoding-test.cc
index 50e1394..90ceb78 100644
--- a/cpp/src/parquet/encoding-test.cc
+++ b/cpp/src/parquet/encoding-test.cc
@@ -155,7 +155,7 @@ class TestEncodingBase : public ::testing::Test {
allocator_ = default_memory_pool();
}
- void TearDown() { pool_.FreeAll(); }
+ void TearDown() {}
void InitData(int nvalues, int repeats) {
num_values_ = nvalues * repeats;
@@ -181,7 +181,6 @@ class TestEncodingBase : public ::testing::Test {
}
protected:
- ChunkedAllocator pool_;
MemoryPool* allocator_;
int num_values_;
@@ -199,7 +198,6 @@ class TestEncodingBase : public ::testing::Test {
// Member variables are not visible to templated subclasses. Possibly figure
// out an alternative to this class layering at some point
#define USING_BASE_MEMBERS() \
- using TestEncodingBase<Type>::pool_; \
using TestEncodingBase<Type>::allocator_; \
using TestEncodingBase<Type>::descr_; \
using TestEncodingBase<Type>::num_values_; \
@@ -253,14 +251,14 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
void CheckRoundtrip() {
std::vector<uint8_t> valid_bits(BitUtil::BytesForBits(num_values_) + 1, 255);
- DictEncoder<Type> encoder(descr_.get(), &pool_);
+ DictEncoder<Type> encoder(descr_.get());
ASSERT_NO_THROW(encoder.Put(draws_, num_values_));
dict_buffer_ = AllocateBuffer(default_memory_pool(), encoder.dict_encoded_size());
encoder.WriteDict(dict_buffer_->mutable_data());
std::shared_ptr<Buffer> indices = encoder.FlushValues();
- DictEncoder<Type> spaced_encoder(descr_.get(), &pool_);
+ DictEncoder<Type> spaced_encoder(descr_.get());
// PutSpaced should lead to the same results
ASSERT_NO_THROW(spaced_encoder.PutSpaced(draws_, num_values_, valid_bits.data(), 0));
std::shared_ptr<Buffer> indices_from_spaced = spaced_encoder.FlushValues();
diff --git a/cpp/src/parquet/test-util.h b/cpp/src/parquet/test-util.h
index 3e74398..92aa8d3 100644
--- a/cpp/src/parquet/test-util.h
+++ b/cpp/src/parquet/test-util.h
@@ -247,10 +247,10 @@ class DictionaryPageBuilder {
// This class writes data and metadata to the passed inputs
explicit DictionaryPageBuilder(const ColumnDescriptor* d)
: num_dict_values_(0), have_values_(false) {
- encoder_.reset(new DictEncoder<TYPE>(d, &pool_));
+ encoder_.reset(new DictEncoder<TYPE>(d));
}
- ~DictionaryPageBuilder() { pool_.FreeAll(); }
+ ~DictionaryPageBuilder() {}
shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
int num_values = static_cast<int>(values.size());
@@ -271,7 +271,6 @@ class DictionaryPageBuilder {
int32_t num_values() const { return num_dict_values_; }
private:
- ChunkedAllocator pool_;
shared_ptr<DictEncoder<TYPE>> encoder_;
int32_t num_dict_values_;
bool have_values_;
diff --git a/cpp/src/parquet/util/memory-test.cc b/cpp/src/parquet/util/memory-test.cc
index bfd685d..58903b6 100644
--- a/cpp/src/parquet/util/memory-test.cc
+++ b/cpp/src/parquet/util/memory-test.cc
@@ -34,222 +34,6 @@ namespace parquet {
class TestBuffer : public ::testing::Test {};
-// Utility class to call private functions on MemPool.
-class ChunkedAllocatorTest {
- public:
- static bool CheckIntegrity(ChunkedAllocator* pool, bool current_chunk_empty) {
- return pool->CheckIntegrity(current_chunk_empty);
- }
-
- static const int INITIAL_CHUNK_SIZE = ChunkedAllocator::INITIAL_CHUNK_SIZE;
- static const int MAX_CHUNK_SIZE = ChunkedAllocator::MAX_CHUNK_SIZE;
-};
-
-const int ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
-const int ChunkedAllocatorTest::MAX_CHUNK_SIZE;
-
-TEST(ChunkedAllocatorTest, Basic) {
- ChunkedAllocator p;
- ChunkedAllocator p2;
- ChunkedAllocator p3;
-
- for (int iter = 0; iter < 2; ++iter) {
- // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
- for (int i = 0; i < 768; ++i) {
- // pads to 32 bytes
- p.Allocate(25);
- }
- // we handed back 24K
- EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
- // .. and allocated 28K of chunks (4, 8, 16)
- EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());
-
- // we're passing on the first two chunks, containing 12K of data; we're left with
- // one chunk of 16K containing 12K of data
- p2.AcquireData(&p, true);
- EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
- EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());
-
- // we allocate 8K, for which there isn't enough room in the current chunk,
- // so another one is allocated (32K)
- p.Allocate(8 * 1024);
- EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());
-
- // we allocate 65K, which doesn't fit into the current chunk or the default
- // size of the next allocated chunk (64K)
- p.Allocate(65 * 1024);
- EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
- if (iter == 0) {
- EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
- } else {
- EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
- }
- EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
- // Clear() resets allocated data, but doesn't remove any chunks
- p.Clear();
- EXPECT_EQ(0, p.total_allocated_bytes());
- if (iter == 0) {
- EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
- } else {
- EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
- }
- EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
- // next allocation reuses existing chunks
- p.Allocate(1024);
- EXPECT_EQ(1024, p.total_allocated_bytes());
- if (iter == 0) {
- EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
- } else {
- EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
- }
- EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
- // ... unless it doesn't fit into any available chunk
- p.Allocate(120 * 1024);
- EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
- if (iter == 0) {
- EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
- } else {
- EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
- }
- EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
- // ... Try another chunk that fits into an existing chunk
- p.Allocate(33 * 1024);
- EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
- EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
- // we're releasing 3 chunks, which get added to p2
- p2.AcquireData(&p, false);
- EXPECT_EQ(0, p.total_allocated_bytes());
- EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
- EXPECT_EQ(0, p.GetTotalChunkSizes());
-
- p3.AcquireData(&p2, true); // we're keeping the 65k chunk
- EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
- EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());
-
- p.FreeAll();
- p2.FreeAll();
- p3.FreeAll();
- }
-}
-
-// Test that we can keep an allocated chunk and a free chunk.
-// This case verifies that when chunks are acquired by another memory pool the
-// remaining chunks are consistent if there were more than one used chunk and some
-// free chunks.
-TEST(ChunkedAllocatorTest, Keep) {
- ChunkedAllocator p;
- p.Allocate(4 * 1024);
- p.Allocate(8 * 1024);
- p.Allocate(16 * 1024);
- EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
- EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
- p.Clear();
- EXPECT_EQ(0, p.total_allocated_bytes());
- EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
- p.Allocate(1 * 1024);
- p.Allocate(4 * 1024);
- EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
- EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
-
- ChunkedAllocator p2;
- p2.AcquireData(&p, true);
- EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
- EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
- EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
- EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());
-
- p.FreeAll();
- p2.FreeAll();
-}
-
-// Tests that we can return partial allocations.
-TEST(ChunkedAllocatorTest, ReturnPartial) {
- ChunkedAllocator p;
- uint8_t* ptr = p.Allocate(1024);
- EXPECT_EQ(1024, p.total_allocated_bytes());
- memset(ptr, 0, 1024);
- p.ReturnPartialAllocation(1024);
-
- uint8_t* ptr2 = p.Allocate(1024);
- EXPECT_EQ(1024, p.total_allocated_bytes());
- EXPECT_TRUE(ptr == ptr2);
- p.ReturnPartialAllocation(1016);
-
- ptr2 = p.Allocate(1016);
- EXPECT_EQ(1024, p.total_allocated_bytes());
- EXPECT_TRUE(ptr2 == ptr + 8);
- p.ReturnPartialAllocation(512);
- memset(ptr2, 1, 1016 - 512);
-
- uint8_t* ptr3 = p.Allocate(512);
- EXPECT_EQ(1024, p.total_allocated_bytes());
- EXPECT_TRUE(ptr3 == ptr + 512);
- memset(ptr3, 2, 512);
-
- for (int i = 0; i < 8; ++i) {
- EXPECT_EQ(0, ptr[i]);
- }
- for (int i = 8; i < 512; ++i) {
- EXPECT_EQ(1, ptr[i]);
- }
- for (int i = 512; i < 1024; ++i) {
- EXPECT_EQ(2, ptr[i]);
- }
-
- p.FreeAll();
-}
-
-// Test that the ChunkedAllocator overhead is bounded when we make allocations of
-// INITIAL_CHUNK_SIZE.
-TEST(ChunkedAllocatorTest, MemoryOverhead) {
- ChunkedAllocator p;
- const int alloc_size = ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
- const int num_allocs = 1000;
- int64_t total_allocated = 0;
-
- for (int i = 0; i < num_allocs; ++i) {
- uint8_t* mem = p.Allocate(alloc_size);
- ASSERT_TRUE(mem != nullptr);
- total_allocated += alloc_size;
-
- int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
- // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most
- // one empty chunk at the end.
- EXPECT_LE(wasted_memory, ChunkedAllocatorTest::MAX_CHUNK_SIZE);
- // The chunk doubling algorithm should not allocate chunks larger than the total
- // amount of memory already allocated.
- EXPECT_LE(wasted_memory, total_allocated);
- }
-
- p.FreeAll();
-}
-
-// Test that the ChunkedAllocator overhead is bounded when we make alternating
-// large and small allocations.
-TEST(ChunkedAllocatorTest, FragmentationOverhead) {
- ChunkedAllocator p;
- const int num_allocs = 100;
- int64_t total_allocated = 0;
-
- for (int i = 0; i < num_allocs; ++i) {
- int alloc_size = i % 2 == 0 ? 1 : ChunkedAllocatorTest::MAX_CHUNK_SIZE;
- uint8_t* mem = p.Allocate(alloc_size);
- ASSERT_TRUE(mem != nullptr);
- total_allocated += alloc_size;
-
- int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
- // Fragmentation should not waste more than half of each completed chunk.
- EXPECT_LE(wasted_memory, total_allocated + ChunkedAllocatorTest::MAX_CHUNK_SIZE);
- }
-
- p.FreeAll();
-}
-
TEST(TestBufferedInputStream, Basics) {
int64_t source_size = 256;
int64_t stream_offset = 10;
diff --git a/cpp/src/parquet/util/memory.cc b/cpp/src/parquet/util/memory.cc
index fde424a..6251f1c 100644
--- a/cpp/src/parquet/util/memory.cc
+++ b/cpp/src/parquet/util/memory.cc
@@ -115,238 +115,6 @@ template class Vector<Int96>;
template class Vector<ByteArray>;
template class Vector<FixedLenByteArray>;
-const int ChunkedAllocator::INITIAL_CHUNK_SIZE;
-const int ChunkedAllocator::MAX_CHUNK_SIZE;
-
-ChunkedAllocator::ChunkedAllocator(MemoryPool* pool)
- : current_chunk_idx_(-1),
- next_chunk_size_(INITIAL_CHUNK_SIZE),
- total_allocated_bytes_(0),
- peak_allocated_bytes_(0),
- total_reserved_bytes_(0),
- pool_(pool) {}
-
-ChunkedAllocator::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf)
- : data(buf), size(size), allocated_bytes(0) {}
-
-ChunkedAllocator::~ChunkedAllocator() {
- int64_t total_bytes_released = 0;
- for (size_t i = 0; i < chunks_.size(); ++i) {
- total_bytes_released += chunks_[i].size;
- pool_->Free(chunks_[i].data, chunks_[i].size);
- }
-
- DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
-}
-
-void ChunkedAllocator::ReturnPartialAllocation(int byte_size) {
- DCHECK_GE(byte_size, 0);
- DCHECK_NE(current_chunk_idx_, -1);
- ChunkInfo& info = chunks_[current_chunk_idx_];
- DCHECK_GE(info.allocated_bytes, byte_size);
- info.allocated_bytes -= byte_size;
- total_allocated_bytes_ -= byte_size;
-}
-
-template <bool CHECK_LIMIT_FIRST>
-uint8_t* ChunkedAllocator::Allocate(int size) {
- if (size == 0) {
- return nullptr;
- }
-
- int64_t num_bytes = ::arrow::BitUtil::RoundUp(size, 8);
- if (current_chunk_idx_ == -1 ||
- num_bytes + chunks_[current_chunk_idx_].allocated_bytes >
- chunks_[current_chunk_idx_].size) {
- // If we couldn't allocate a new chunk, return nullptr.
- if (ARROW_PREDICT_FALSE(!FindChunk(num_bytes))) {
- return nullptr;
- }
- }
- ChunkInfo& info = chunks_[current_chunk_idx_];
- uint8_t* result = info.data + info.allocated_bytes;
- DCHECK_LE(info.allocated_bytes + num_bytes, info.size);
- info.allocated_bytes += num_bytes;
- total_allocated_bytes_ += num_bytes;
- DCHECK_LE(current_chunk_idx_, static_cast<int>(chunks_.size()) - 1);
- peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
- return result;
-}
-
-uint8_t* ChunkedAllocator::Allocate(int size) { return Allocate<false>(size); }
-
-void ChunkedAllocator::Clear() {
- current_chunk_idx_ = -1;
- for (auto chunk = chunks_.begin(); chunk != chunks_.end(); ++chunk) {
- chunk->allocated_bytes = 0;
- }
- total_allocated_bytes_ = 0;
- DCHECK(CheckIntegrity(false));
-}
-
-void ChunkedAllocator::FreeAll() {
- int64_t total_bytes_released = 0;
- for (size_t i = 0; i < chunks_.size(); ++i) {
- total_bytes_released += chunks_[i].size;
- pool_->Free(chunks_[i].data, chunks_[i].size);
- }
- chunks_.clear();
- next_chunk_size_ = INITIAL_CHUNK_SIZE;
- current_chunk_idx_ = -1;
- total_allocated_bytes_ = 0;
- total_reserved_bytes_ = 0;
-}
-
-bool ChunkedAllocator::FindChunk(int64_t min_size) {
- // Try to allocate from a free chunk. The first free chunk, if any, will be immediately
- // after the current chunk.
- int first_free_idx = current_chunk_idx_ + 1;
- // (cast size() to signed int in order to avoid everything else being cast to
- // unsigned long, in particular -1)
- while (++current_chunk_idx_ < static_cast<int>(chunks_.size())) {
- // we found a free chunk
- DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0);
-
- if (chunks_[current_chunk_idx_].size >= min_size) {
- // This chunk is big enough. Move it before the other free chunks.
- if (current_chunk_idx_ != first_free_idx) {
- std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]);
- current_chunk_idx_ = first_free_idx;
- }
- break;
- }
- }
-
- if (current_chunk_idx_ == static_cast<int>(chunks_.size())) {
- // need to allocate new chunk.
- int64_t chunk_size;
- DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE);
- DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE);
-
- chunk_size = std::max<int64_t>(min_size, next_chunk_size_);
-
- // Allocate a new chunk. Return early if malloc fails.
- uint8_t* buf = nullptr;
- PARQUET_THROW_NOT_OK(pool_->Allocate(chunk_size, &buf));
- if (ARROW_PREDICT_FALSE(buf == nullptr)) {
- DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
- current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
- return false;
- }
-
- // If there are no free chunks put it at the end, otherwise before the first free.
- if (first_free_idx == static_cast<int>(chunks_.size())) {
- chunks_.push_back(ChunkInfo(chunk_size, buf));
- } else {
- current_chunk_idx_ = first_free_idx;
- auto insert_chunk = chunks_.begin() + current_chunk_idx_;
- chunks_.insert(insert_chunk, ChunkInfo(chunk_size, buf));
- }
- total_reserved_bytes_ += chunk_size;
- // Don't increment the chunk size until the allocation succeeds: if an attempted
- // large allocation fails we don't want to increase the chunk size further.
- next_chunk_size_ =
- static_cast<int>(std::min<int64_t>(chunk_size * 2, MAX_CHUNK_SIZE));
- }
-
- DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
- DCHECK(CheckIntegrity(true));
- return true;
-}
-
-void ChunkedAllocator::AcquireData(ChunkedAllocator* src, bool keep_current) {
- DCHECK(src->CheckIntegrity(false));
- int num_acquired_chunks;
- if (keep_current) {
- num_acquired_chunks = src->current_chunk_idx_;
- } else if (src->GetFreeOffset() == 0) {
- // nothing in the last chunk
- num_acquired_chunks = src->current_chunk_idx_;
- } else {
- num_acquired_chunks = src->current_chunk_idx_ + 1;
- }
-
- if (num_acquired_chunks <= 0) {
- if (!keep_current) src->FreeAll();
- return;
- }
-
- auto end_chunk = src->chunks_.begin() + num_acquired_chunks;
- int64_t total_transfered_bytes = 0;
- for (auto i = src->chunks_.begin(); i != end_chunk; ++i) {
- total_transfered_bytes += i->size;
- }
- src->total_reserved_bytes_ -= total_transfered_bytes;
- total_reserved_bytes_ += total_transfered_bytes;
-
- // insert new chunks after current_chunk_idx_
- auto insert_chunk = chunks_.begin() + (current_chunk_idx_ + 1);
- chunks_.insert(insert_chunk, src->chunks_.begin(), end_chunk);
- src->chunks_.erase(src->chunks_.begin(), end_chunk);
- current_chunk_idx_ += num_acquired_chunks;
-
- if (keep_current) {
- src->current_chunk_idx_ = 0;
- DCHECK(src->chunks_.size() == 1 || src->chunks_[1].allocated_bytes == 0);
- total_allocated_bytes_ += src->total_allocated_bytes_ - src->GetFreeOffset();
- src->total_allocated_bytes_ = src->GetFreeOffset();
- } else {
- src->current_chunk_idx_ = -1;
- total_allocated_bytes_ += src->total_allocated_bytes_;
- src->total_allocated_bytes_ = 0;
- }
- peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
-
- if (!keep_current) src->FreeAll();
- DCHECK(CheckIntegrity(false));
-}
-
-std::string ChunkedAllocator::DebugString() {
- std::stringstream out;
- char str[16];
- out << "ChunkedAllocator(#chunks=" << chunks_.size() << " [";
- for (size_t i = 0; i < chunks_.size(); ++i) {
- sprintf(str, "0x%zx=", reinterpret_cast<size_t>(chunks_[i].data)); // NOLINT
- out << (i > 0 ? " " : "") << str << chunks_[i].size << "/"
- << chunks_[i].allocated_bytes;
- }
- out << "] current_chunk=" << current_chunk_idx_
- << " total_sizes=" << GetTotalChunkSizes()
- << " total_alloc=" << total_allocated_bytes_ << ")";
- return out.str();
-}
-
-int64_t ChunkedAllocator::GetTotalChunkSizes() const {
- int64_t result = 0;
- for (size_t i = 0; i < chunks_.size(); ++i) {
- result += chunks_[i].size;
- }
- return result;
-}
-
-bool ChunkedAllocator::CheckIntegrity(bool current_chunk_empty) {
- // check that current_chunk_idx_ points to the last chunk with allocated data
- DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
- int64_t total_allocated = 0;
- for (int i = 0; i < static_cast<int>(chunks_.size()); ++i) {
- DCHECK_GT(chunks_[i].size, 0);
- if (i < current_chunk_idx_) {
- DCHECK_GT(chunks_[i].allocated_bytes, 0);
- } else if (i == current_chunk_idx_) {
- if (current_chunk_empty) {
- DCHECK_EQ(chunks_[i].allocated_bytes, 0);
- } else {
- DCHECK_GT(chunks_[i].allocated_bytes, 0);
- }
- } else {
- DCHECK_EQ(chunks_[i].allocated_bytes, 0);
- }
- total_allocated += chunks_[i].allocated_bytes;
- }
- DCHECK_EQ(total_allocated, total_allocated_bytes_);
- return true;
-}
-
// ----------------------------------------------------------------------
// Arrow IO wrappers
diff --git a/cpp/src/parquet/util/memory.h b/cpp/src/parquet/util/memory.h
index cccafe8..8677e6b 100644
--- a/cpp/src/parquet/util/memory.h
+++ b/cpp/src/parquet/util/memory.h
@@ -77,149 +77,6 @@ class PARQUET_EXPORT Vector {
PARQUET_DISALLOW_COPY_AND_ASSIGN(Vector);
};
-/// A ChunkedAllocator maintains a list of memory chunks from which it
-/// allocates memory in response to Allocate() calls; Chunks stay around for
-/// the lifetime of the allocator or until they are passed on to another
-/// allocator.
-//
-/// An Allocate() call will attempt to allocate memory from the chunk that was most
-/// recently added; if that chunk doesn't have enough memory to
-/// satisfy the allocation request, the free chunks are searched for one that is
-/// big enough otherwise a new chunk is added to the list.
-/// The current_chunk_idx_ always points to the last chunk with allocated memory.
-/// In order to keep allocation overhead low, chunk sizes double with each new one
-/// added, until they hit a maximum size.
-//
-/// Example:
-/// ChunkedAllocator* p = new ChunkedAllocator();
-/// for (int i = 0; i < 1024; ++i) {
-/// returns 8-byte aligned memory (effectively 24 bytes):
-/// .. = p->Allocate(17);
-/// }
-/// at this point, 17K have been handed out in response to Allocate() calls and
-/// 28K of chunks have been allocated (chunk sizes: 4K, 8K, 16K)
-/// We track total and peak allocated bytes. At this point they would be the same:
-/// 28k bytes. A call to Clear will return the allocated memory so
-/// total_allocate_bytes_
-/// becomes 0 while peak_allocate_bytes_ remains at 28k.
-/// p->Clear();
-/// the entire 1st chunk is returned:
-/// .. = p->Allocate(4 * 1024);
-/// 4K of the 2nd chunk are returned:
-/// .. = p->Allocate(4 * 1024);
-/// a new 20K chunk is created
-/// .. = p->Allocate(20 * 1024);
-//
-/// ChunkedAllocator* p2 = new ChunkedAllocator();
-/// the new ChunkedAllocator receives all chunks containing data from p
-/// p2->AcquireData(p, false);
-/// At this point p.total_allocated_bytes_ would be 0 while p.peak_allocated_bytes_
-/// remains unchanged.
-/// The one remaining (empty) chunk is released:
-/// delete p;
-
-class PARQUET_EXPORT ChunkedAllocator {
- public:
- explicit ChunkedAllocator(::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
- /// Frees all chunks of memory and subtracts the total allocated bytes
- /// from the registered limits.
- ~ChunkedAllocator();
-
- /// Allocates 8-byte aligned section of memory of 'size' bytes at the end
- /// of the the current chunk. Creates a new chunk if there aren't any chunks
- /// with enough capacity.
- uint8_t* Allocate(int size);
-
- /// Returns 'byte_size' to the current chunk back to the mem pool. This can
- /// only be used to return either all or part of the previous allocation returned
- /// by Allocate().
- void ReturnPartialAllocation(int byte_size);
-
- /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
- void Clear();
-
- /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called for
- /// each mem pool
- void FreeAll();
-
- /// Absorb all chunks that hold data from src. If keep_current is true, let src hold on
- /// to its last allocated chunk that contains data.
- /// All offsets handed out by calls to GetCurrentOffset() for 'src' become invalid.
- void AcquireData(ChunkedAllocator* src, bool keep_current);
-
- std::string DebugString();
-
- int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
- int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
- int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
-
- /// Return sum of chunk_sizes_.
- int64_t GetTotalChunkSizes() const;
-
- private:
- friend class ChunkedAllocatorTest;
- static const int INITIAL_CHUNK_SIZE = 4 * 1024;
-
- /// The maximum size of chunk that should be allocated. Allocations larger than this
- /// size will get their own individual chunk.
- static const int MAX_CHUNK_SIZE = 1024 * 1024;
-
- struct ChunkInfo {
- uint8_t* data; // Owned by the ChunkInfo.
- int64_t size; // in bytes
-
- /// bytes allocated via Allocate() in this chunk
- int64_t allocated_bytes;
-
- explicit ChunkInfo(int64_t size, uint8_t* buf);
-
- ChunkInfo() : data(NULLPTR), size(0), allocated_bytes(0) {}
- };
-
- /// chunk from which we served the last Allocate() call;
- /// always points to the last chunk that contains allocated data;
- /// chunks 0..current_chunk_idx_ are guaranteed to contain data
- /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_);
- /// -1 if no chunks present
- int current_chunk_idx_;
-
- /// The size of the next chunk to allocate.
- int64_t next_chunk_size_;
-
- /// sum of allocated_bytes_
- int64_t total_allocated_bytes_;
-
- /// Maximum number of bytes allocated from this pool at one time.
- int64_t peak_allocated_bytes_;
-
- /// sum of all bytes allocated in chunks_
- int64_t total_reserved_bytes_;
-
- std::vector<ChunkInfo> chunks_;
-
- ::arrow::MemoryPool* pool_;
-
- /// Find or allocated a chunk with at least min_size spare capacity and update
- /// current_chunk_idx_. Also updates chunks_, chunk_sizes_ and allocated_bytes_
- /// if a new chunk needs to be created.
- bool FindChunk(int64_t min_size);
-
- /// Check integrity of the supporting data structures; always returns true but DCHECKs
- /// all invariants.
- /// If 'current_chunk_empty' is false, checks that the current chunk contains data.
- bool CheckIntegrity(bool current_chunk_empty);
-
- /// Return offset to unoccpied space in current chunk.
- int GetFreeOffset() const {
- if (current_chunk_idx_ == -1) return 0;
- return static_cast<int>(chunks_[current_chunk_idx_].allocated_bytes);
- }
-
- template <bool CHECK_LIMIT_FIRST>
- uint8_t* Allocate(int size);
-};
-
// File input and output interfaces that translate arrow::Status to exceptions
class PARQUET_EXPORT FileInterface {